]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageEventManager.cxx
Merge branch 'master' of https://git.cern.ch/reps/AliRoot
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
#include "AliStorageEventManager.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>

#include <TList.h>
#include <TStreamerInfo.h>
#include <TThread.h>

#include "zmq.hpp"

#include "AliESDEvent.h"
#include "AliESDtrack.h"
#include "AliTrackPointArray.h"
#include "AliESDfriendTrack.h"
#include "AliExternalTrackParam.h"
#include "AliTrackerBase.h"
#include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28         //read config file
29         TThread::Lock();
30         ifstream configFile (GetConfigFilePath());
31     
32         if (configFile.is_open())
33         {
34                 string line;
35                 int from,to;
36                 while(configFile.good())
37                 {
38                         getline(configFile,line);
39                         from = line.find("\"")+1;
40                         to = line.find_last_of("\"");
41                         if(line.find("STORAGE_SERVER=")==0)
42                         {
43                                 fStorageServer=line.substr(from,to-from);
44                         }
45                         else if(line.find("EVENT_SERVER=")==0)
46                         {
47                                 fEventServer=line.substr(from,to-from);
48                         }
49                         else if(line.find("STORAGE_SERVER_PORT=")==0)
50                         {
51                                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52                         }
53                         else if(line.find("EVENT_SERVER_PORT=")==0)
54                         {
55                                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56                         }
57                         else if(line.find("STORAGE_CLIENT_PORT=")==0)
58                         {
59                                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60                         }
61                         else if(line.find("XML_SERVER_PORT=")==0)
62                         {
63                                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64                         }
65                 }
66                 if(configFile.eof())
67                 {
68                         configFile.clear();
69                 }
70                 configFile.close();
71         }
72         else
73         {
74                 cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
75         }
76         TThread::UnLock();
77         
78         for(int i=0;i<NUMBER_OF_SOCKETS;i++)
79         {
80                 fContexts[i] = new context_t();
81         }
82 }
83 AliStorageEventManager::~AliStorageEventManager()
84 {
85         if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
86 }
87
88 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
89 {
90         TThread::Lock();
91         if(fManagerInstance==0)
92         {
93                 fManagerInstance = new AliStorageEventManager();
94         }
95         TThread::UnLock();
96         return fManagerInstance;
97 }
98
99
// Deallocation callback handed to zero-copy ZeroMQ messages.
// It deliberately releases nothing: both arguments are ignored, so the
// buffer stays under the control of whoever created it.
void freeBuff (void * /*data*/, void * /*hint*/)
{
}
104
105 bool AliStorageEventManager::CreateSocket(storageSockets socket)
106 {
107     cout<<"Creating socket:"<<socket<<endl;
108     
109     switch(socket)
110     {
111         case SERVER_COMMUNICATION_REQ:
112         {
113             fSockets[SERVER_COMMUNICATION_REQ] =
114             new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
115             try
116             {
117                 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
118             }
119             catch (const zmq::error_t& e)
120             {
121                 cout<<"MANAGER -- "<<e.what()<<endl;
122                 return 0;
123             }
124         }
125             break;
126         case SERVER_COMMUNICATION_REP:
127         {
128             fSockets[SERVER_COMMUNICATION_REP] =
129             new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
130             try
131             {
132                 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
133             }
134             catch (const zmq::error_t& e)
135             {
136                 cout<<"MANAGER -- "<<e.what()<<endl;
137                 return 0;
138             }
139         }
140             break;
141         case CLIENT_COMMUNICATION_REQ:
142         {
143             fSockets[CLIENT_COMMUNICATION_REQ] =
144             new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
145             try
146             {
147                 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
148             }
149             catch (const zmq::error_t& e)
150             {
151                 cout<<"MANAGER -- "<<e.what()<<endl;
152                 return 0;
153             }
154         }
155             break;
156         case CLIENT_COMMUNICATION_REP:
157         {
158             fSockets[CLIENT_COMMUNICATION_REP] =
159             new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
160             try
161             {
162                 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
163             }
164             catch (const zmq::error_t& e)
165             {
166                 cout<<"MANAGER -- "<<e.what()<<endl;
167                 return 0;
168             }
169         }
170             break;
171         case EVENTS_SERVER_PUB:
172         {
173             fSockets[EVENTS_SERVER_PUB] =
174             new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
175             try
176             {
177                 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
178             }
179             catch (const zmq::error_t& e)
180             {
181                 cout<<"MANAGER -- "<<e.what()<<endl;
182                 return 0;
183             }
184         }
185             break;
186         case EVENTS_SERVER_SUB:
187         {
188             fSockets[EVENTS_SERVER_SUB] =
189             new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
190             fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
191             try
192             {
193                 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
194             }
195             catch (const zmq::error_t& e)
196             {
197                 cout<<"MANAGER -- "<<e.what()<<endl;
198                 return 0;
199                 
200             }
201         }
202             break;
203         case XML_PUB:
204         {
205             fSockets[XML_PUB] =
206             new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
207             try
208             {
209                 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
210             }
211             catch (const zmq::error_t& e)
212             {
213                 cout<<"MANAGER -- "<<e.what()<<endl;
214                 return 0;
215             }
216         }
217             break;
218         default:break;
219     }
220     return 1;
221 }
222
223 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
224 {
225         //send size of the struct first
226         int numberOfRecords = list.size();
227         message_t message(20);
228         snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
229
230         fSockets[socket]->send(message);
231         if(numberOfRecords==0)return;
232     message_t *tmpMessage = new message_t();
233         fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
234     
235         // //prepare message with event's list
236         // char *buffer = reinterpret_cast<char*> (&list[0]);
237         // message_t *reply = new message_t((void*)buffer,
238         //                    sizeof(serverListStruct)*numberOfRecords,0);
239         // fSockets[socket]->send(*reply);
240         // if(reply){delete reply;}
241
242         message_t reply(sizeof(serverListStruct)*numberOfRecords);
243         memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
244
245         fSockets[socket]->send(reply);
246     if(tmpMessage){delete tmpMessage;}
247 }
248
249 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
250 {
251         char *buffer = (char*)(request);
252         message_t *requestMessage = new message_t((void*)buffer,
253                                            sizeof(struct serverRequestStruct)
254                                            +sizeof(struct listRequestStruct)
255                                            +sizeof(struct eventStruct),freeBuff);
256         fSockets[socket]->send(*requestMessage);
257 }
258
259 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
260 {
261         pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
262         
263         
264         char *buffer = (char*)(request);
265         message_t *requestMessage = new message_t((void*)buffer,
266                                                   sizeof(struct clientRequestStruct),freeBuff);
267
268         try
269         {
270                 fSockets[socket]->send(*requestMessage);
271         }
272         catch (const zmq::error_t& e)
273         {
274                 cout<<"MANAGER -- "<<e.what()<<endl;
275                 cout<<e.num()<<endl;
276                 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
277
278                 CreateSocket(socket);
279                 delete requestMessage;
280                 return 0;
281                 
282         }       
283         if(timeout>=0)
284         {
285                 if(poll (&items[0], 1, timeout)==0)
286                 {
287                   delete requestMessage;
288                         return 0;
289                 }
290         }
291         delete requestMessage;          
292         return 1;
293 }
294
295 void AliStorageEventManager::Send(long message,storageSockets socket)
296 {
297         stringstream streamBuffer;
298         streamBuffer << message;
299         string stringBuffer = streamBuffer.str();
300         char *buffer = (char*)stringBuffer.c_str();
301         message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
302     
303         fSockets[socket]->send(*replyMessage);
304         delete replyMessage;
305         streamBuffer.str(string());
306         streamBuffer.clear();
307 }
308
309 void AliStorageEventManager::Send(bool message,storageSockets socket)
310 {
311         char *buffer;
312         if(message==true)
313         {
314                 buffer = (char*)("true");
315         }
316         else
317         {
318                 buffer = (char*)("false");
319         }
320         message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
321         fSockets[socket]->send(*replyMessage);
322         delete replyMessage;
323 }
324
325 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
326 {
327         TMessage tmess(kMESS_OBJECT);
328         tmess.Reset();
329         tmess.WriteObject(event);
330         TMessage::EnableSchemaEvolutionForAll(kTRUE);
331
332         int bufsize = tmess.Length();
333         char* buf = (char*) malloc(bufsize * sizeof(char));
334         memcpy(buf, tmess.Buffer(), bufsize);
335         
336         message_t message((void*)buf, bufsize, freeBuff);
337         fSockets[socket]->send(message);
338 }
339
340 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
341 {
342         cout<<"SENDING AS XML"<<endl;
343         stringstream bufferStream;
344         bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
345         bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
346         
347         for(int i=0;i<event->GetNumberOfTracks();i++)
348         {
349                 AliESDtrack *track = event->GetTrack(i);
350                 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
351                 const AliTrackPointArray *array = track->GetTrackPointArray();
352
353                 if(array)
354                 {
355                         const float *x = array->GetX();
356                         const float *y = array->GetY();
357                         const float *z = array->GetZ();
358                         int n = array->GetNPoints();
359
360                         for(int j=0;j<n;j++)
361                         {
362                                 bufferStream <<"\t\t<point>"<<endl;
363                                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
364                                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
365                                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
366                                 bufferStream <<"\t\t</point>"<<endl;
367                         }
368                 }
369                 else cout<<"no array"<<endl;
370
371                 bufferStream << "\t</track>"<<endl;
372         }
373
374         bufferStream << "</ESD>"<<endl;
375
376         string bufferString = bufferStream.str();
377         message_t message(bufferString.size());
378         memcpy (message.data(), bufferString.data(), bufferString.size());      
379         
380         fSockets[socket]->send(message);
381         cout<<"xml sent"<<endl;
382 }
383
384 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
385 {
386         //get size of the incomming message
387         message_t sizeMessage;
388         fSockets[socket]->recv(&sizeMessage);
389         int numberOfRecords;
390         istringstream iss(static_cast<char*>(sizeMessage.data()));
391         iss >> numberOfRecords;
392
393         if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
394         
395         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
396         
397 //get list of events
398         message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
399         fSockets[socket]->recv(response);
400         
401         vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
402
403         if (response) {delete response;}
404         return receivedList;
405 }
406
407 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
408 {
409   message_t* message = new message_t();
410
411   try
412   {
413     fSockets[socket]->recv(message);
414   }
415   catch (const zmq::error_t& e)
416   {
417     cout<<"MANAGER -- "<<e.what()<<endl;
418     return NULL;
419   }
420         
421   TBufferFile *mess = new TBufferFile(TBuffer::kRead,
422                                       message->size()+sizeof(UInt_t),
423                                       message->data());
424   mess->InitMap();
425   mess->ReadClass();// get first the class stored in message
426   mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
427   mess->ResetMap();
428         
429   AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
430
431   if (data)
432   {
433     data->GetStdContent();
434     if(message){delete message;}
435     return data;
436   }
437   else
438   {
439     if(message){delete message;}
440     return NULL;
441   }
442 }
443
444 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
445 {
446         struct serverRequestStruct *request = new struct serverRequestStruct;
447         message_t *requestMessage = new message_t();
448         fSockets[socket]->recv(requestMessage);
449         request = static_cast<struct serverRequestStruct*>(requestMessage->data());
450         return request;
451 }
452
453 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
454 {
455         struct clientRequestStruct *request = new struct clientRequestStruct;
456         message_t *requestMessage = new message_t();
457         fSockets[socket]->recv(requestMessage);
458         request = static_cast<struct clientRequestStruct*>(requestMessage->data());
459         return request;
460 }
461
462 bool AliStorageEventManager::GetBool(storageSockets socket)
463 {
464         message_t *response = new message_t();
465         fSockets[socket]->recv(response);
466         char *result = (char*)response->data();
467         
468         if(!strcmp("true",result)){return true;}
469         else{return false;}
470 }
471
472 long AliStorageEventManager::GetLong(storageSockets socket)
473 {
474         message_t *responseMessage = new message_t();
475         fSockets[socket]->recv(responseMessage);
476     
477     long result = 0;
478     
479     if(responseMessage)
480     {
481         result = (long)atoi(static_cast<char*>(responseMessage->data()));
482         delete responseMessage;
483     }
484         return result;
485 }
486