]>
Commit | Line | Data |
---|---|---|
164d3d29 | 1 | #include "AliStorageEventManager.h" |
2 | ||
3 | #include <iostream> | |
4 | #include <sstream> | |
5 | #include <fstream> | |
6 | ||
7 | #include <TList.h> | |
8 | #include <TStreamerInfo.h> | |
9 | #include <TThread.h> | |
10 | ||
11 | #include "zmq.hpp" | |
12 | ||
13 | #include "AliESDEvent.h" | |
14 | #include "AliESDtrack.h" | |
15 | #include "AliTrackPointArray.h" | |
16 | #include "AliESDfriendTrack.h" | |
17 | #include "AliExternalTrackParam.h" | |
18 | #include "AliTrackerBase.h" | |
19 | #include "AliTracker.h" | |
20 | ||
21 | using namespace zmq; | |
22 | using namespace std; | |
23 | ||
24 | AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0; | |
25 | ||
26 | AliStorageEventManager::AliStorageEventManager() | |
27 | { | |
28 | //read config file | |
29 | TThread::Lock(); | |
30 | ifstream configFile (GetConfigFilePath()); | |
31 | ||
32 | if (configFile.is_open()) | |
33 | { | |
34 | string line; | |
35 | int from,to; | |
36 | while(configFile.good()) | |
37 | { | |
38 | getline(configFile,line); | |
39 | from = line.find("\"")+1; | |
40 | to = line.find_last_of("\""); | |
41 | if(line.find("STORAGE_SERVER=")==0) | |
42 | { | |
43 | fStorageServer=line.substr(from,to-from); | |
44 | } | |
45 | else if(line.find("EVENT_SERVER=")==0) | |
46 | { | |
47 | fEventServer=line.substr(from,to-from); | |
48 | } | |
49 | else if(line.find("STORAGE_SERVER_PORT=")==0) | |
50 | { | |
51 | fStorageServerPort=atoi(line.substr(from,to-from).c_str()); | |
52 | } | |
53 | else if(line.find("EVENT_SERVER_PORT=")==0) | |
54 | { | |
55 | fEventServerPort=atoi(line.substr(from,to-from).c_str()); | |
56 | } | |
57 | else if(line.find("STORAGE_CLIENT_PORT=")==0) | |
58 | { | |
59 | fStorageClientPort=atoi(line.substr(from,to-from).c_str()); | |
60 | } | |
61 | else if(line.find("XML_SERVER_PORT=")==0) | |
62 | { | |
63 | fXmlServerPort=atoi(line.substr(from,to-from).c_str()); | |
64 | } | |
65 | } | |
66 | if(configFile.eof()) | |
67 | { | |
68 | configFile.clear(); | |
69 | } | |
70 | configFile.close(); | |
71 | } | |
72 | else | |
73 | { | |
74 | cout<<"EVENT MANAGER -- Unable to open config file"<<endl; | |
75 | } | |
76 | TThread::UnLock(); | |
77 | ||
78 | for(int i=0;i<NUMBER_OF_SOCKETS;i++) | |
79 | { | |
80 | fContexts[i] = new context_t(); | |
81 | } | |
82 | } | |
83 | AliStorageEventManager::~AliStorageEventManager() | |
84 | { | |
85 | if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;} | |
86 | } | |
87 | ||
88 | AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance() | |
89 | { | |
90 | TThread::Lock(); | |
91 | if(fManagerInstance==0) | |
92 | { | |
93 | fManagerInstance = new AliStorageEventManager(); | |
94 | } | |
95 | TThread::UnLock(); | |
96 | return fManagerInstance; | |
97 | } | |
98 | ||
99 | ||
// No-op callback with the (data, hint) deallocator signature.
// It deliberately leaves the buffer untouched, so it must only be paired with
// memory whose ownership stays with the caller.
void freeBuff (void * /*data*/, void * /*hint*/)
{
}
104 | ||
105 | bool AliStorageEventManager::CreateSocket(storageSockets socket) | |
106 | { | |
fae81379 | 107 | cout<<"Creating socket:"<<socket<<endl; |
108 | ||
109 | switch(socket) | |
110 | { | |
111 | case SERVER_COMMUNICATION_REQ: | |
112 | { | |
113 | fSockets[SERVER_COMMUNICATION_REQ] = | |
114 | new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ); | |
115 | try | |
116 | { | |
117 | fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort)); | |
118 | } | |
119 | catch (const zmq::error_t& e) | |
120 | { | |
121 | cout<<"MANAGER -- "<<e.what()<<endl; | |
122 | return 0; | |
123 | } | |
124 | } | |
125 | break; | |
126 | case SERVER_COMMUNICATION_REP: | |
127 | { | |
128 | fSockets[SERVER_COMMUNICATION_REP] = | |
129 | new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP); | |
130 | try | |
131 | { | |
132 | fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort)); | |
133 | } | |
134 | catch (const zmq::error_t& e) | |
135 | { | |
136 | cout<<"MANAGER -- "<<e.what()<<endl; | |
137 | return 0; | |
138 | } | |
139 | } | |
140 | break; | |
141 | case CLIENT_COMMUNICATION_REQ: | |
142 | { | |
143 | fSockets[CLIENT_COMMUNICATION_REQ] = | |
144 | new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ); | |
145 | try | |
146 | { | |
147 | fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort)); | |
148 | } | |
149 | catch (const zmq::error_t& e) | |
150 | { | |
151 | cout<<"MANAGER -- "<<e.what()<<endl; | |
152 | return 0; | |
153 | } | |
154 | } | |
155 | break; | |
156 | case CLIENT_COMMUNICATION_REP: | |
157 | { | |
158 | fSockets[CLIENT_COMMUNICATION_REP] = | |
159 | new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP); | |
160 | try | |
161 | { | |
162 | fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort)); | |
163 | } | |
164 | catch (const zmq::error_t& e) | |
165 | { | |
166 | cout<<"MANAGER -- "<<e.what()<<endl; | |
167 | return 0; | |
168 | } | |
169 | } | |
170 | break; | |
171 | case EVENTS_SERVER_PUB: | |
172 | { | |
173 | fSockets[EVENTS_SERVER_PUB] = | |
174 | new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB); | |
175 | try | |
176 | { | |
177 | fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort)); | |
178 | } | |
179 | catch (const zmq::error_t& e) | |
180 | { | |
181 | cout<<"MANAGER -- "<<e.what()<<endl; | |
182 | return 0; | |
183 | } | |
184 | } | |
185 | break; | |
186 | case EVENTS_SERVER_SUB: | |
187 | { | |
188 | fSockets[EVENTS_SERVER_SUB] = | |
189 | new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB); | |
190 | fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0); | |
191 | try | |
192 | { | |
193 | fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort)); | |
194 | } | |
195 | catch (const zmq::error_t& e) | |
196 | { | |
197 | cout<<"MANAGER -- "<<e.what()<<endl; | |
198 | return 0; | |
199 | ||
200 | } | |
201 | } | |
202 | break; | |
203 | case XML_PUB: | |
204 | { | |
205 | fSockets[XML_PUB] = | |
206 | new socket_t(*fContexts[XML_PUB],ZMQ_PUB); | |
207 | try | |
208 | { | |
209 | fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort)); | |
210 | } | |
211 | catch (const zmq::error_t& e) | |
212 | { | |
213 | cout<<"MANAGER -- "<<e.what()<<endl; | |
214 | return 0; | |
215 | } | |
216 | } | |
217 | break; | |
218 | default:break; | |
219 | } | |
220 | return 1; | |
164d3d29 | 221 | } |
222 | ||
223 | void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket) | |
224 | { | |
225 | //send size of the struct first | |
226 | int numberOfRecords = list.size(); | |
227 | message_t message(20); | |
228 | snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords); | |
229 | ||
230 | fSockets[socket]->send(message); | |
231 | if(numberOfRecords==0)return; | |
fae81379 | 232 | message_t *tmpMessage = new message_t(); |
233 | fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order | |
234 | ||
a410aca4 | 235 | // //prepare message with event's list |
236 | // char *buffer = reinterpret_cast<char*> (&list[0]); | |
237 | // message_t *reply = new message_t((void*)buffer, | |
238 | // sizeof(serverListStruct)*numberOfRecords,0); | |
239 | // fSockets[socket]->send(*reply); | |
240 | // if(reply){delete reply;} | |
241 | ||
fae81379 | 242 | message_t reply(sizeof(serverListStruct)*numberOfRecords); |
a410aca4 | 243 | memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords); |
244 | ||
245 | fSockets[socket]->send(reply); | |
fae81379 | 246 | if(tmpMessage){delete tmpMessage;} |
164d3d29 | 247 | } |
248 | ||
249 | void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket) | |
250 | { | |
251 | char *buffer = (char*)(request); | |
252 | message_t *requestMessage = new message_t((void*)buffer, | |
253 | sizeof(struct serverRequestStruct) | |
254 | +sizeof(struct listRequestStruct) | |
fae81379 | 255 | +sizeof(struct eventStruct),freeBuff); |
164d3d29 | 256 | fSockets[socket]->send(*requestMessage); |
257 | } | |
258 | ||
259 | bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout) | |
260 | { | |
261 | pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ; | |
262 | ||
263 | ||
264 | char *buffer = (char*)(request); | |
265 | message_t *requestMessage = new message_t((void*)buffer, | |
fae81379 | 266 | sizeof(struct clientRequestStruct),freeBuff); |
164d3d29 | 267 | |
268 | try | |
269 | { | |
270 | fSockets[socket]->send(*requestMessage); | |
271 | } | |
272 | catch (const zmq::error_t& e) | |
273 | { | |
274 | cout<<"MANAGER -- "<<e.what()<<endl; | |
275 | cout<<e.num()<<endl; | |
276 | if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;} | |
277 | ||
278 | CreateSocket(socket); | |
a410aca4 | 279 | delete requestMessage; |
164d3d29 | 280 | return 0; |
281 | ||
282 | } | |
283 | if(timeout>=0) | |
284 | { | |
285 | if(poll (&items[0], 1, timeout)==0) | |
286 | { | |
a410aca4 | 287 | delete requestMessage; |
164d3d29 | 288 | return 0; |
289 | } | |
290 | } | |
a410aca4 | 291 | delete requestMessage; |
164d3d29 | 292 | return 1; |
293 | } | |
294 | ||
295 | void AliStorageEventManager::Send(long message,storageSockets socket) | |
296 | { | |
297 | stringstream streamBuffer; | |
298 | streamBuffer << message; | |
299 | string stringBuffer = streamBuffer.str(); | |
300 | char *buffer = (char*)stringBuffer.c_str(); | |
fae81379 | 301 | message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff); |
302 | ||
164d3d29 | 303 | fSockets[socket]->send(*replyMessage); |
304 | delete replyMessage; | |
305 | streamBuffer.str(string()); | |
306 | streamBuffer.clear(); | |
307 | } | |
308 | ||
309 | void AliStorageEventManager::Send(bool message,storageSockets socket) | |
310 | { | |
311 | char *buffer; | |
312 | if(message==true) | |
313 | { | |
314 | buffer = (char*)("true"); | |
315 | } | |
316 | else | |
317 | { | |
318 | buffer = (char*)("false"); | |
319 | } | |
fae81379 | 320 | message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff); |
164d3d29 | 321 | fSockets[socket]->send(*replyMessage); |
322 | delete replyMessage; | |
323 | } | |
324 | ||
325 | void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket) | |
326 | { | |
327 | TMessage tmess(kMESS_OBJECT); | |
328 | tmess.Reset(); | |
329 | tmess.WriteObject(event); | |
330 | TMessage::EnableSchemaEvolutionForAll(kTRUE); | |
331 | ||
332 | int bufsize = tmess.Length(); | |
333 | char* buf = (char*) malloc(bufsize * sizeof(char)); | |
334 | memcpy(buf, tmess.Buffer(), bufsize); | |
335 | ||
fae81379 | 336 | message_t message((void*)buf, bufsize, freeBuff); |
164d3d29 | 337 | fSockets[socket]->send(message); |
338 | } | |
339 | ||
340 | void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket) | |
341 | { | |
342 | cout<<"SENDING AS XML"<<endl; | |
343 | stringstream bufferStream; | |
344 | bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl; | |
345 | bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl; | |
346 | ||
347 | for(int i=0;i<event->GetNumberOfTracks();i++) | |
348 | { | |
349 | AliESDtrack *track = event->GetTrack(i); | |
164d3d29 | 350 | bufferStream << "\t<track mass=\""<<track->GetMass()<<"\">" <<endl; |
351 | const AliTrackPointArray *array = track->GetTrackPointArray(); | |
352 | ||
353 | if(array) | |
354 | { | |
355 | const float *x = array->GetX(); | |
356 | const float *y = array->GetY(); | |
357 | const float *z = array->GetZ(); | |
358 | int n = array->GetNPoints(); | |
359 | ||
360 | for(int j=0;j<n;j++) | |
361 | { | |
362 | bufferStream <<"\t\t<point>"<<endl; | |
363 | bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl; | |
364 | bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl; | |
365 | bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl; | |
366 | bufferStream <<"\t\t</point>"<<endl; | |
367 | } | |
368 | } | |
369 | else cout<<"no array"<<endl; | |
370 | ||
371 | bufferStream << "\t</track>"<<endl; | |
372 | } | |
373 | ||
374 | bufferStream << "</ESD>"<<endl; | |
375 | ||
376 | string bufferString = bufferStream.str(); | |
377 | message_t message(bufferString.size()); | |
378 | memcpy (message.data(), bufferString.data(), bufferString.size()); | |
379 | ||
380 | fSockets[socket]->send(message); | |
381 | cout<<"xml sent"<<endl; | |
382 | } | |
383 | ||
384 | vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket) | |
385 | { | |
386 | //get size of the incomming message | |
387 | message_t sizeMessage; | |
388 | fSockets[socket]->recv(&sizeMessage); | |
389 | int numberOfRecords; | |
390 | istringstream iss(static_cast<char*>(sizeMessage.data())); | |
391 | iss >> numberOfRecords; | |
392 | ||
393 | if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;} | |
394 | ||
395 | fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order | |
396 | ||
397 | //get list of events | |
398 | message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords); | |
399 | fSockets[socket]->recv(response); | |
400 | ||
401 | vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords); | |
402 | ||
a410aca4 | 403 | if (response) {delete response;} |
164d3d29 | 404 | return receivedList; |
405 | } | |
406 | ||
fae81379 | 407 | AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout) |
164d3d29 | 408 | { |
409 | message_t* message = new message_t(); | |
410 | ||
411 | try | |
412 | { | |
413 | fSockets[socket]->recv(message); | |
414 | } | |
415 | catch (const zmq::error_t& e) | |
416 | { | |
417 | cout<<"MANAGER -- "<<e.what()<<endl; | |
418 | return NULL; | |
419 | } | |
420 | ||
421 | TBufferFile *mess = new TBufferFile(TBuffer::kRead, | |
422 | message->size()+sizeof(UInt_t), | |
423 | message->data()); | |
424 | mess->InitMap(); | |
425 | mess->ReadClass();// get first the class stored in message | |
426 | mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT)); | |
427 | mess->ResetMap(); | |
428 | ||
429 | AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class())); | |
13a64a87 | 430 | |
164d3d29 | 431 | if (data) |
432 | { | |
13a64a87 | 433 | data->GetStdContent(); |
434 | if(message){delete message;} | |
cc94dac0 | 435 | return data; |
164d3d29 | 436 | } |
437 | else | |
438 | { | |
439 | if(message){delete message;} | |
440 | return NULL; | |
441 | } | |
442 | } | |
443 | ||
444 | struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket) | |
445 | { | |
446 | struct serverRequestStruct *request = new struct serverRequestStruct; | |
447 | message_t *requestMessage = new message_t(); | |
448 | fSockets[socket]->recv(requestMessage); | |
449 | request = static_cast<struct serverRequestStruct*>(requestMessage->data()); | |
450 | return request; | |
451 | } | |
452 | ||
453 | struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket) | |
454 | { | |
455 | struct clientRequestStruct *request = new struct clientRequestStruct; | |
456 | message_t *requestMessage = new message_t(); | |
457 | fSockets[socket]->recv(requestMessage); | |
458 | request = static_cast<struct clientRequestStruct*>(requestMessage->data()); | |
459 | return request; | |
460 | } | |
461 | ||
462 | bool AliStorageEventManager::GetBool(storageSockets socket) | |
463 | { | |
464 | message_t *response = new message_t(); | |
465 | fSockets[socket]->recv(response); | |
466 | char *result = (char*)response->data(); | |
467 | ||
468 | if(!strcmp("true",result)){return true;} | |
469 | else{return false;} | |
470 | } | |
471 | ||
472 | long AliStorageEventManager::GetLong(storageSockets socket) | |
473 | { | |
fae81379 | 474 | message_t *responseMessage = new message_t(); |
164d3d29 | 475 | fSockets[socket]->recv(responseMessage); |
fae81379 | 476 | |
477 | long result = 0; | |
478 | ||
479 | if(responseMessage) | |
480 | { | |
481 | result = (long)atoi(static_cast<char*>(responseMessage->data())); | |
482 | delete responseMessage; | |
483 | } | |
484 | return result; | |
164d3d29 | 485 | } |
486 |