1 #include "AliStorageEventManager.h"
8 #include <TStreamerInfo.h>
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
26 AliStorageEventManager::AliStorageEventManager()
30 ifstream configFile (GetConfigFilePath());
32 if (configFile.is_open())
36 while(configFile.good())
38 getline(configFile,line);
39 from = line.find("\"")+1;
40 to = line.find_last_of("\"");
41 if(line.find("STORAGE_SERVER=")==0)
43 fStorageServer=line.substr(from,to-from);
45 else if(line.find("EVENT_SERVER=")==0)
47 fEventServer=line.substr(from,to-from);
49 else if(line.find("STORAGE_SERVER_PORT=")==0)
51 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
53 else if(line.find("EVENT_SERVER_PORT=")==0)
55 fEventServerPort=atoi(line.substr(from,to-from).c_str());
57 else if(line.find("STORAGE_CLIENT_PORT=")==0)
59 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
61 else if(line.find("XML_SERVER_PORT=")==0)
63 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
66 if(configFile.eof()){configFile.clear();}
69 else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
72 for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
74 AliStorageEventManager::~AliStorageEventManager()
76 if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
79 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
82 if(fManagerInstance==0)
84 fManagerInstance = new AliStorageEventManager();
87 return fManagerInstance;
91 void freeBuff (void *data, void *hint)
96 bool AliStorageEventManager::CreateSocket(storageSockets socket)
98 cout<<"Creating socket:"<<socket<<endl;
102 case SERVER_COMMUNICATION_REQ:
104 fSockets[SERVER_COMMUNICATION_REQ] =
105 new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
108 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
110 catch (const zmq::error_t& e)
112 cout<<"MANAGER -- "<<e.what()<<endl;
117 case SERVER_COMMUNICATION_REP:
119 fSockets[SERVER_COMMUNICATION_REP] =
120 new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
123 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
125 catch (const zmq::error_t& e)
127 cout<<"MANAGER -- "<<e.what()<<endl;
132 case CLIENT_COMMUNICATION_REQ:
134 fSockets[CLIENT_COMMUNICATION_REQ] =
135 new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
138 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
140 catch (const zmq::error_t& e)
142 cout<<"MANAGER -- "<<e.what()<<endl;
147 case CLIENT_COMMUNICATION_REP:
149 fSockets[CLIENT_COMMUNICATION_REP] =
150 new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
153 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
155 catch (const zmq::error_t& e)
157 cout<<"MANAGER -- "<<e.what()<<endl;
162 case EVENTS_SERVER_PUB:
164 fSockets[EVENTS_SERVER_PUB] =
165 new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
168 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
170 catch (const zmq::error_t& e)
172 cout<<"MANAGER -- "<<e.what()<<endl;
177 case EVENTS_SERVER_SUB:
179 fSockets[EVENTS_SERVER_SUB] =
180 new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
181 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
184 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
186 catch (const zmq::error_t& e)
188 cout<<"MANAGER -- "<<e.what()<<endl;
197 new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
200 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
202 catch (const zmq::error_t& e)
204 cout<<"MANAGER -- "<<e.what()<<endl;
214 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
216 //send size of the struct first
217 int numberOfRecords = list.size();
218 message_t message(20);
219 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
221 fSockets[socket]->send(message);
223 catch(const zmq::error_t &e)
225 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
227 //if(numberOfRecords==0)return;
228 message_t *tmpMessage = new message_t();
231 fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
233 catch(const zmq::error_t &e)
235 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
237 // //prepare message with event's list
238 // char *buffer = reinterpret_cast<char*> (&list[0]);
239 // message_t *reply = new message_t((void*)buffer,
240 // sizeof(serverListStruct)*numberOfRecords,0);
241 // fSockets[socket]->send(*reply);
242 // if(reply){delete reply;}
244 message_t reply(sizeof(serverListStruct)*numberOfRecords);
245 memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
248 fSockets[socket]->send(reply);
250 catch(const zmq::error_t &e)
252 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
254 if(tmpMessage){delete tmpMessage;}
257 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
259 char *buffer = (char*)(request);
260 message_t *requestMessage = new message_t((void*)buffer,
261 sizeof(struct serverRequestStruct)
262 +sizeof(struct listRequestStruct)
263 +sizeof(struct eventStruct),freeBuff);
265 fSockets[socket]->send(*requestMessage);
267 catch(const zmq::error_t &e)
269 cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
273 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
275 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
278 char *buffer = (char*)(request);
279 message_t *requestMessage = new message_t((void*)buffer,
280 sizeof(struct clientRequestStruct),freeBuff);
283 fSockets[socket]->send(*requestMessage);
285 catch (const zmq::error_t& e)
287 cout<<"MANAGER -- "<<e.what()<<endl;
289 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
291 CreateSocket(socket);
292 delete requestMessage;
298 if(poll (&items[0], 1, timeout)==0)
300 delete requestMessage;
304 delete requestMessage;
308 void AliStorageEventManager::Send(long message,storageSockets socket)
310 stringstream streamBuffer;
311 streamBuffer << message;
312 string stringBuffer = streamBuffer.str();
313 char *buffer = (char*)stringBuffer.c_str();
314 message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
317 fSockets[socket]->send(*replyMessage);
319 catch(const zmq::error_t &e)
321 cout<<"MANAGER -- send long -- "<<e.what()<<endl;
324 streamBuffer.str(string());
325 streamBuffer.clear();
328 void AliStorageEventManager::Send(bool message,storageSockets socket)
333 buffer = (char*)("true");
337 buffer = (char*)("false");
339 message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
341 fSockets[socket]->send(*replyMessage);
343 catch(const zmq::error_t &e)
345 cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
350 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
354 TMessage tmess(kMESS_OBJECT);
356 tmess.WriteObject(event);
357 // TMessage::EnableSchemaEvolutionForAll(kTRUE);
359 int bufsize = tmess.Length();
360 char* buf = (char*) malloc(bufsize * sizeof(char));
361 memcpy(buf, tmess.Buffer(), bufsize);
363 message_t message((void*)buf, bufsize, freeBuff);
365 fSockets[socket]->send(message);
367 catch(const zmq::error_t &e)
369 cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
373 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
375 cout<<"SENDING AS XML"<<endl;
376 stringstream bufferStream;
377 bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
378 bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
380 for(int i=0;i<event->GetNumberOfTracks();i++)
382 AliESDtrack *track = event->GetTrack(i);
383 AliKalmanTrack *ITStrack = track->GetITStrack();
384 const AliTrackPointArray *array = track->GetTrackPointArray();
386 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\"";
387 // bufferStream << "\t<track esdpid=\""<<track->GetESDpid();
388 bufferStream << "\t pid=\""<<track->PID()<<"\"";
389 bufferStream << "\t energy=\""<<track->E()<<"\"";
390 bufferStream << "\t volumeID=\""<<array->GetVolumeID()<<"\">" <<endl;
398 const float *x = array->GetX();
399 const float *y = array->GetY();
400 const float *z = array->GetZ();
401 int n = array->GetNPoints();
405 bufferStream <<"\t\t<point volumeID=\""<<array->GetVolumeID()[j]<<"\">"<<endl;
406 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
407 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
408 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
409 bufferStream <<"\t\t</point>"<<endl;
412 else cout<<"no array"<<endl;
414 bufferStream << "\t</track>"<<endl;
417 bufferStream << "</ESD>"<<endl;
419 string bufferString = bufferStream.str();
420 message_t message(bufferString.size());
421 memcpy (message.data(), bufferString.data(), bufferString.size());
424 fSockets[socket]->send(message);
426 catch(const zmq::error_t &e)
428 cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
430 cout<<"xml sent"<<endl;
433 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout)
435 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
436 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector<serverListStruct> emptyVector;return emptyVector;}}
438 //get size of the incomming message
439 message_t sizeMessage;
442 fSockets[socket]->recv(&sizeMessage);
444 catch(const zmq::error_t &e)
446 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
449 istringstream iss(static_cast<char*>(sizeMessage.data()));
450 iss >> numberOfRecords;
452 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
455 fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
457 catch(const zmq::error_t &e)
459 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
462 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
464 fSockets[socket]->recv(response);
466 catch(const zmq::error_t &e)
468 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
471 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
473 if (response) {delete response;}
477 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
479 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
480 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
482 message_t* message = new message_t();
486 fSockets[socket]->recv(message);
488 catch (const zmq::error_t& e)
490 cout<<"MANAGER -- "<<e.what()<<endl;
494 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
495 message->size()+sizeof(UInt_t),
498 mess->ReadClass();// get first the class stored in message
499 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
502 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
506 data->GetStdContent();
507 if(message){delete message;}
512 if(message){delete message;}
517 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
519 struct serverRequestStruct *request = new struct serverRequestStruct;
520 message_t *requestMessage = new message_t();
522 fSockets[socket]->recv(requestMessage);
524 catch(const zmq::error_t &e)
526 cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
527 request->messageType = -1;
530 request = static_cast<struct serverRequestStruct*>(requestMessage->data());
534 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
536 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
537 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
539 struct clientRequestStruct *request = new struct clientRequestStruct;
540 message_t *requestMessage = new message_t();
542 fSockets[socket]->recv(requestMessage);
544 catch(const zmq::error_t &e)
546 cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
548 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
552 bool AliStorageEventManager::GetBool(storageSockets socket)
554 message_t *response = new message_t();
556 fSockets[socket]->recv(response);
558 catch(const zmq::error_t &e)
560 cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
562 char *result = (char*)response->data();
564 if(!strcmp("true",result)){return true;}
568 long AliStorageEventManager::GetLong(storageSockets socket)
570 message_t *responseMessage = new message_t();
572 fSockets[socket]->recv(responseMessage);
574 catch(const zmq::error_t &e)
576 cout<<"MANAGER -- get long -- "<<e.what()<<endl;
583 result = (long)atoi(static_cast<char*>(responseMessage->data()));
584 delete responseMessage;