]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageEventManager.cxx
Online reconstruction simplified and prepared to be controlled by Storage Manager...
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
1 #include "AliStorageEventManager.h"
2
3 #include <iostream>
4 #include <sstream>
5 #include <fstream>
6
7 #include <TList.h>
8 #include <TStreamerInfo.h>
9 #include <TThread.h>
10
11 #include "zmq.hpp"
12
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28     //read config file
29     TThread::Lock();
30     ifstream configFile (GetConfigFilePath());
31     
32     if (configFile.is_open())
33     {
34         string line;
35         int from,to;
36         while(configFile.good())
37         {
38             getline(configFile,line);
39             from = line.find("\"")+1;
40             to = line.find_last_of("\"");
41             if(line.find("STORAGE_SERVER=")==0)
42             {
43                 fStorageServer=line.substr(from,to-from);
44             }
45             else if(line.find("EVENT_SERVER=")==0)
46             {
47                 fEventServer=line.substr(from,to-from);
48             }
49             else if(line.find("STORAGE_SERVER_PORT=")==0)
50             {
51                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52             }
53             else if(line.find("EVENT_SERVER_PORT=")==0)
54             {
55                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56             }
57             else if(line.find("STORAGE_CLIENT_PORT=")==0)
58             {
59                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60             }
61             else if(line.find("XML_SERVER_PORT=")==0)
62             {
63                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64             }
65         }
66         if(configFile.eof()){configFile.clear();}
67         configFile.close();
68     }
69     else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
70     TThread::UnLock();
71     
72     for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
73 }
74 AliStorageEventManager::~AliStorageEventManager()
75 {
76     if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
77 }
78
79 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
80 {
81     TThread::Lock();
82     if(fManagerInstance==0)
83     {
84         fManagerInstance = new AliStorageEventManager();
85     }
86     TThread::UnLock();
87     return fManagerInstance;
88 }
89
90
// Deallocation callback handed to zero-copy message_t constructors.
// Intentionally does nothing: the buffers passed alongside it in this file are
// not released through this callback.
void freeBuff (void *data, void *hint)
{
    (void)data;
    (void)hint;
}
95
96 bool AliStorageEventManager::CreateSocket(storageSockets socket)
97 {
98     cout<<"Creating socket:"<<socket<<endl;
99     
100     switch(socket)
101     {
102         case SERVER_COMMUNICATION_REQ:
103         {
104             fSockets[SERVER_COMMUNICATION_REQ] =
105             new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
106             try
107             {
108                 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
109             }
110             catch (const zmq::error_t& e)
111             {
112                 cout<<"MANAGER -- "<<e.what()<<endl;
113                 return 0;
114             }
115         }
116             break;
117         case SERVER_COMMUNICATION_REP:
118         {
119             fSockets[SERVER_COMMUNICATION_REP] =
120             new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
121             try
122             {
123                 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
124             }
125             catch (const zmq::error_t& e)
126             {
127                 cout<<"MANAGER -- "<<e.what()<<endl;
128                 return 0;
129             }
130         }
131             break;
132         case CLIENT_COMMUNICATION_REQ:
133         {
134             fSockets[CLIENT_COMMUNICATION_REQ] =
135             new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
136             try
137             {
138                 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
139             }
140             catch (const zmq::error_t& e)
141             {
142                 cout<<"MANAGER -- "<<e.what()<<endl;
143                 return 0;
144             }
145         }
146             break;
147         case CLIENT_COMMUNICATION_REP:
148         {
149             fSockets[CLIENT_COMMUNICATION_REP] =
150             new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
151             try
152             {
153                 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
154             }
155             catch (const zmq::error_t& e)
156             {
157                 cout<<"MANAGER -- "<<e.what()<<endl;
158                 return 0;
159             }
160         }
161             break;
162         case EVENTS_SERVER_PUB:
163         {
164             fSockets[EVENTS_SERVER_PUB] =
165             new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
166             try
167             {
168                 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
169             }
170             catch (const zmq::error_t& e)
171             {
172                 cout<<"MANAGER -- "<<e.what()<<endl;
173                 return 0;
174             }
175         }
176             break;
177         case EVENTS_SERVER_SUB:
178         {
179             fSockets[EVENTS_SERVER_SUB] =
180             new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
181             fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
182             try
183             {
184                 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
185             }
186             catch (const zmq::error_t& e)
187             {
188                 cout<<"MANAGER -- "<<e.what()<<endl;
189                 return 0;
190                 
191             }
192         }
193             break;
194         case XML_PUB:
195         {
196             fSockets[XML_PUB] =
197             new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
198             try
199             {
200                 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
201             }
202             catch (const zmq::error_t& e)
203             {
204                 cout<<"MANAGER -- "<<e.what()<<endl;
205                 return 0;
206             }
207         }
208             break;
209         default:break;
210     }
211     return 1;
212 }
213
214 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
215 {
216     //send size of the struct first
217     int numberOfRecords = list.size();
218     message_t message(20);
219     snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
220     try{
221         fSockets[socket]->send(message);
222     }
223     catch(const zmq::error_t &e)
224     {
225         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
226     }
227     //if(numberOfRecords==0)return;
228     message_t *tmpMessage = new message_t();
229     
230     try{
231         fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
232     }
233     catch(const zmq::error_t &e)
234     {
235         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
236     }
237     // //prepare message with event's list
238     // char *buffer = reinterpret_cast<char*> (&list[0]);
239     // message_t *reply = new message_t((void*)buffer,
240     //                sizeof(serverListStruct)*numberOfRecords,0);
241     // fSockets[socket]->send(*reply);
242     // if(reply){delete reply;}
243     
244     message_t reply(sizeof(serverListStruct)*numberOfRecords);
245     memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
246     
247     try{
248         fSockets[socket]->send(reply);
249     }
250     catch(const zmq::error_t &e)
251     {
252         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
253     }
254     if(tmpMessage){delete tmpMessage;}
255 }
256
257 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
258 {
259     char *buffer = (char*)(request);
260     message_t *requestMessage = new message_t((void*)buffer,
261                                               sizeof(struct serverRequestStruct)
262                                               +sizeof(struct listRequestStruct)
263                                               +sizeof(struct eventStruct),freeBuff);
264     try{
265         fSockets[socket]->send(*requestMessage);
266     }
267     catch(const zmq::error_t &e)
268     {
269         cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
270     }
271 }
272
273 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
274 {
275     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
276     
277     
278     char *buffer = (char*)(request);
279     message_t *requestMessage = new message_t((void*)buffer,
280                                               sizeof(struct clientRequestStruct),freeBuff);
281     
282     try{
283         fSockets[socket]->send(*requestMessage);
284     }
285     catch (const zmq::error_t& e)
286     {
287         cout<<"MANAGER -- "<<e.what()<<endl;
288         cout<<e.num()<<endl;
289         if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
290         
291         CreateSocket(socket);
292         delete requestMessage;
293         return 0;
294         
295     }
296     if(timeout>=0)
297     {
298         if(poll (&items[0], 1, timeout)==0)
299         {
300             delete requestMessage;
301             return 0;
302         }
303     }
304     delete requestMessage;
305     return 1;
306 }
307
308 void AliStorageEventManager::Send(long message,storageSockets socket)
309 {
310     stringstream streamBuffer;
311     streamBuffer << message;
312     string stringBuffer = streamBuffer.str();
313     char *buffer = (char*)stringBuffer.c_str();
314     message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
315     
316     try{
317         fSockets[socket]->send(*replyMessage);
318     }
319     catch(const zmq::error_t &e)
320     {
321         cout<<"MANAGER -- send long -- "<<e.what()<<endl;
322     }
323     delete replyMessage;
324     streamBuffer.str(string());
325     streamBuffer.clear();
326 }
327
328 void AliStorageEventManager::Send(bool message,storageSockets socket)
329 {
330     char *buffer;
331     if(message==true)
332     {
333         buffer = (char*)("true");
334     }
335     else
336     {
337         buffer = (char*)("false");
338     }
339     message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
340     try{
341         fSockets[socket]->send(*replyMessage);
342     }
343     catch(const zmq::error_t &e)
344     {
345         cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
346     }
347     delete replyMessage;
348 }
349
350 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
351 {
352     TMessage tmess(kMESS_OBJECT);
353     tmess.Reset();
354     tmess.WriteObject(event);
355     TMessage::EnableSchemaEvolutionForAll(kTRUE);
356     
357     int bufsize = tmess.Length();
358     char* buf = (char*) malloc(bufsize * sizeof(char));
359     memcpy(buf, tmess.Buffer(), bufsize);
360     
361     message_t message((void*)buf, bufsize, freeBuff);
362     try{
363         fSockets[socket]->send(message);
364     }
365     catch(const zmq::error_t &e)
366     {
367         cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
368     }
369 }
370
371 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
372 {
373     cout<<"SENDING AS XML"<<endl;
374     stringstream bufferStream;
375     bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
376     bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
377     
378     for(int i=0;i<event->GetNumberOfTracks();i++)
379     {
380         AliESDtrack *track = event->GetTrack(i);
381         bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
382         const AliTrackPointArray *array = track->GetTrackPointArray();
383         
384         if(array)
385         {
386             const float *x = array->GetX();
387             const float *y = array->GetY();
388             const float *z = array->GetZ();
389             int n = array->GetNPoints();
390             
391             for(int j=0;j<n;j++)
392             {
393                 bufferStream <<"\t\t<point>"<<endl;
394                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
395                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
396                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
397                 bufferStream <<"\t\t</point>"<<endl;
398             }
399         }
400         else cout<<"no array"<<endl;
401         
402         bufferStream << "\t</track>"<<endl;
403     }
404     
405     bufferStream << "</ESD>"<<endl;
406     
407     string bufferString = bufferStream.str();
408     message_t message(bufferString.size());
409     memcpy (message.data(), bufferString.data(), bufferString.size());
410     
411     try{
412         fSockets[socket]->send(message);
413     }
414     catch(const zmq::error_t &e)
415     {
416         cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
417     }
418     cout<<"xml sent"<<endl;
419 }
420
421 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout)
422 {
423   pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
424   if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector<serverListStruct> emptyVector;return emptyVector;}}
425
426     //get size of the incomming message
427     message_t sizeMessage;
428     
429     try{
430         fSockets[socket]->recv(&sizeMessage);
431     }
432     catch(const zmq::error_t &e)
433     {
434         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
435     }
436     int numberOfRecords;
437     istringstream iss(static_cast<char*>(sizeMessage.data()));
438     iss >> numberOfRecords;
439     
440     if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
441     
442     try{
443         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
444     }
445     catch(const zmq::error_t &e)
446     {
447         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
448     }
449     //get list of events
450     message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
451     try{
452         fSockets[socket]->recv(response);
453     }
454     catch(const zmq::error_t &e)
455     {
456         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
457     }
458     
459     vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
460     
461     if (response) {delete response;}
462     return receivedList;
463 }
464
465 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
466 {
467     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
468     if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
469     
470     message_t* message = new message_t();
471     
472     try
473     {
474         fSockets[socket]->recv(message);
475     }
476     catch (const zmq::error_t& e)
477     {
478         cout<<"MANAGER -- "<<e.what()<<endl;
479         return NULL;
480     }
481     
482     TBufferFile *mess = new TBufferFile(TBuffer::kRead,
483                                         message->size()+sizeof(UInt_t),
484                                         message->data());
485     mess->InitMap();
486     mess->ReadClass();// get first the class stored in message
487     mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
488     mess->ResetMap();
489     
490     AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
491     
492     if (data)
493     {
494         data->GetStdContent();
495         if(message){delete message;}
496         return data;
497     }
498     else
499     {
500         if(message){delete message;}
501         return NULL;
502     }
503 }
504
505 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
506 {
507     struct serverRequestStruct *request = new struct serverRequestStruct;
508     message_t *requestMessage = new message_t();
509     try{
510         fSockets[socket]->recv(requestMessage);
511     }
512     catch(const zmq::error_t &e)
513     {
514         cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
515     }
516     request = static_cast<struct serverRequestStruct*>(requestMessage->data());
517     return request;
518 }
519
520 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
521 {
522     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
523     if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
524     
525     struct clientRequestStruct *request = new struct clientRequestStruct;
526     message_t *requestMessage = new message_t();
527     try{
528         fSockets[socket]->recv(requestMessage);
529     }
530     catch(const zmq::error_t &e)
531     {
532         cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
533     }
534     request = static_cast<struct clientRequestStruct*>(requestMessage->data());
535     return request;
536 }
537
538 bool AliStorageEventManager::GetBool(storageSockets socket)
539 {
540     message_t *response = new message_t();
541     try{
542         fSockets[socket]->recv(response);
543     }
544     catch(const zmq::error_t &e)
545     {
546         cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
547     }
548     char *result = (char*)response->data();
549     
550     if(!strcmp("true",result)){return true;}
551     else{return false;}
552 }
553
554 long AliStorageEventManager::GetLong(storageSockets socket)
555 {
556     message_t *responseMessage = new message_t();
557     try{
558         fSockets[socket]->recv(responseMessage);
559     }
560     catch(const zmq::error_t &e)
561     {
562         cout<<"MANAGER -- get long -- "<<e.what()<<endl;
563     }
564     
565     long result = 0;
566     
567     if(responseMessage)
568     {
569         result = (long)atoi(static_cast<char*>(responseMessage->data()));
570         delete responseMessage;
571     }
572     return result;
573 }
574