]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageEventManager.cxx
350de7bc8bd6c00def4d4773d7cfc9fb9791d36d
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
1 #include "AliStorageEventManager.h"
2
3 #include <iostream>
4 #include <sstream>
5 #include <fstream>
6
7 #include <TList.h>
8 #include <TStreamerInfo.h>
9 #include <TThread.h>
10
11 #include "zmq.hpp"
12
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28     //read config file
29     TThread::Lock();
30     ifstream configFile (GetConfigFilePath());
31     
32     if (configFile.is_open())
33     {
34         string line;
35         int from,to;
36         while(configFile.good())
37         {
38             getline(configFile,line);
39             from = line.find("\"")+1;
40             to = line.find_last_of("\"");
41             if(line.find("STORAGE_SERVER=")==0)
42             {
43                 fStorageServer=line.substr(from,to-from);
44             }
45             else if(line.find("EVENT_SERVER=")==0)
46             {
47                 fEventServer=line.substr(from,to-from);
48             }
49             else if(line.find("STORAGE_SERVER_PORT=")==0)
50             {
51                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52             }
53             else if(line.find("EVENT_SERVER_PORT=")==0)
54             {
55                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56             }
57             else if(line.find("STORAGE_CLIENT_PORT=")==0)
58             {
59                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60             }
61             else if(line.find("XML_SERVER_PORT=")==0)
62             {
63                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64             }
65         }
66         if(configFile.eof())
67         {
68             configFile.clear();
69         }
70         configFile.close();
71     }
72     else
73     {
74         cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
75     }
76     TThread::UnLock();
77     
78     for(int i=0;i<NUMBER_OF_SOCKETS;i++)
79     {
80         fContexts[i] = new context_t();
81     }
82 }
83 AliStorageEventManager::~AliStorageEventManager()
84 {
85     if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
86 }
87
88 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
89 {
90     TThread::Lock();
91     if(fManagerInstance==0)
92     {
93         fManagerInstance = new AliStorageEventManager();
94     }
95     TThread::UnLock();
96     return fManagerInstance;
97 }
98
99
// Release callback handed to message_t constructors in this file.
// It deliberately does nothing: neither argument is read, modified or freed.
void freeBuff (void * /*data*/, void * /*hint*/)
{
}
104
105 bool AliStorageEventManager::CreateSocket(storageSockets socket)
106 {
107     cout<<"Creating socket:"<<socket<<endl;
108     
109     switch(socket)
110     {
111         case SERVER_COMMUNICATION_REQ:
112         {
113             fSockets[SERVER_COMMUNICATION_REQ] =
114             new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
115             try
116             {
117                 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
118             }
119             catch (const zmq::error_t& e)
120             {
121                 cout<<"MANAGER -- "<<e.what()<<endl;
122                 return 0;
123             }
124         }
125             break;
126         case SERVER_COMMUNICATION_REP:
127         {
128             fSockets[SERVER_COMMUNICATION_REP] =
129             new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
130             try
131             {
132                 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
133             }
134             catch (const zmq::error_t& e)
135             {
136                 cout<<"MANAGER -- "<<e.what()<<endl;
137                 return 0;
138             }
139         }
140             break;
141         case CLIENT_COMMUNICATION_REQ:
142         {
143             fSockets[CLIENT_COMMUNICATION_REQ] =
144             new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
145             try
146             {
147                 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
148             }
149             catch (const zmq::error_t& e)
150             {
151                 cout<<"MANAGER -- "<<e.what()<<endl;
152                 return 0;
153             }
154         }
155             break;
156         case CLIENT_COMMUNICATION_REP:
157         {
158             fSockets[CLIENT_COMMUNICATION_REP] =
159             new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
160             try
161             {
162                 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
163             }
164             catch (const zmq::error_t& e)
165             {
166                 cout<<"MANAGER -- "<<e.what()<<endl;
167                 return 0;
168             }
169         }
170             break;
171         case EVENTS_SERVER_PUB:
172         {
173             fSockets[EVENTS_SERVER_PUB] =
174             new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
175             try
176             {
177                 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
178             }
179             catch (const zmq::error_t& e)
180             {
181                 cout<<"MANAGER -- "<<e.what()<<endl;
182                 return 0;
183             }
184         }
185             break;
186         case EVENTS_SERVER_SUB:
187         {
188             fSockets[EVENTS_SERVER_SUB] =
189             new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
190             fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
191             try
192             {
193                 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
194             }
195             catch (const zmq::error_t& e)
196             {
197                 cout<<"MANAGER -- "<<e.what()<<endl;
198                 return 0;
199                 
200             }
201         }
202             break;
203         case XML_PUB:
204         {
205             fSockets[XML_PUB] =
206             new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
207             try
208             {
209                 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
210             }
211             catch (const zmq::error_t& e)
212             {
213                 cout<<"MANAGER -- "<<e.what()<<endl;
214                 return 0;
215             }
216         }
217             break;
218         default:break;
219     }
220     return 1;
221 }
222
223 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
224 {
225     //send size of the struct first
226     int numberOfRecords = list.size();
227     message_t message(20);
228     snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
229     
230     fSockets[socket]->send(message);
231     if(numberOfRecords==0)return;
232     message_t *tmpMessage = new message_t();
233     fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
234     
235     // //prepare message with event's list
236     // char *buffer = reinterpret_cast<char*> (&list[0]);
237     // message_t *reply = new message_t((void*)buffer,
238     //                sizeof(serverListStruct)*numberOfRecords,0);
239     // fSockets[socket]->send(*reply);
240     // if(reply){delete reply;}
241     
242     message_t reply(sizeof(serverListStruct)*numberOfRecords);
243     memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
244     
245     fSockets[socket]->send(reply);
246     if(tmpMessage){delete tmpMessage;}
247 }
248
249 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
250 {
251     char *buffer = (char*)(request);
252     message_t *requestMessage = new message_t((void*)buffer,
253                                               sizeof(struct serverRequestStruct)
254                                               +sizeof(struct listRequestStruct)
255                                               +sizeof(struct eventStruct),freeBuff);
256     fSockets[socket]->send(*requestMessage);
257 }
258
259 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
260 {
261     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
262     
263     
264     char *buffer = (char*)(request);
265     message_t *requestMessage = new message_t((void*)buffer,
266                                               sizeof(struct clientRequestStruct),freeBuff);
267     
268     try
269     {
270         fSockets[socket]->send(*requestMessage);
271     }
272     catch (const zmq::error_t& e)
273     {
274         cout<<"MANAGER -- "<<e.what()<<endl;
275         cout<<e.num()<<endl;
276         if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
277         
278         CreateSocket(socket);
279         delete requestMessage;
280         return 0;
281         
282     }
283     if(timeout>=0)
284     {
285         if(poll (&items[0], 1, timeout)==0)
286         {
287             delete requestMessage;
288             return 0;
289         }
290     }
291     delete requestMessage;
292     return 1;
293 }
294
295 void AliStorageEventManager::Send(long message,storageSockets socket)
296 {
297     stringstream streamBuffer;
298     streamBuffer << message;
299     string stringBuffer = streamBuffer.str();
300     char *buffer = (char*)stringBuffer.c_str();
301     message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
302     
303     fSockets[socket]->send(*replyMessage);
304     delete replyMessage;
305     streamBuffer.str(string());
306     streamBuffer.clear();
307 }
308
309 void AliStorageEventManager::Send(bool message,storageSockets socket)
310 {
311     char *buffer;
312     if(message==true)
313     {
314         buffer = (char*)("true");
315     }
316     else
317     {
318         buffer = (char*)("false");
319     }
320     message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
321     fSockets[socket]->send(*replyMessage);
322     delete replyMessage;
323 }
324
325 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
326 {
327     TMessage tmess(kMESS_OBJECT);
328     tmess.Reset();
329     tmess.WriteObject(event);
330     TMessage::EnableSchemaEvolutionForAll(kTRUE);
331     
332     int bufsize = tmess.Length();
333     char* buf = (char*) malloc(bufsize * sizeof(char));
334     memcpy(buf, tmess.Buffer(), bufsize);
335     
336     message_t message((void*)buf, bufsize, freeBuff);
337     fSockets[socket]->send(message);
338 }
339
340 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
341 {
342     cout<<"SENDING AS XML"<<endl;
343     stringstream bufferStream;
344     bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
345     bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
346     
347     for(int i=0;i<event->GetNumberOfTracks();i++)
348     {
349         AliESDtrack *track = event->GetTrack(i);
350         bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
351         const AliTrackPointArray *array = track->GetTrackPointArray();
352         
353         if(array)
354         {
355             const float *x = array->GetX();
356             const float *y = array->GetY();
357             const float *z = array->GetZ();
358             int n = array->GetNPoints();
359             
360             for(int j=0;j<n;j++)
361             {
362                 bufferStream <<"\t\t<point>"<<endl;
363                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
364                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
365                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
366                 bufferStream <<"\t\t</point>"<<endl;
367             }
368         }
369         else cout<<"no array"<<endl;
370         
371         bufferStream << "\t</track>"<<endl;
372     }
373     
374     bufferStream << "</ESD>"<<endl;
375     
376     string bufferString = bufferStream.str();
377     message_t message(bufferString.size());
378     memcpy (message.data(), bufferString.data(), bufferString.size());
379     
380     fSockets[socket]->send(message);
381     cout<<"xml sent"<<endl;
382 }
383
384 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
385 {
386     //get size of the incomming message
387     message_t sizeMessage;
388     fSockets[socket]->recv(&sizeMessage);
389     int numberOfRecords;
390     istringstream iss(static_cast<char*>(sizeMessage.data()));
391     iss >> numberOfRecords;
392     
393     if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
394     
395     fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
396     
397     //get list of events
398     message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
399     fSockets[socket]->recv(response);
400     
401     vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
402     
403     if (response) {delete response;}
404     return receivedList;
405 }
406
407 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
408 {
409     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
410     
411     if(timeout>=0)
412     {
413         if(poll (&items[0], 1, timeout)==0)
414         {
415             return NULL;
416         }
417     }
418     
419     message_t* message = new message_t();
420     
421     try
422     {
423         fSockets[socket]->recv(message);
424     }
425     catch (const zmq::error_t& e)
426     {
427         cout<<"MANAGER -- "<<e.what()<<endl;
428         return NULL;
429     }
430     
431     TBufferFile *mess = new TBufferFile(TBuffer::kRead,
432                                         message->size()+sizeof(UInt_t),
433                                         message->data());
434     mess->InitMap();
435     mess->ReadClass();// get first the class stored in message
436     mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
437     mess->ResetMap();
438     
439     AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
440     
441     if (data)
442     {
443         data->GetStdContent();
444         if(message){delete message;}
445         return data;
446     }
447     else
448     {
449         if(message){delete message;}
450         return NULL;
451     }
452 }
453
454 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
455 {
456     struct serverRequestStruct *request = new struct serverRequestStruct;
457     message_t *requestMessage = new message_t();
458     fSockets[socket]->recv(requestMessage);
459     request = static_cast<struct serverRequestStruct*>(requestMessage->data());
460     return request;
461 }
462
463 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
464 {
465     struct clientRequestStruct *request = new struct clientRequestStruct;
466     message_t *requestMessage = new message_t();
467     fSockets[socket]->recv(requestMessage);
468     request = static_cast<struct clientRequestStruct*>(requestMessage->data());
469     return request;
470 }
471
472 bool AliStorageEventManager::GetBool(storageSockets socket)
473 {
474     message_t *response = new message_t();
475     fSockets[socket]->recv(response);
476     char *result = (char*)response->data();
477     
478     if(!strcmp("true",result)){return true;}
479     else{return false;}
480 }
481
482 long AliStorageEventManager::GetLong(storageSockets socket)
483 {
484     message_t *responseMessage = new message_t();
485     fSockets[socket]->recv(responseMessage);
486     
487     long result = 0;
488     
489     if(responseMessage)
490     {
491         result = (long)atoi(static_cast<char*>(responseMessage->data()));
492         delete responseMessage;
493     }
494     return result;
495 }
496