Client Thread of Storage Manager split into smaller classes. Preparation for moving...
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
CommitLineData
164d3d29 1#include "AliStorageEventManager.h"
2
3#include <iostream>
4#include <sstream>
5#include <fstream>
6
7#include <TList.h>
8#include <TStreamerInfo.h>
9#include <TThread.h>
10
11#include "zmq.hpp"
12
13#include "AliESDEvent.h"
14#include "AliESDtrack.h"
15#include "AliTrackPointArray.h"
16#include "AliESDfriendTrack.h"
17#include "AliExternalTrackParam.h"
18#include "AliTrackerBase.h"
19#include "AliTracker.h"
20
21using namespace zmq;
22using namespace std;
23
24AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26AliStorageEventManager::AliStorageEventManager()
27{
626a3158 28 //read config file
29 TThread::Lock();
30 ifstream configFile (GetConfigFilePath());
164d3d29 31
626a3158 32 if (configFile.is_open())
33 {
34 string line;
35 int from,to;
36 while(configFile.good())
37 {
38 getline(configFile,line);
39 from = line.find("\"")+1;
40 to = line.find_last_of("\"");
41 if(line.find("STORAGE_SERVER=")==0)
42 {
43 fStorageServer=line.substr(from,to-from);
44 }
45 else if(line.find("EVENT_SERVER=")==0)
46 {
47 fEventServer=line.substr(from,to-from);
48 }
49 else if(line.find("STORAGE_SERVER_PORT=")==0)
50 {
51 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52 }
53 else if(line.find("EVENT_SERVER_PORT=")==0)
54 {
55 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56 }
57 else if(line.find("STORAGE_CLIENT_PORT=")==0)
58 {
59 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60 }
61 else if(line.find("XML_SERVER_PORT=")==0)
62 {
63 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64 }
65 }
efaef546 66 if(configFile.eof()){configFile.clear();}
626a3158 67 configFile.close();
68 }
efaef546 69 else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
626a3158 70 TThread::UnLock();
71
efaef546 72 for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
164d3d29 73}
74AliStorageEventManager::~AliStorageEventManager()
75{
626a3158 76 if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
164d3d29 77}
78
79AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
80{
626a3158 81 TThread::Lock();
82 if(fManagerInstance==0)
83 {
84 fManagerInstance = new AliStorageEventManager();
85 }
86 TThread::UnLock();
87 return fManagerInstance;
164d3d29 88}
89
90
// Release callback given to zmq::message_t when a message wraps an existing buffer.
// It deliberately releases nothing: callers in this file hand over memory that they
// still own (for example request structs), so the buffer is left untouched.
void freeBuff (void * /*data*/, void * /*hint*/)
{
}
95
96bool AliStorageEventManager::CreateSocket(storageSockets socket)
97{
fae81379 98 cout<<"Creating socket:"<<socket<<endl;
99
100 switch(socket)
101 {
102 case SERVER_COMMUNICATION_REQ:
103 {
104 fSockets[SERVER_COMMUNICATION_REQ] =
105 new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
106 try
107 {
108 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
109 }
110 catch (const zmq::error_t& e)
111 {
112 cout<<"MANAGER -- "<<e.what()<<endl;
113 return 0;
114 }
115 }
116 break;
117 case SERVER_COMMUNICATION_REP:
118 {
119 fSockets[SERVER_COMMUNICATION_REP] =
120 new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
121 try
122 {
123 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
124 }
125 catch (const zmq::error_t& e)
126 {
127 cout<<"MANAGER -- "<<e.what()<<endl;
128 return 0;
129 }
130 }
131 break;
132 case CLIENT_COMMUNICATION_REQ:
133 {
134 fSockets[CLIENT_COMMUNICATION_REQ] =
135 new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
136 try
137 {
138 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
139 }
140 catch (const zmq::error_t& e)
141 {
142 cout<<"MANAGER -- "<<e.what()<<endl;
143 return 0;
144 }
145 }
146 break;
147 case CLIENT_COMMUNICATION_REP:
148 {
149 fSockets[CLIENT_COMMUNICATION_REP] =
150 new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
151 try
152 {
153 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
154 }
155 catch (const zmq::error_t& e)
156 {
157 cout<<"MANAGER -- "<<e.what()<<endl;
158 return 0;
159 }
160 }
161 break;
162 case EVENTS_SERVER_PUB:
163 {
164 fSockets[EVENTS_SERVER_PUB] =
165 new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
166 try
167 {
168 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
169 }
170 catch (const zmq::error_t& e)
171 {
172 cout<<"MANAGER -- "<<e.what()<<endl;
173 return 0;
174 }
175 }
176 break;
177 case EVENTS_SERVER_SUB:
178 {
179 fSockets[EVENTS_SERVER_SUB] =
180 new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
181 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
182 try
183 {
184 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
185 }
186 catch (const zmq::error_t& e)
187 {
188 cout<<"MANAGER -- "<<e.what()<<endl;
189 return 0;
190
191 }
192 }
193 break;
194 case XML_PUB:
195 {
196 fSockets[XML_PUB] =
197 new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
198 try
199 {
200 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
201 }
202 catch (const zmq::error_t& e)
203 {
204 cout<<"MANAGER -- "<<e.what()<<endl;
205 return 0;
206 }
207 }
208 break;
209 default:break;
210 }
211 return 1;
164d3d29 212}
213
214void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
215{
626a3158 216 //send size of the struct first
217 int numberOfRecords = list.size();
218 message_t message(20);
219 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
efaef546 220 try{
221 fSockets[socket]->send(message);
222 }
223 catch(const zmq::error_t &e)
224 {
225 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
226 }
227 //if(numberOfRecords==0)return;
fae81379 228 message_t *tmpMessage = new message_t();
fae81379 229
efaef546 230 try{
231 fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
232 }
233 catch(const zmq::error_t &e)
234 {
235 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
236 }
626a3158 237 // //prepare message with event's list
238 // char *buffer = reinterpret_cast<char*> (&list[0]);
239 // message_t *reply = new message_t((void*)buffer,
240 // sizeof(serverListStruct)*numberOfRecords,0);
241 // fSockets[socket]->send(*reply);
242 // if(reply){delete reply;}
243
244 message_t reply(sizeof(serverListStruct)*numberOfRecords);
245 memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
246
efaef546 247 try{
248 fSockets[socket]->send(reply);
249 }
250 catch(const zmq::error_t &e)
251 {
252 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
253 }
fae81379 254 if(tmpMessage){delete tmpMessage;}
164d3d29 255}
256
257void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
258{
626a3158 259 char *buffer = (char*)(request);
260 message_t *requestMessage = new message_t((void*)buffer,
261 sizeof(struct serverRequestStruct)
262 +sizeof(struct listRequestStruct)
263 +sizeof(struct eventStruct),freeBuff);
efaef546 264 try{
265 fSockets[socket]->send(*requestMessage);
266 }
267 catch(const zmq::error_t &e)
268 {
269 cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
270 }
164d3d29 271}
272
273bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
274{
626a3158 275 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
276
277
278 char *buffer = (char*)(request);
279 message_t *requestMessage = new message_t((void*)buffer,
280 sizeof(struct clientRequestStruct),freeBuff);
281
efaef546 282 try{
626a3158 283 fSockets[socket]->send(*requestMessage);
284 }
285 catch (const zmq::error_t& e)
286 {
287 cout<<"MANAGER -- "<<e.what()<<endl;
288 cout<<e.num()<<endl;
289 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
290
291 CreateSocket(socket);
292 delete requestMessage;
293 return 0;
294
295 }
296 if(timeout>=0)
297 {
298 if(poll (&items[0], 1, timeout)==0)
299 {
300 delete requestMessage;
301 return 0;
302 }
303 }
304 delete requestMessage;
305 return 1;
164d3d29 306}
307
308void AliStorageEventManager::Send(long message,storageSockets socket)
309{
626a3158 310 stringstream streamBuffer;
311 streamBuffer << message;
312 string stringBuffer = streamBuffer.str();
313 char *buffer = (char*)stringBuffer.c_str();
314 message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
fae81379 315
efaef546 316 try{
317 fSockets[socket]->send(*replyMessage);
318 }
319 catch(const zmq::error_t &e)
320 {
321 cout<<"MANAGER -- send long -- "<<e.what()<<endl;
322 }
626a3158 323 delete replyMessage;
324 streamBuffer.str(string());
325 streamBuffer.clear();
164d3d29 326}
327
328void AliStorageEventManager::Send(bool message,storageSockets socket)
329{
626a3158 330 char *buffer;
331 if(message==true)
332 {
333 buffer = (char*)("true");
334 }
335 else
336 {
337 buffer = (char*)("false");
338 }
339 message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
efaef546 340 try{
341 fSockets[socket]->send(*replyMessage);
342 }
343 catch(const zmq::error_t &e)
344 {
345 cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
346 }
626a3158 347 delete replyMessage;
164d3d29 348}
349
350void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
351{
626a3158 352 TMessage tmess(kMESS_OBJECT);
353 tmess.Reset();
354 tmess.WriteObject(event);
355 TMessage::EnableSchemaEvolutionForAll(kTRUE);
356
357 int bufsize = tmess.Length();
358 char* buf = (char*) malloc(bufsize * sizeof(char));
359 memcpy(buf, tmess.Buffer(), bufsize);
360
361 message_t message((void*)buf, bufsize, freeBuff);
efaef546 362 try{
363 fSockets[socket]->send(message);
364 }
365 catch(const zmq::error_t &e)
366 {
367 cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
368 }
164d3d29 369}
370
371void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
372{
626a3158 373 cout<<"SENDING AS XML"<<endl;
374 stringstream bufferStream;
375 bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
376 bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
377
378 for(int i=0;i<event->GetNumberOfTracks();i++)
379 {
380 AliESDtrack *track = event->GetTrack(i);
381 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
382 const AliTrackPointArray *array = track->GetTrackPointArray();
383
384 if(array)
385 {
386 const float *x = array->GetX();
387 const float *y = array->GetY();
388 const float *z = array->GetZ();
389 int n = array->GetNPoints();
390
391 for(int j=0;j<n;j++)
392 {
393 bufferStream <<"\t\t<point>"<<endl;
394 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
395 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
396 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
397 bufferStream <<"\t\t</point>"<<endl;
398 }
399 }
400 else cout<<"no array"<<endl;
401
402 bufferStream << "\t</track>"<<endl;
403 }
404
405 bufferStream << "</ESD>"<<endl;
406
407 string bufferString = bufferStream.str();
408 message_t message(bufferString.size());
409 memcpy (message.data(), bufferString.data(), bufferString.size());
410
efaef546 411 try{
412 fSockets[socket]->send(message);
413 }
414 catch(const zmq::error_t &e)
415 {
416 cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
417 }
626a3158 418 cout<<"xml sent"<<endl;
164d3d29 419}
420
421vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
422{
626a3158 423 //get size of the incomming message
424 message_t sizeMessage;
efaef546 425
426 try{
427 fSockets[socket]->recv(&sizeMessage);
428 }
429 catch(const zmq::error_t &e)
430 {
431 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
432 }
626a3158 433 int numberOfRecords;
434 istringstream iss(static_cast<char*>(sizeMessage.data()));
435 iss >> numberOfRecords;
436
437 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
438
efaef546 439 try{
440 fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
441 }
442 catch(const zmq::error_t &e)
443 {
444 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
445 }
626a3158 446 //get list of events
447 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
efaef546 448 try{
449 fSockets[socket]->recv(response);
450 }
451 catch(const zmq::error_t &e)
452 {
453 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
454 }
626a3158 455
456 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
457
458 if (response) {delete response;}
459 return receivedList;
164d3d29 460}
461
fae81379 462AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
164d3d29 463{
626a3158 464 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
465
466 if(timeout>=0)
467 {
468 if(poll (&items[0], 1, timeout)==0)
469 {
470 return NULL;
471 }
472 }
473
474 message_t* message = new message_t();
475
476 try
477 {
478 fSockets[socket]->recv(message);
479 }
480 catch (const zmq::error_t& e)
481 {
482 cout<<"MANAGER -- "<<e.what()<<endl;
483 return NULL;
484 }
485
486 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
487 message->size()+sizeof(UInt_t),
488 message->data());
489 mess->InitMap();
490 mess->ReadClass();// get first the class stored in message
491 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
492 mess->ResetMap();
493
494 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
495
496 if (data)
497 {
498 data->GetStdContent();
499 if(message){delete message;}
500 return data;
501 }
502 else
503 {
504 if(message){delete message;}
505 return NULL;
506 }
164d3d29 507}
508
509struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
510{
626a3158 511 struct serverRequestStruct *request = new struct serverRequestStruct;
512 message_t *requestMessage = new message_t();
efaef546 513 try{
514 fSockets[socket]->recv(requestMessage);
515 }
516 catch(const zmq::error_t &e)
517 {
518 cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
519 }
626a3158 520 request = static_cast<struct serverRequestStruct*>(requestMessage->data());
521 return request;
164d3d29 522}
523
186c4b6e 524struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
164d3d29 525{
186c4b6e 526 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
527 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
528
626a3158 529 struct clientRequestStruct *request = new struct clientRequestStruct;
530 message_t *requestMessage = new message_t();
efaef546 531 try{
532 fSockets[socket]->recv(requestMessage);
533 }
534 catch(const zmq::error_t &e)
535 {
536 cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
537 }
626a3158 538 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
539 return request;
164d3d29 540}
541
542bool AliStorageEventManager::GetBool(storageSockets socket)
543{
626a3158 544 message_t *response = new message_t();
efaef546 545 try{
546 fSockets[socket]->recv(response);
547 }
548 catch(const zmq::error_t &e)
549 {
550 cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
551 }
626a3158 552 char *result = (char*)response->data();
553
554 if(!strcmp("true",result)){return true;}
555 else{return false;}
164d3d29 556}
557
558long AliStorageEventManager::GetLong(storageSockets socket)
559{
626a3158 560 message_t *responseMessage = new message_t();
efaef546 561 try{
562 fSockets[socket]->recv(responseMessage);
563 }
564 catch(const zmq::error_t &e)
565 {
566 cout<<"MANAGER -- get long -- "<<e.what()<<endl;
567 }
fae81379 568
569 long result = 0;
570
571 if(responseMessage)
572 {
573 result = (long)atoi(static_cast<char*>(responseMessage->data()));
574 delete responseMessage;
575 }
626a3158 576 return result;
164d3d29 577}
578