1 #include "AliStorageEventManager.h"
8 #include <TStreamerInfo.h>
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
26 AliStorageEventManager::AliStorageEventManager()
30 ifstream configFile (GetConfigFilePath());
32 if (configFile.is_open())
36 while(configFile.good())
38 getline(configFile,line);
39 from = line.find("\"")+1;
40 to = line.find_last_of("\"");
41 if(line.find("STORAGE_SERVER=")==0)
43 fStorageServer=line.substr(from,to-from);
45 else if(line.find("EVENT_SERVER=")==0)
47 fEventServer=line.substr(from,to-from);
49 else if(line.find("STORAGE_SERVER_PORT=")==0)
51 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
53 else if(line.find("EVENT_SERVER_PORT=")==0)
55 fEventServerPort=atoi(line.substr(from,to-from).c_str());
57 else if(line.find("STORAGE_CLIENT_PORT=")==0)
59 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
61 else if(line.find("XML_SERVER_PORT=")==0)
63 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
74 cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
78 for(int i=0;i<NUMBER_OF_SOCKETS;i++)
80 fContexts[i] = new context_t();
83 AliStorageEventManager::~AliStorageEventManager()
85 if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
88 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
91 if(fManagerInstance==0)
93 fManagerInstance = new AliStorageEventManager();
96 return fManagerInstance;
100 void freeBuff (void *data, void *hint)
105 bool AliStorageEventManager::CreateSocket(storageSockets socket)
107 cout<<"Creating socket:"<<socket<<endl;
111 case SERVER_COMMUNICATION_REQ:
113 fSockets[SERVER_COMMUNICATION_REQ] =
114 new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
117 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
119 catch (const zmq::error_t& e)
121 cout<<"MANAGER -- "<<e.what()<<endl;
126 case SERVER_COMMUNICATION_REP:
128 fSockets[SERVER_COMMUNICATION_REP] =
129 new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
132 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
134 catch (const zmq::error_t& e)
136 cout<<"MANAGER -- "<<e.what()<<endl;
141 case CLIENT_COMMUNICATION_REQ:
143 fSockets[CLIENT_COMMUNICATION_REQ] =
144 new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
147 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
149 catch (const zmq::error_t& e)
151 cout<<"MANAGER -- "<<e.what()<<endl;
156 case CLIENT_COMMUNICATION_REP:
158 fSockets[CLIENT_COMMUNICATION_REP] =
159 new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
162 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
164 catch (const zmq::error_t& e)
166 cout<<"MANAGER -- "<<e.what()<<endl;
171 case EVENTS_SERVER_PUB:
173 fSockets[EVENTS_SERVER_PUB] =
174 new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
177 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
179 catch (const zmq::error_t& e)
181 cout<<"MANAGER -- "<<e.what()<<endl;
186 case EVENTS_SERVER_SUB:
188 fSockets[EVENTS_SERVER_SUB] =
189 new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
190 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
193 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
195 catch (const zmq::error_t& e)
197 cout<<"MANAGER -- "<<e.what()<<endl;
206 new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
209 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
211 catch (const zmq::error_t& e)
213 cout<<"MANAGER -- "<<e.what()<<endl;
223 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
225 //send size of the struct first
226 int numberOfRecords = list.size();
227 message_t message(20);
228 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
230 fSockets[socket]->send(message);
231 if(numberOfRecords==0)return;
232 message_t *tmpMessage = new message_t();
233 fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
235 // //prepare message with event's list
236 // char *buffer = reinterpret_cast<char*> (&list[0]);
237 // message_t *reply = new message_t((void*)buffer,
238 // sizeof(serverListStruct)*numberOfRecords,0);
239 // fSockets[socket]->send(*reply);
240 // if(reply){delete reply;}
242 message_t reply(sizeof(serverListStruct)*numberOfRecords);
243 memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
245 fSockets[socket]->send(reply);
246 if(tmpMessage){delete tmpMessage;}
249 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
251 char *buffer = (char*)(request);
252 message_t *requestMessage = new message_t((void*)buffer,
253 sizeof(struct serverRequestStruct)
254 +sizeof(struct listRequestStruct)
255 +sizeof(struct eventStruct),freeBuff);
256 fSockets[socket]->send(*requestMessage);
259 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
261 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
264 char *buffer = (char*)(request);
265 message_t *requestMessage = new message_t((void*)buffer,
266 sizeof(struct clientRequestStruct),freeBuff);
270 fSockets[socket]->send(*requestMessage);
272 catch (const zmq::error_t& e)
274 cout<<"MANAGER -- "<<e.what()<<endl;
276 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
278 CreateSocket(socket);
279 delete requestMessage;
285 if(poll (&items[0], 1, timeout)==0)
287 delete requestMessage;
291 delete requestMessage;
295 void AliStorageEventManager::Send(long message,storageSockets socket)
297 stringstream streamBuffer;
298 streamBuffer << message;
299 string stringBuffer = streamBuffer.str();
300 char *buffer = (char*)stringBuffer.c_str();
301 message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
303 fSockets[socket]->send(*replyMessage);
305 streamBuffer.str(string());
306 streamBuffer.clear();
309 void AliStorageEventManager::Send(bool message,storageSockets socket)
314 buffer = (char*)("true");
318 buffer = (char*)("false");
320 message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
321 fSockets[socket]->send(*replyMessage);
325 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
327 TMessage tmess(kMESS_OBJECT);
329 tmess.WriteObject(event);
330 TMessage::EnableSchemaEvolutionForAll(kTRUE);
332 int bufsize = tmess.Length();
333 char* buf = (char*) malloc(bufsize * sizeof(char));
334 memcpy(buf, tmess.Buffer(), bufsize);
336 message_t message((void*)buf, bufsize, freeBuff);
337 fSockets[socket]->send(message);
340 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
342 cout<<"SENDING AS XML"<<endl;
343 stringstream bufferStream;
344 bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
345 bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
347 for(int i=0;i<event->GetNumberOfTracks();i++)
349 AliESDtrack *track = event->GetTrack(i);
350 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
351 const AliTrackPointArray *array = track->GetTrackPointArray();
355 const float *x = array->GetX();
356 const float *y = array->GetY();
357 const float *z = array->GetZ();
358 int n = array->GetNPoints();
362 bufferStream <<"\t\t<point>"<<endl;
363 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
364 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
365 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
366 bufferStream <<"\t\t</point>"<<endl;
369 else cout<<"no array"<<endl;
371 bufferStream << "\t</track>"<<endl;
374 bufferStream << "</ESD>"<<endl;
376 string bufferString = bufferStream.str();
377 message_t message(bufferString.size());
378 memcpy (message.data(), bufferString.data(), bufferString.size());
380 fSockets[socket]->send(message);
381 cout<<"xml sent"<<endl;
384 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
386 //get size of the incomming message
387 message_t sizeMessage;
388 fSockets[socket]->recv(&sizeMessage);
390 istringstream iss(static_cast<char*>(sizeMessage.data()));
391 iss >> numberOfRecords;
393 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
395 fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
398 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
399 fSockets[socket]->recv(response);
401 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
403 if (response) {delete response;}
407 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
409 message_t* message = new message_t();
413 fSockets[socket]->recv(message);
415 catch (const zmq::error_t& e)
417 cout<<"MANAGER -- "<<e.what()<<endl;
421 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
422 message->size()+sizeof(UInt_t),
425 mess->ReadClass();// get first the class stored in message
426 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
429 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
433 data->GetStdContent();
434 if(message){delete message;}
437 TTree* tree= new TTree("esdTree", "esdTree");
438 data->WriteToTree(tree);
440 AliESDEvent* event= new AliESDEvent();
441 event->ReadFromTree(tree);
443 if(data){delete data;}
444 if(tree){delete tree;}
445 if(message){delete message;}
451 if(message){delete message;}
456 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
458 struct serverRequestStruct *request = new struct serverRequestStruct;
459 message_t *requestMessage = new message_t();
460 fSockets[socket]->recv(requestMessage);
461 request = static_cast<struct serverRequestStruct*>(requestMessage->data());
465 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
467 struct clientRequestStruct *request = new struct clientRequestStruct;
468 message_t *requestMessage = new message_t();
469 fSockets[socket]->recv(requestMessage);
470 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
474 bool AliStorageEventManager::GetBool(storageSockets socket)
476 message_t *response = new message_t();
477 fSockets[socket]->recv(response);
478 char *result = (char*)response->data();
480 if(!strcmp("true",result)){return true;}
484 long AliStorageEventManager::GetLong(storageSockets socket)
486 message_t *responseMessage = new message_t();
487 fSockets[socket]->recv(responseMessage);
493 result = (long)atoi(static_cast<char*>(responseMessage->data()));
494 delete responseMessage;