]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageEventManager.cxx
Merge branch 'master' of https://git.cern.ch/reps/AliRoot
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
1 #include "AliStorageEventManager.h"
2
3 #include <iostream>
4 #include <sstream>
5 #include <fstream>
6
7 #include <TList.h>
8 #include <TStreamerInfo.h>
9 #include <TThread.h>
10
11 #include "zmq.hpp"
12
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28         //read config file
29         TThread::Lock();
30         ifstream configFile (GetConfigFilePath());
31     
32         if (configFile.is_open())
33         {
34                 string line;
35                 int from,to;
36                 while(configFile.good())
37                 {
38                         getline(configFile,line);
39                         from = line.find("\"")+1;
40                         to = line.find_last_of("\"");
41                         if(line.find("STORAGE_SERVER=")==0)
42                         {
43                                 fStorageServer=line.substr(from,to-from);
44                         }
45                         else if(line.find("EVENT_SERVER=")==0)
46                         {
47                                 fEventServer=line.substr(from,to-from);
48                         }
49                         else if(line.find("STORAGE_SERVER_PORT=")==0)
50                         {
51                                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52                         }
53                         else if(line.find("EVENT_SERVER_PORT=")==0)
54                         {
55                                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56                         }
57                         else if(line.find("STORAGE_CLIENT_PORT=")==0)
58                         {
59                                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60                         }
61                         else if(line.find("XML_SERVER_PORT=")==0)
62                         {
63                                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64                         }
65                 }
66                 if(configFile.eof())
67                 {
68                         configFile.clear();
69                 }
70                 configFile.close();
71         }
72         else
73         {
74                 cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
75         }
76         TThread::UnLock();
77         
78         for(int i=0;i<NUMBER_OF_SOCKETS;i++)
79         {
80                 fContexts[i] = new context_t();
81         }
82 }
83 AliStorageEventManager::~AliStorageEventManager()
84 {
85         if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
86 }
87
88 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
89 {
90         TThread::Lock();
91         if(fManagerInstance==0)
92         {
93                 fManagerInstance = new AliStorageEventManager();
94         }
95         TThread::UnLock();
96         return fManagerInstance;
97 }
98
99
100 void __freeBuff (void *data, void *hint)
101 {
102     free(data);
103 }
104
105 bool AliStorageEventManager::CreateSocket(storageSockets socket)
106 {
107         switch(socket)
108         {
109         case SERVER_COMMUNICATION_REQ:
110         {
111                 fSockets[SERVER_COMMUNICATION_REQ] =
112                         new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
113                 try
114                 {
115                         fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
116
117                 }
118                 catch (const zmq::error_t& e)
119                 {
120                         cout<<"MANAGER -- "<<e.what()<<endl;
121                         return 0;
122                 }
123         }
124         break;
125         case SERVER_COMMUNICATION_REP:
126         {
127                 fSockets[SERVER_COMMUNICATION_REP] =
128                         new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
129                 try
130                 {
131                         fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
132                 }
133                 catch (const zmq::error_t& e)
134                 {
135                         cout<<"MANAGER -- "<<e.what()<<endl;
136                         return 0;
137                 }
138         }
139         break;
140         case CLIENT_COMMUNICATION_REQ:
141         {
142                 fSockets[CLIENT_COMMUNICATION_REQ] =
143                         new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
144                 try
145                 {
146                         fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
147                 }
148                 catch (const zmq::error_t& e)
149                 {
150                         cout<<"MANAGER -- "<<e.what()<<endl;
151                         return 0;
152                 }
153         }
154         break;
155         case CLIENT_COMMUNICATION_REP:
156         {
157                 fSockets[CLIENT_COMMUNICATION_REP] =
158                         new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
159                 try
160                 {
161                         fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
162                 }
163                 catch (const zmq::error_t& e)
164                 {
165                         cout<<"MANAGER -- "<<e.what()<<endl;
166                         return 0;
167                 }
168         }
169         break;
170         case EVENTS_SERVER_PUB:
171         {
172                 fSockets[EVENTS_SERVER_PUB] =
173                         new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
174                 try
175                 {
176                         fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
177                 }
178                 catch (const zmq::error_t& e)
179                 {
180                         cout<<"MANAGER -- "<<e.what()<<endl;
181                         return 0;
182                 }
183         }
184         break;
185         case EVENTS_SERVER_SUB:
186         {
187                 fSockets[EVENTS_SERVER_SUB] =
188                         new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
189                 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
190                 try
191                 {
192                         fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
193                 }
194                 catch (const zmq::error_t& e)
195                 {
196                         cout<<"MANAGER -- "<<e.what()<<endl;
197                         return 0;
198
199                 }
200         }
201         break;
202         case XML_PUB:
203         {
204                 fSockets[XML_PUB] =
205                         new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
206                 try
207                 {
208                         fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
209                 }
210                 catch (const zmq::error_t& e)
211                 {
212                         cout<<"MANAGER -- "<<e.what()<<endl;
213                         return 0;
214                 }
215         }
216         break;
217         default:break;
218         }
219         return 1;
220 }
221
222 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
223 {
224         //send size of the struct first
225         int numberOfRecords = list.size();
226         message_t message(20);
227         snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
228
229         fSockets[socket]->send(message);
230         if(numberOfRecords==0)return;
231         fSockets[socket]->recv((new message_t));//empty message just to keep req-rep order
232
233         //prepare message with event's list
234         char *buffer = reinterpret_cast<char*> (&list[0]);
235         message_t *reply = new message_t((void*)buffer,
236                               sizeof(serverListStruct)*numberOfRecords,0);
237         fSockets[socket]->send(*reply);
238
239         if(reply){delete reply;}
240 }
241
242 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
243 {
244         char *buffer = (char*)(request);
245         message_t *requestMessage = new message_t((void*)buffer,
246                                            sizeof(struct serverRequestStruct)
247                                            +sizeof(struct listRequestStruct)
248                                            +sizeof(struct eventStruct),0);      
249         fSockets[socket]->send(*requestMessage);
250 }
251
252 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
253 {
254         pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
255         
256         
257         char *buffer = (char*)(request);
258         message_t *requestMessage = new message_t((void*)buffer,
259                                                   sizeof(struct clientRequestStruct),0);
260
261         try
262         {
263                 fSockets[socket]->send(*requestMessage);
264         }
265         catch (const zmq::error_t& e)
266         {
267                 cout<<"MANAGER -- "<<e.what()<<endl;
268                 cout<<e.num()<<endl;
269                 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
270
271                 CreateSocket(socket);
272                 return 0;
273                 
274         }       
275         if(timeout>=0)
276         {
277                 if(poll (&items[0], 1, timeout)==0)
278                 {
279                         return 0;
280                 }
281         }
282
283         return 1;
284 }
285
286 void AliStorageEventManager::Send(long message,storageSockets socket)
287 {
288         stringstream streamBuffer;
289         streamBuffer << message;
290         string stringBuffer = streamBuffer.str();
291         char *buffer = (char*)stringBuffer.c_str();
292         message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
293         fSockets[socket]->send(*replyMessage);
294         delete replyMessage;
295         streamBuffer.str(string());
296         streamBuffer.clear();
297 }
298
299 void AliStorageEventManager::Send(bool message,storageSockets socket)
300 {
301         char *buffer;
302         if(message==true)
303         {
304                 buffer = (char*)("true");
305         }
306         else
307         {
308                 buffer = (char*)("false");
309         }
310         message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
311         fSockets[socket]->send(*replyMessage);
312         delete replyMessage;
313 }
314
315 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
316 {
317         TMessage tmess(kMESS_OBJECT);
318         tmess.Reset();
319         tmess.WriteObject(event);
320         TMessage::EnableSchemaEvolutionForAll(kTRUE);
321
322         int bufsize = tmess.Length();
323         char* buf = (char*) malloc(bufsize * sizeof(char));
324         memcpy(buf, tmess.Buffer(), bufsize);
325         
326         zmq::message_t message((void*)buf, bufsize, 0, 0);
327         fSockets[socket]->send(message);
328 }
329
330 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
331 {
332         cout<<"SENDING AS XML"<<endl;
333         stringstream bufferStream;
334         bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
335         bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
336         
337         for(int i=0;i<event->GetNumberOfTracks();i++)
338         {
339                 AliESDtrack *track = event->GetTrack(i);
340                 //double hgfhgf[size];
341                 //track->GetESDpid(pid);
342                 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
343                 const AliTrackPointArray *array = track->GetTrackPointArray();
344
345                 if(array)
346                 {
347                         const float *x = array->GetX();
348                         const float *y = array->GetY();
349                         const float *z = array->GetZ();
350                         int n = array->GetNPoints();
351
352                         for(int j=0;j<n;j++)
353                         {
354                                 bufferStream <<"\t\t<point>"<<endl;
355                                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
356                                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
357                                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
358                                 bufferStream <<"\t\t</point>"<<endl;
359                         }
360                 }
361                 else cout<<"no array"<<endl;
362
363                 bufferStream << "\t</track>"<<endl;
364         }
365
366         bufferStream << "</ESD>"<<endl;
367
368         string bufferString = bufferStream.str();
369         message_t message(bufferString.size());
370         memcpy (message.data(), bufferString.data(), bufferString.size());      
371         
372         fSockets[socket]->send(message);
373         cout<<"xml sent"<<endl;
374 }
375
376 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
377 {
378         //get size of the incomming message
379         message_t sizeMessage;
380         fSockets[socket]->recv(&sizeMessage);
381         int numberOfRecords;
382         istringstream iss(static_cast<char*>(sizeMessage.data()));
383         iss >> numberOfRecords;
384
385         if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
386         
387         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
388         
389 //get list of events
390         message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
391         fSockets[socket]->recv(response);
392         
393         vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
394
395         return receivedList;
396 }
397
398 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout,TTree **tmpTree)
399 {
400   message_t* message = new message_t();
401
402   try
403   {
404     fSockets[socket]->recv(message);
405   }
406   catch (const zmq::error_t& e)
407   {
408     cout<<"MANAGER -- "<<e.what()<<endl;
409     return NULL;
410   }
411         
412   TBufferFile *mess = new TBufferFile(TBuffer::kRead,
413                                       message->size()+sizeof(UInt_t),
414                                       message->data());
415   mess->InitMap();
416   mess->ReadClass();// get first the class stored in message
417   mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
418   mess->ResetMap();
419         
420   AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
421         
422   if (data)
423   {
424     TTree* tree= new TTree("esdTree", "esdTree");
425     data->WriteToTree(tree);
426     tree->Fill();
427     AliESDEvent* event= new AliESDEvent();
428     event->ReadFromTree(tree);
429     tree->GetEntry(0);
430     if(data){delete data;}
431     //if(tree){delete tree;}
432     if(message){delete message;}
433     if(tmpTree){*tmpTree = tree;}
434     return event;               
435   }
436   else
437   {
438     if(message){delete message;}
439     return NULL;
440   }
441 }
442
443 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
444 {
445         struct serverRequestStruct *request = new struct serverRequestStruct;
446         message_t *requestMessage = new message_t();
447         fSockets[socket]->recv(requestMessage);
448         request = static_cast<struct serverRequestStruct*>(requestMessage->data());
449         return request;
450 }
451
452 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
453 {
454         struct clientRequestStruct *request = new struct clientRequestStruct;
455         message_t *requestMessage = new message_t();
456         fSockets[socket]->recv(requestMessage);
457         request = static_cast<struct clientRequestStruct*>(requestMessage->data());
458         return request;
459 }
460
461 bool AliStorageEventManager::GetBool(storageSockets socket)
462 {
463         message_t *response = new message_t();
464         fSockets[socket]->recv(response);
465         char *result = (char*)response->data();
466         
467         if(!strcmp("true",result)){return true;}
468         else{return false;}
469 }
470
471 long AliStorageEventManager::GetLong(storageSockets socket)
472 {
473         message_t *responseMessage = new message_t;
474         fSockets[socket]->recv(responseMessage);
475         return (long)atoi(static_cast<char*>(responseMessage->data()));
476 }
477