d55260ab63f4c2bcdb3f720788219c5834386892
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
#include "AliStorageEventManager.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <TList.h>
#include <TStreamerInfo.h>
#include <TThread.h>

#include "zmq.hpp"

#include "AliESDEvent.h"
#include "AliESDtrack.h"
#include "AliTrackPointArray.h"
#include "AliESDfriendTrack.h"
#include "AliExternalTrackParam.h"
#include "AliTrackerBase.h"
#include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28         //read config file
29         TThread::Lock();
30         ifstream configFile (GetConfigFilePath());
31     
32         if (configFile.is_open())
33         {
34                 string line;
35                 int from,to;
36                 while(configFile.good())
37                 {
38                         getline(configFile,line);
39                         from = line.find("\"")+1;
40                         to = line.find_last_of("\"");
41                         if(line.find("STORAGE_SERVER=")==0)
42                         {
43                                 fStorageServer=line.substr(from,to-from);
44                         }
45                         else if(line.find("EVENT_SERVER=")==0)
46                         {
47                                 fEventServer=line.substr(from,to-from);
48                         }
49                         else if(line.find("STORAGE_SERVER_PORT=")==0)
50                         {
51                                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52                         }
53                         else if(line.find("EVENT_SERVER_PORT=")==0)
54                         {
55                                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56                         }
57                         else if(line.find("STORAGE_CLIENT_PORT=")==0)
58                         {
59                                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60                         }
61                         else if(line.find("XML_SERVER_PORT=")==0)
62                         {
63                                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64                         }
65                 }
66                 if(configFile.eof())
67                 {
68                         configFile.clear();
69                 }
70                 configFile.close();
71         }
72         else
73         {
74                 cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
75         }
76         TThread::UnLock();
77         
78         for(int i=0;i<NUMBER_OF_SOCKETS;i++)
79         {
80                 fContexts[i] = new context_t();
81         }
82 }
83 AliStorageEventManager::~AliStorageEventManager()
84 {
85         if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
86 }
87
88 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
89 {
90         TThread::Lock();
91         if(fManagerInstance==0)
92         {
93                 fManagerInstance = new AliStorageEventManager();
94         }
95         TThread::UnLock();
96         return fManagerInstance;
97 }
98
99
// Releases a buffer with free(). The second argument is accepted but ignored.
// NOTE(review): presumably meant as a deallocation callback for zero-copy
// messages; it is not referenced by the code visible in this file.
void __freeBuff (void *data, void *hint)
{
    (void)hint; // intentionally unused
    free(data);
}
104
105 bool AliStorageEventManager::CreateSocket(storageSockets socket)
106 {
107         switch(socket)
108         {
109         case SERVER_COMMUNICATION_REQ:
110         {
111                 fSockets[SERVER_COMMUNICATION_REQ] =
112                         new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
113                 try
114                 {
115                         fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
116
117                 }
118                 catch (const zmq::error_t& e)
119                 {
120                         cout<<"MANAGER -- "<<e.what()<<endl;
121                         return 0;
122                 }
123         }
124         break;
125         case SERVER_COMMUNICATION_REP:
126         {
127                 fSockets[SERVER_COMMUNICATION_REP] =
128                         new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
129                 try
130                 {
131                         fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
132                 }
133                 catch (const zmq::error_t& e)
134                 {
135                         cout<<"MANAGER -- "<<e.what()<<endl;
136                         return 0;
137                 }
138         }
139         break;
140         case CLIENT_COMMUNICATION_REQ:
141         {
142                 fSockets[CLIENT_COMMUNICATION_REQ] =
143                         new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
144                 try
145                 {
146                         fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
147                 }
148                 catch (const zmq::error_t& e)
149                 {
150                         cout<<"MANAGER -- "<<e.what()<<endl;
151                         return 0;
152                 }
153         }
154         break;
155         case CLIENT_COMMUNICATION_REP:
156         {
157                 fSockets[CLIENT_COMMUNICATION_REP] =
158                         new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
159                 try
160                 {
161                         fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
162                 }
163                 catch (const zmq::error_t& e)
164                 {
165                         cout<<"MANAGER -- "<<e.what()<<endl;
166                         return 0;
167                 }
168         }
169         break;
170         case EVENTS_SERVER_PUB:
171         {
172                 fSockets[EVENTS_SERVER_PUB] =
173                         new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
174                 try
175                 {
176                         fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
177                 }
178                 catch (const zmq::error_t& e)
179                 {
180                         cout<<"MANAGER -- "<<e.what()<<endl;
181                         return 0;
182                 }
183         }
184         break;
185         case EVENTS_SERVER_SUB:
186         {
187                 fSockets[EVENTS_SERVER_SUB] =
188                         new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
189                 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
190                 try
191                 {
192                         fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
193                 }
194                 catch (const zmq::error_t& e)
195                 {
196                         cout<<"MANAGER -- "<<e.what()<<endl;
197                         return 0;
198
199                 }
200         }
201         break;
202         case XML_PUB:
203         {
204                 fSockets[XML_PUB] =
205                         new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
206                 try
207                 {
208                         fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
209                 }
210                 catch (const zmq::error_t& e)
211                 {
212                         cout<<"MANAGER -- "<<e.what()<<endl;
213                         return 0;
214                 }
215         }
216         break;
217         default:break;
218         }
219         return 1;
220 }
221
222 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
223 {
224         //send size of the struct first
225         int numberOfRecords = list.size();
226         message_t message(20);
227         snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
228
229         fSockets[socket]->send(message);
230         if(numberOfRecords==0)return;
231         fSockets[socket]->recv((new message_t));//empty message just to keep req-rep order
232
233         // //prepare message with event's list
234         // char *buffer = reinterpret_cast<char*> (&list[0]);
235         // message_t *reply = new message_t((void*)buffer,
236         //                    sizeof(serverListStruct)*numberOfRecords,0);
237         // fSockets[socket]->send(*reply);
238         // if(reply){delete reply;}
239
240         zmq::message_t reply(sizeof(serverListStruct)*numberOfRecords);
241         memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
242
243         fSockets[socket]->send(reply);
244
245 }
246
247 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
248 {
249         char *buffer = (char*)(request);
250         message_t *requestMessage = new message_t((void*)buffer,
251                                            sizeof(struct serverRequestStruct)
252                                            +sizeof(struct listRequestStruct)
253                                            +sizeof(struct eventStruct),0);      
254         fSockets[socket]->send(*requestMessage);
255 }
256
257 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
258 {
259         pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
260         
261         
262         char *buffer = (char*)(request);
263         message_t *requestMessage = new message_t((void*)buffer,
264                                                   sizeof(struct clientRequestStruct),0);
265
266         try
267         {
268                 fSockets[socket]->send(*requestMessage);
269         }
270         catch (const zmq::error_t& e)
271         {
272                 cout<<"MANAGER -- "<<e.what()<<endl;
273                 cout<<e.num()<<endl;
274                 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
275
276                 CreateSocket(socket);
277                 delete requestMessage;
278                 return 0;
279                 
280         }       
281         if(timeout>=0)
282         {
283                 if(poll (&items[0], 1, timeout)==0)
284                 {
285                   delete requestMessage;
286                         return 0;
287                 }
288         }
289         delete requestMessage;          
290         return 1;
291 }
292
293 void AliStorageEventManager::Send(long message,storageSockets socket)
294 {
295         stringstream streamBuffer;
296         streamBuffer << message;
297         string stringBuffer = streamBuffer.str();
298         char *buffer = (char*)stringBuffer.c_str();
299         message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
300         fSockets[socket]->send(*replyMessage);
301         delete replyMessage;
302         streamBuffer.str(string());
303         streamBuffer.clear();
304 }
305
306 void AliStorageEventManager::Send(bool message,storageSockets socket)
307 {
308         char *buffer;
309         if(message==true)
310         {
311                 buffer = (char*)("true");
312         }
313         else
314         {
315                 buffer = (char*)("false");
316         }
317         message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
318         fSockets[socket]->send(*replyMessage);
319         delete replyMessage;
320 }
321
322 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
323 {
324         TMessage tmess(kMESS_OBJECT);
325         tmess.Reset();
326         tmess.WriteObject(event);
327         TMessage::EnableSchemaEvolutionForAll(kTRUE);
328
329         int bufsize = tmess.Length();
330         char* buf = (char*) malloc(bufsize * sizeof(char));
331         memcpy(buf, tmess.Buffer(), bufsize);
332         
333         zmq::message_t message((void*)buf, bufsize, 0, 0);
334         fSockets[socket]->send(message);
335 }
336
337 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
338 {
339         cout<<"SENDING AS XML"<<endl;
340         stringstream bufferStream;
341         bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
342         bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
343         
344         for(int i=0;i<event->GetNumberOfTracks();i++)
345         {
346                 AliESDtrack *track = event->GetTrack(i);
347                 //double hgfhgf[size];
348                 //track->GetESDpid(pid);
349                 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
350                 const AliTrackPointArray *array = track->GetTrackPointArray();
351
352                 if(array)
353                 {
354                         const float *x = array->GetX();
355                         const float *y = array->GetY();
356                         const float *z = array->GetZ();
357                         int n = array->GetNPoints();
358
359                         for(int j=0;j<n;j++)
360                         {
361                                 bufferStream <<"\t\t<point>"<<endl;
362                                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
363                                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
364                                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
365                                 bufferStream <<"\t\t</point>"<<endl;
366                         }
367                 }
368                 else cout<<"no array"<<endl;
369
370                 bufferStream << "\t</track>"<<endl;
371         }
372
373         bufferStream << "</ESD>"<<endl;
374
375         string bufferString = bufferStream.str();
376         message_t message(bufferString.size());
377         memcpy (message.data(), bufferString.data(), bufferString.size());      
378         
379         fSockets[socket]->send(message);
380         cout<<"xml sent"<<endl;
381 }
382
383 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
384 {
385         //get size of the incomming message
386         message_t sizeMessage;
387         fSockets[socket]->recv(&sizeMessage);
388         int numberOfRecords;
389         istringstream iss(static_cast<char*>(sizeMessage.data()));
390         iss >> numberOfRecords;
391
392         if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
393         
394         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
395         
396 //get list of events
397         message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
398         fSockets[socket]->recv(response);
399         
400         vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
401
402         if (response) {delete response;}
403         return receivedList;
404 }
405
406 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout,TTree **tmpTree)
407 {
408   message_t* message = new message_t();
409
410   try
411   {
412     fSockets[socket]->recv(message);
413   }
414   catch (const zmq::error_t& e)
415   {
416     cout<<"MANAGER -- "<<e.what()<<endl;
417     return NULL;
418   }
419         
420   TBufferFile *mess = new TBufferFile(TBuffer::kRead,
421                                       message->size()+sizeof(UInt_t),
422                                       message->data());
423   mess->InitMap();
424   mess->ReadClass();// get first the class stored in message
425   mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
426   mess->ResetMap();
427         
428   AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
429         
430   if (data)
431   {
432     TTree* tree= new TTree("esdTree", "esdTree");
433     data->WriteToTree(tree);
434     tree->Fill();
435     AliESDEvent* event= new AliESDEvent();
436     event->ReadFromTree(tree);
437     tree->GetEntry(0);
438     if(data){delete data;}
439     //if(tree){delete tree;}
440     if(message){delete message;}
441     if(tmpTree){*tmpTree = tree;}
442     return event;               
443   }
444   else
445   {
446     if(message){delete message;}
447     return NULL;
448   }
449 }
450
451 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
452 {
453         struct serverRequestStruct *request = new struct serverRequestStruct;
454         message_t *requestMessage = new message_t();
455         fSockets[socket]->recv(requestMessage);
456         request = static_cast<struct serverRequestStruct*>(requestMessage->data());
457         return request;
458 }
459
460 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
461 {
462         struct clientRequestStruct *request = new struct clientRequestStruct;
463         message_t *requestMessage = new message_t();
464         fSockets[socket]->recv(requestMessage);
465         request = static_cast<struct clientRequestStruct*>(requestMessage->data());
466         return request;
467 }
468
469 bool AliStorageEventManager::GetBool(storageSockets socket)
470 {
471         message_t *response = new message_t();
472         fSockets[socket]->recv(response);
473         char *result = (char*)response->data();
474         
475         if(!strcmp("true",result)){return true;}
476         else{return false;}
477 }
478
479 long AliStorageEventManager::GetLong(storageSockets socket)
480 {
481         message_t *responseMessage = new message_t;
482         fSockets[socket]->recv(responseMessage);
483         return (long)atoi(static_cast<char*>(responseMessage->data()));
484 }
485