Loop over marked events added to Event Display.
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageEventManager.cxx
CommitLineData
164d3d29 1#include "AliStorageEventManager.h"
2
3#include <iostream>
4#include <sstream>
5#include <fstream>
6
7#include <TList.h>
8#include <TStreamerInfo.h>
9#include <TThread.h>
10
11#include "zmq.hpp"
12
13#include "AliESDEvent.h"
14#include "AliESDtrack.h"
15#include "AliTrackPointArray.h"
16#include "AliESDfriendTrack.h"
17#include "AliExternalTrackParam.h"
18#include "AliTrackerBase.h"
19#include "AliTracker.h"
20
21using namespace zmq;
22using namespace std;
23
24AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26AliStorageEventManager::AliStorageEventManager()
27{
626a3158 28 //read config file
29 TThread::Lock();
30 ifstream configFile (GetConfigFilePath());
164d3d29 31
626a3158 32 if (configFile.is_open())
33 {
34 string line;
35 int from,to;
36 while(configFile.good())
37 {
38 getline(configFile,line);
39 from = line.find("\"")+1;
40 to = line.find_last_of("\"");
41 if(line.find("STORAGE_SERVER=")==0)
42 {
43 fStorageServer=line.substr(from,to-from);
44 }
45 else if(line.find("EVENT_SERVER=")==0)
46 {
47 fEventServer=line.substr(from,to-from);
48 }
49 else if(line.find("STORAGE_SERVER_PORT=")==0)
50 {
51 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52 }
53 else if(line.find("EVENT_SERVER_PORT=")==0)
54 {
55 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56 }
57 else if(line.find("STORAGE_CLIENT_PORT=")==0)
58 {
59 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60 }
61 else if(line.find("XML_SERVER_PORT=")==0)
62 {
63 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64 }
65 }
66 if(configFile.eof())
67 {
68 configFile.clear();
69 }
70 configFile.close();
71 }
72 else
73 {
74 cout<<"EVENT MANAGER -- Unable to open config file"<<endl;
75 }
76 TThread::UnLock();
77
78 for(int i=0;i<NUMBER_OF_SOCKETS;i++)
79 {
80 fContexts[i] = new context_t();
81 }
164d3d29 82}
83AliStorageEventManager::~AliStorageEventManager()
84{
626a3158 85 if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
164d3d29 86}
87
88AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
89{
626a3158 90 TThread::Lock();
91 if(fManagerInstance==0)
92 {
93 fManagerInstance = new AliStorageEventManager();
94 }
95 TThread::UnLock();
96 return fManagerInstance;
164d3d29 97}
98
99
fae81379 100void freeBuff (void *data, void *hint)
164d3d29 101{
626a3158 102 // free(data);
164d3d29 103}
104
105bool AliStorageEventManager::CreateSocket(storageSockets socket)
106{
fae81379 107 cout<<"Creating socket:"<<socket<<endl;
108
109 switch(socket)
110 {
111 case SERVER_COMMUNICATION_REQ:
112 {
113 fSockets[SERVER_COMMUNICATION_REQ] =
114 new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
115 try
116 {
117 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
118 }
119 catch (const zmq::error_t& e)
120 {
121 cout<<"MANAGER -- "<<e.what()<<endl;
122 return 0;
123 }
124 }
125 break;
126 case SERVER_COMMUNICATION_REP:
127 {
128 fSockets[SERVER_COMMUNICATION_REP] =
129 new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
130 try
131 {
132 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
133 }
134 catch (const zmq::error_t& e)
135 {
136 cout<<"MANAGER -- "<<e.what()<<endl;
137 return 0;
138 }
139 }
140 break;
141 case CLIENT_COMMUNICATION_REQ:
142 {
143 fSockets[CLIENT_COMMUNICATION_REQ] =
144 new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
145 try
146 {
147 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
148 }
149 catch (const zmq::error_t& e)
150 {
151 cout<<"MANAGER -- "<<e.what()<<endl;
152 return 0;
153 }
154 }
155 break;
156 case CLIENT_COMMUNICATION_REP:
157 {
158 fSockets[CLIENT_COMMUNICATION_REP] =
159 new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
160 try
161 {
162 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
163 }
164 catch (const zmq::error_t& e)
165 {
166 cout<<"MANAGER -- "<<e.what()<<endl;
167 return 0;
168 }
169 }
170 break;
171 case EVENTS_SERVER_PUB:
172 {
173 fSockets[EVENTS_SERVER_PUB] =
174 new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
175 try
176 {
177 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
178 }
179 catch (const zmq::error_t& e)
180 {
181 cout<<"MANAGER -- "<<e.what()<<endl;
182 return 0;
183 }
184 }
185 break;
186 case EVENTS_SERVER_SUB:
187 {
188 fSockets[EVENTS_SERVER_SUB] =
189 new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
190 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
191 try
192 {
193 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
194 }
195 catch (const zmq::error_t& e)
196 {
197 cout<<"MANAGER -- "<<e.what()<<endl;
198 return 0;
199
200 }
201 }
202 break;
203 case XML_PUB:
204 {
205 fSockets[XML_PUB] =
206 new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
207 try
208 {
209 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
210 }
211 catch (const zmq::error_t& e)
212 {
213 cout<<"MANAGER -- "<<e.what()<<endl;
214 return 0;
215 }
216 }
217 break;
218 default:break;
219 }
220 return 1;
164d3d29 221}
222
223void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
224{
626a3158 225 //send size of the struct first
226 int numberOfRecords = list.size();
227 message_t message(20);
228 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
229
230 fSockets[socket]->send(message);
231 if(numberOfRecords==0)return;
fae81379 232 message_t *tmpMessage = new message_t();
626a3158 233 fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
fae81379 234
626a3158 235 // //prepare message with event's list
236 // char *buffer = reinterpret_cast<char*> (&list[0]);
237 // message_t *reply = new message_t((void*)buffer,
238 // sizeof(serverListStruct)*numberOfRecords,0);
239 // fSockets[socket]->send(*reply);
240 // if(reply){delete reply;}
241
242 message_t reply(sizeof(serverListStruct)*numberOfRecords);
243 memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
244
245 fSockets[socket]->send(reply);
fae81379 246 if(tmpMessage){delete tmpMessage;}
164d3d29 247}
248
249void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
250{
626a3158 251 char *buffer = (char*)(request);
252 message_t *requestMessage = new message_t((void*)buffer,
253 sizeof(struct serverRequestStruct)
254 +sizeof(struct listRequestStruct)
255 +sizeof(struct eventStruct),freeBuff);
256 fSockets[socket]->send(*requestMessage);
164d3d29 257}
258
259bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
260{
626a3158 261 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
262
263
264 char *buffer = (char*)(request);
265 message_t *requestMessage = new message_t((void*)buffer,
266 sizeof(struct clientRequestStruct),freeBuff);
267
268 try
269 {
270 fSockets[socket]->send(*requestMessage);
271 }
272 catch (const zmq::error_t& e)
273 {
274 cout<<"MANAGER -- "<<e.what()<<endl;
275 cout<<e.num()<<endl;
276 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
277
278 CreateSocket(socket);
279 delete requestMessage;
280 return 0;
281
282 }
283 if(timeout>=0)
284 {
285 if(poll (&items[0], 1, timeout)==0)
286 {
287 delete requestMessage;
288 return 0;
289 }
290 }
291 delete requestMessage;
292 return 1;
164d3d29 293}
294
295void AliStorageEventManager::Send(long message,storageSockets socket)
296{
626a3158 297 stringstream streamBuffer;
298 streamBuffer << message;
299 string stringBuffer = streamBuffer.str();
300 char *buffer = (char*)stringBuffer.c_str();
301 message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
fae81379 302
626a3158 303 fSockets[socket]->send(*replyMessage);
304 delete replyMessage;
305 streamBuffer.str(string());
306 streamBuffer.clear();
164d3d29 307}
308
309void AliStorageEventManager::Send(bool message,storageSockets socket)
310{
626a3158 311 char *buffer;
312 if(message==true)
313 {
314 buffer = (char*)("true");
315 }
316 else
317 {
318 buffer = (char*)("false");
319 }
320 message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
321 fSockets[socket]->send(*replyMessage);
322 delete replyMessage;
164d3d29 323}
324
325void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
326{
626a3158 327 TMessage tmess(kMESS_OBJECT);
328 tmess.Reset();
329 tmess.WriteObject(event);
330 TMessage::EnableSchemaEvolutionForAll(kTRUE);
331
332 int bufsize = tmess.Length();
333 char* buf = (char*) malloc(bufsize * sizeof(char));
334 memcpy(buf, tmess.Buffer(), bufsize);
335
336 message_t message((void*)buf, bufsize, freeBuff);
337 fSockets[socket]->send(message);
164d3d29 338}
339
340void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
341{
626a3158 342 cout<<"SENDING AS XML"<<endl;
343 stringstream bufferStream;
344 bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
345 bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
346
347 for(int i=0;i<event->GetNumberOfTracks();i++)
348 {
349 AliESDtrack *track = event->GetTrack(i);
350 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl;
351 const AliTrackPointArray *array = track->GetTrackPointArray();
352
353 if(array)
354 {
355 const float *x = array->GetX();
356 const float *y = array->GetY();
357 const float *z = array->GetZ();
358 int n = array->GetNPoints();
359
360 for(int j=0;j<n;j++)
361 {
362 bufferStream <<"\t\t<point>"<<endl;
363 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
364 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
365 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
366 bufferStream <<"\t\t</point>"<<endl;
367 }
368 }
369 else cout<<"no array"<<endl;
370
371 bufferStream << "\t</track>"<<endl;
372 }
373
374 bufferStream << "</ESD>"<<endl;
375
376 string bufferString = bufferStream.str();
377 message_t message(bufferString.size());
378 memcpy (message.data(), bufferString.data(), bufferString.size());
379
380 fSockets[socket]->send(message);
381 cout<<"xml sent"<<endl;
164d3d29 382}
383
384vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket)
385{
626a3158 386 //get size of the incomming message
387 message_t sizeMessage;
388 fSockets[socket]->recv(&sizeMessage);
389 int numberOfRecords;
390 istringstream iss(static_cast<char*>(sizeMessage.data()));
391 iss >> numberOfRecords;
392
393 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
394
395 fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
396
397 //get list of events
398 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
399 fSockets[socket]->recv(response);
400
401 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
402
403 if (response) {delete response;}
404 return receivedList;
164d3d29 405}
406
fae81379 407AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
164d3d29 408{
626a3158 409 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
410
411 if(timeout>=0)
412 {
413 if(poll (&items[0], 1, timeout)==0)
414 {
415 return NULL;
416 }
417 }
418
419 message_t* message = new message_t();
420
421 try
422 {
423 fSockets[socket]->recv(message);
424 }
425 catch (const zmq::error_t& e)
426 {
427 cout<<"MANAGER -- "<<e.what()<<endl;
428 return NULL;
429 }
430
431 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
432 message->size()+sizeof(UInt_t),
433 message->data());
434 mess->InitMap();
435 mess->ReadClass();// get first the class stored in message
436 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
437 mess->ResetMap();
438
439 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
440
441 if (data)
442 {
443 data->GetStdContent();
444 if(message){delete message;}
445 return data;
446 }
447 else
448 {
449 if(message){delete message;}
450 return NULL;
451 }
164d3d29 452}
453
454struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
455{
626a3158 456 struct serverRequestStruct *request = new struct serverRequestStruct;
457 message_t *requestMessage = new message_t();
458 fSockets[socket]->recv(requestMessage);
459 request = static_cast<struct serverRequestStruct*>(requestMessage->data());
460 return request;
164d3d29 461}
462
463struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
464{
626a3158 465 struct clientRequestStruct *request = new struct clientRequestStruct;
466 message_t *requestMessage = new message_t();
467 fSockets[socket]->recv(requestMessage);
468 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
469 return request;
164d3d29 470}
471
472bool AliStorageEventManager::GetBool(storageSockets socket)
473{
626a3158 474 message_t *response = new message_t();
475 fSockets[socket]->recv(response);
476 char *result = (char*)response->data();
477
478 if(!strcmp("true",result)){return true;}
479 else{return false;}
164d3d29 480}
481
482long AliStorageEventManager::GetLong(storageSockets socket)
483{
626a3158 484 message_t *responseMessage = new message_t();
485 fSockets[socket]->recv(responseMessage);
fae81379 486
487 long result = 0;
488
489 if(responseMessage)
490 {
491 result = (long)atoi(static_cast<char*>(responseMessage->data()));
492 delete responseMessage;
493 }
626a3158 494 return result;
164d3d29 495}
496