1f28d067cb4291cab0a92abbc75c4a2078530ceb
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
1 #include "AliStorageEventManager.h"
2
3 #include <iostream>
4 #include <sstream>
5 #include <fstream>
6
7 #include <TList.h>
8 #include <TStreamerInfo.h>
9 #include <TThread.h>
10
11 #include "zmq.hpp"
12
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28     //read config file
29     TThread::Lock();
30     ifstream configFile (GetConfigFilePath());
31     
32     if (configFile.is_open())
33     {
34         string line;
35         int from,to;
36         while(configFile.good())
37         {
38             getline(configFile,line);
39             from = line.find("\"")+1;
40             to = line.find_last_of("\"");
41             if(line.find("STORAGE_SERVER=")==0)
42             {
43                 fStorageServer=line.substr(from,to-from);
44             }
45             else if(line.find("EVENT_SERVER=")==0)
46             {
47                 fEventServer=line.substr(from,to-from);
48             }
49             else if(line.find("STORAGE_SERVER_PORT=")==0)
50             {
51                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52             }
53             else if(line.find("EVENT_SERVER_PORT=")==0)
54             {
55                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56             }
57             else if(line.find("STORAGE_CLIENT_PORT=")==0)
58             {
59                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60             }
61             else if(line.find("XML_SERVER_PORT=")==0)
62             {
63                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64             }
65         }
66         if(configFile.eof()){configFile.clear();}
67         configFile.close();
68     }
69     else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
70     TThread::UnLock();
71     
72     for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
73 }
74 AliStorageEventManager::~AliStorageEventManager()
75 {
76     if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
77 }
78
79 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
80 {
81     TThread::Lock();
82     if(fManagerInstance==0)
83     {
84         fManagerInstance = new AliStorageEventManager();
85     }
86     TThread::UnLock();
87     return fManagerInstance;
88 }
89
90
91 void freeBuff (void *data, void *hint)
92 {
93     //  free(data);
94 }
95
96 bool AliStorageEventManager::CreateSocket(storageSockets socket)
97 {
98     cout<<"Creating socket:"<<socket<<endl;
99     
100     switch(socket)
101     {
102         case SERVER_COMMUNICATION_REQ:
103         {
104             fSockets[SERVER_COMMUNICATION_REQ] =
105             new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
106             try
107             {
108                 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
109             }
110             catch (const zmq::error_t& e)
111             {
112                 cout<<"MANAGER -- "<<e.what()<<endl;
113                 return 0;
114             }
115         }
116             break;
117         case SERVER_COMMUNICATION_REP:
118         {
119             fSockets[SERVER_COMMUNICATION_REP] =
120             new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
121             try
122             {
123                 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
124             }
125             catch (const zmq::error_t& e)
126             {
127                 cout<<"MANAGER -- "<<e.what()<<endl;
128                 return 0;
129             }
130         }
131             break;
132         case CLIENT_COMMUNICATION_REQ:
133         {
134             fSockets[CLIENT_COMMUNICATION_REQ] =
135             new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
136             try
137             {
138                 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
139             }
140             catch (const zmq::error_t& e)
141             {
142                 cout<<"MANAGER -- "<<e.what()<<endl;
143                 return 0;
144             }
145         }
146             break;
147         case CLIENT_COMMUNICATION_REP:
148         {
149             fSockets[CLIENT_COMMUNICATION_REP] =
150             new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
151             try
152             {
153                 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
154             }
155             catch (const zmq::error_t& e)
156             {
157                 cout<<"MANAGER -- "<<e.what()<<endl;
158                 return 0;
159             }
160         }
161             break;
162         case EVENTS_SERVER_PUB:
163         {
164             fSockets[EVENTS_SERVER_PUB] =
165             new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
166             try
167             {
168                 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
169             }
170             catch (const zmq::error_t& e)
171             {
172                 cout<<"MANAGER -- "<<e.what()<<endl;
173                 return 0;
174             }
175         }
176             break;
177         case EVENTS_SERVER_SUB:
178         {
179             fSockets[EVENTS_SERVER_SUB] =
180             new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
181             fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
182             try
183             {
184                 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
185             }
186             catch (const zmq::error_t& e)
187             {
188                 cout<<"MANAGER -- "<<e.what()<<endl;
189                 return 0;
190                 
191             }
192         }
193             break;
194         case XML_PUB:
195         {
196             fSockets[XML_PUB] =
197             new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
198             try
199             {
200                 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
201             }
202             catch (const zmq::error_t& e)
203             {
204                 cout<<"MANAGER -- "<<e.what()<<endl;
205                 return 0;
206             }
207         }
208             break;
209         default:break;
210     }
211     return 1;
212 }
213
214 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
215 {
216     //send size of the struct first
217     int numberOfRecords = list.size();
218     message_t message(20);
219     snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
220     try{
221         fSockets[socket]->send(message);
222     }
223     catch(const zmq::error_t &e)
224     {
225         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
226     }
227     //if(numberOfRecords==0)return;
228     message_t *tmpMessage = new message_t();
229     
230     try{
231         fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
232     }
233     catch(const zmq::error_t &e)
234     {
235         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
236     }
237     // //prepare message with event's list
238     // char *buffer = reinterpret_cast<char*> (&list[0]);
239     // message_t *reply = new message_t((void*)buffer,
240     //                sizeof(serverListStruct)*numberOfRecords,0);
241     // fSockets[socket]->send(*reply);
242     // if(reply){delete reply;}
243     
244     message_t reply(sizeof(serverListStruct)*numberOfRecords);
245     memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
246     
247     try{
248         fSockets[socket]->send(reply);
249     }
250     catch(const zmq::error_t &e)
251     {
252         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
253     }
254     if(tmpMessage){delete tmpMessage;}
255 }
256
257 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
258 {
259     char *buffer = (char*)(request);
260     message_t *requestMessage = new message_t((void*)buffer,
261                                               sizeof(struct serverRequestStruct)
262                                               +sizeof(struct listRequestStruct)
263                                               +sizeof(struct eventStruct),freeBuff);
264     try{
265         fSockets[socket]->send(*requestMessage);
266     }
267     catch(const zmq::error_t &e)
268     {
269         cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
270     }
271 }
272
273 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
274 {
275     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
276     
277     
278     char *buffer = (char*)(request);
279     message_t *requestMessage = new message_t((void*)buffer,
280                                               sizeof(struct clientRequestStruct),freeBuff);
281     
282     try{
283         fSockets[socket]->send(*requestMessage);
284     }
285     catch (const zmq::error_t& e)
286     {
287         cout<<"MANAGER -- "<<e.what()<<endl;
288         cout<<e.num()<<endl;
289         if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
290         
291         CreateSocket(socket);
292         delete requestMessage;
293         return 0;
294         
295     }
296     if(timeout>=0)
297     {
298         if(poll (&items[0], 1, timeout)==0)
299         {
300             delete requestMessage;
301             return 0;
302         }
303     }
304     delete requestMessage;
305     return 1;
306 }
307
308 void AliStorageEventManager::Send(long message,storageSockets socket)
309 {
310     stringstream streamBuffer;
311     streamBuffer << message;
312     string stringBuffer = streamBuffer.str();
313     char *buffer = (char*)stringBuffer.c_str();
314     message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
315     
316     try{
317         fSockets[socket]->send(*replyMessage);
318     }
319     catch(const zmq::error_t &e)
320     {
321         cout<<"MANAGER -- send long -- "<<e.what()<<endl;
322     }
323     delete replyMessage;
324     streamBuffer.str(string());
325     streamBuffer.clear();
326 }
327
328 void AliStorageEventManager::Send(bool message,storageSockets socket)
329 {
330     char *buffer;
331     if(message==true)
332     {
333         buffer = (char*)("true");
334     }
335     else
336     {
337         buffer = (char*)("false");
338     }
339     message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
340     try{
341         fSockets[socket]->send(*replyMessage);
342     }
343     catch(const zmq::error_t &e)
344     {
345         cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
346     }
347     delete replyMessage;
348 }
349
350 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
351 {
352   if(!event){return;}
353
354     TMessage tmess(kMESS_OBJECT);
355     tmess.Reset();
356     tmess.WriteObject(event);
357     //  TMessage::EnableSchemaEvolutionForAll(kTRUE);
358     
359     int bufsize = tmess.Length();
360     char* buf = (char*) malloc(bufsize * sizeof(char));
361     memcpy(buf, tmess.Buffer(), bufsize);
362     
363     message_t message((void*)buf, bufsize, freeBuff);
364     try{
365         fSockets[socket]->send(message);
366     }
367     catch(const zmq::error_t &e)
368     {
369         cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
370     }
371 }
372
373 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
374 {
375     cout<<"SENDING AS XML"<<endl;
376     stringstream bufferStream;
377     bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
378     bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
379     
380     for(int i=0;i<event->GetNumberOfTracks();i++)
381     {
382         AliESDtrack *track = event->GetTrack(i);
383         AliKalmanTrack *ITStrack = track->GetITStrack();
384         const AliTrackPointArray *array = track->GetTrackPointArray();
385       
386         bufferStream << "\t<track mass=\""<<track->GetMass()<<"\"";
387         //      bufferStream << "\t<track esdpid=\""<<track->GetESDpid();
388         bufferStream << "\t pid=\""<<track->PID()<<"\"";
389         bufferStream << "\t energy=\""<<track->E()<<"\"";
390         bufferStream << "\t volumeID=\""<<array->GetVolumeID()<<"\">" <<endl;
391
392         
393
394       
395         
396         if(array)
397         {
398             const float *x = array->GetX();
399             const float *y = array->GetY();
400             const float *z = array->GetZ();
401             int n = array->GetNPoints();
402             
403             for(int j=0;j<n;j++)
404             {
405               bufferStream <<"\t\t<point volumeID=\""<<array->GetVolumeID()[j]<<"\">"<<endl;
406                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
407                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
408                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
409                 bufferStream <<"\t\t</point>"<<endl;
410             }
411         }
412         else cout<<"no array"<<endl;
413         
414         bufferStream << "\t</track>"<<endl;
415     }
416     
417     bufferStream << "</ESD>"<<endl;
418     
419     string bufferString = bufferStream.str();
420     message_t message(bufferString.size());
421     memcpy (message.data(), bufferString.data(), bufferString.size());
422     
423     try{
424         fSockets[socket]->send(message);
425     }
426     catch(const zmq::error_t &e)
427     {
428         cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
429     }
430     cout<<"xml sent"<<endl;
431 }
432
433 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout)
434 {
435   pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
436   if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector<serverListStruct> emptyVector;return emptyVector;}}
437
438     //get size of the incomming message
439     message_t sizeMessage;
440     
441     try{
442         fSockets[socket]->recv(&sizeMessage);
443     }
444     catch(const zmq::error_t &e)
445     {
446         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
447     }
448     int numberOfRecords;
449     istringstream iss(static_cast<char*>(sizeMessage.data()));
450     iss >> numberOfRecords;
451     
452     if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
453     
454     try{
455         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
456     }
457     catch(const zmq::error_t &e)
458     {
459         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
460     }
461     //get list of events
462     message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
463     try{
464         fSockets[socket]->recv(response);
465     }
466     catch(const zmq::error_t &e)
467     {
468         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
469     }
470     
471     vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
472     
473     if (response) {delete response;}
474     return receivedList;
475 }
476
477 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
478 {
479     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
480     if(timeout>=0){
481         try{(poll (&items[0], 1, timeout)==0);}
482         catch(const zmq::error_t &e){
483           cout<<"EVENT MANAGER -- GetEvent():"<<e.what()<<endl;
484             return NULL;
485           }
486     }
487     message_t* message = new message_t();
488     
489     try
490     {
491         fSockets[socket]->recv(message);
492     }
493     catch (const zmq::error_t& e)
494     {
495         cout<<"MANAGER -- "<<e.what()<<endl;
496         return NULL;
497     }
498     TBufferFile *mess = new TBufferFile(TBuffer::kRead,
499                                         message->size()+sizeof(UInt_t),
500                                         message->data());
501     mess->InitMap();
502     mess->ReadClass();// get first the class stored in message
503     mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
504     mess->ResetMap();
505     
506     AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
507     
508     if (data)
509     {
510         data->GetStdContent();
511         if(message){delete message;}
512         return data;
513     }
514     else
515     {
516         if(message){delete message;}
517         return NULL;
518     }
519 }
520
521 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
522 {
523     struct serverRequestStruct *request = new struct serverRequestStruct;
524     message_t *requestMessage = new message_t();
525     try{
526         fSockets[socket]->recv(requestMessage);
527     }
528     catch(const zmq::error_t &e)
529     {
530         cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
531         request->messageType = -1;
532         return request;
533     }
534     request = static_cast<struct serverRequestStruct*>(requestMessage->data());
535     return request;
536 }
537
538 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
539 {
540     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
541     if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
542     
543     struct clientRequestStruct *request = new struct clientRequestStruct;
544     message_t *requestMessage = new message_t();
545     try{
546         fSockets[socket]->recv(requestMessage);
547     }
548     catch(const zmq::error_t &e)
549     {
550         cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
551     }
552     request = static_cast<struct clientRequestStruct*>(requestMessage->data());
553     return request;
554 }
555
556 bool AliStorageEventManager::GetBool(storageSockets socket)
557 {
558     message_t *response = new message_t();
559     try{
560         fSockets[socket]->recv(response);
561     }
562     catch(const zmq::error_t &e)
563     {
564         cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
565     }
566     char *result = (char*)response->data();
567     
568     if(!strcmp("true",result)){return true;}
569     else{return false;}
570 }
571
572 long AliStorageEventManager::GetLong(storageSockets socket)
573 {
574     message_t *responseMessage = new message_t();
575     try{
576         fSockets[socket]->recv(responseMessage);
577     }
578     catch(const zmq::error_t &e)
579     {
580         cout<<"MANAGER -- get long -- "<<e.what()<<endl;
581     }
582     
583     long result = 0;
584     
585     if(responseMessage)
586     {
587         result = (long)atoi(static_cast<char*>(responseMessage->data()));
588         delete responseMessage;
589     }
590     return result;
591 }
592