MONITOR without ZEROMQ
[u/mrichter/AliRoot.git] / MONITOR / MONITORzmq / AliStorageEventManager.cxx
CommitLineData
164d3d29 1#include "AliStorageEventManager.h"
2
3#include <iostream>
4#include <sstream>
5#include <fstream>
6
7#include <TList.h>
8#include <TStreamerInfo.h>
9#include <TThread.h>
10
11#include "zmq.hpp"
12
13#include "AliESDEvent.h"
14#include "AliESDtrack.h"
15#include "AliTrackPointArray.h"
16#include "AliESDfriendTrack.h"
17#include "AliExternalTrackParam.h"
18#include "AliTrackerBase.h"
19#include "AliTracker.h"
20
21using namespace zmq;
22using namespace std;
23
24AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26AliStorageEventManager::AliStorageEventManager()
27{
626a3158 28 //read config file
29 TThread::Lock();
30 ifstream configFile (GetConfigFilePath());
164d3d29 31
626a3158 32 if (configFile.is_open())
33 {
34 string line;
35 int from,to;
36 while(configFile.good())
37 {
38 getline(configFile,line);
39 from = line.find("\"")+1;
40 to = line.find_last_of("\"");
41 if(line.find("STORAGE_SERVER=")==0)
42 {
43 fStorageServer=line.substr(from,to-from);
44 }
45 else if(line.find("EVENT_SERVER=")==0)
46 {
47 fEventServer=line.substr(from,to-from);
48 }
49 else if(line.find("STORAGE_SERVER_PORT=")==0)
50 {
51 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52 }
53 else if(line.find("EVENT_SERVER_PORT=")==0)
54 {
55 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56 }
57 else if(line.find("STORAGE_CLIENT_PORT=")==0)
58 {
59 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60 }
61 else if(line.find("XML_SERVER_PORT=")==0)
62 {
63 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64 }
a7f93de6 65 else if(line.find("ITS_POINTS_SERVER_PORT=")==0)
66 {
67 fItsPointsServerPort=atoi(line.substr(from,to-from).c_str());
68 cout<<"ITS port is:"<<fItsPointsServerPort<<endl;
69 }
626a3158 70 }
efaef546 71 if(configFile.eof()){configFile.clear();}
626a3158 72 configFile.close();
73 }
efaef546 74 else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
626a3158 75 TThread::UnLock();
76
efaef546 77 for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
164d3d29 78}
79AliStorageEventManager::~AliStorageEventManager()
80{
626a3158 81 if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
164d3d29 82}
83
84AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
85{
626a3158 86 TThread::Lock();
87 if(fManagerInstance==0)
88 {
89 fManagerInstance = new AliStorageEventManager();
90 }
91 TThread::UnLock();
92 return fManagerInstance;
164d3d29 93}
94
95
// Deallocation callback handed to zmq::message_t by the Send() overloads
// in this file. It deliberately releases nothing: some callers pass
// buffers that were not obtained from malloc().
void freeBuff (void *data, void *hint)
{
    (void)data; // unused on purpose
    (void)hint; // unused on purpose
}
100
101bool AliStorageEventManager::CreateSocket(storageSockets socket)
102{
fae81379 103 cout<<"Creating socket:"<<socket<<endl;
104
105 switch(socket)
106 {
107 case SERVER_COMMUNICATION_REQ:
108 {
109 fSockets[SERVER_COMMUNICATION_REQ] =
110 new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
111 try
112 {
113 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
114 }
115 catch (const zmq::error_t& e)
116 {
117 cout<<"MANAGER -- "<<e.what()<<endl;
118 return 0;
119 }
120 }
121 break;
122 case SERVER_COMMUNICATION_REP:
123 {
124 fSockets[SERVER_COMMUNICATION_REP] =
125 new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
126 try
127 {
128 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
129 }
130 catch (const zmq::error_t& e)
131 {
132 cout<<"MANAGER -- "<<e.what()<<endl;
133 return 0;
134 }
135 }
136 break;
137 case CLIENT_COMMUNICATION_REQ:
138 {
139 fSockets[CLIENT_COMMUNICATION_REQ] =
140 new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
141 try
142 {
143 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
144 }
145 catch (const zmq::error_t& e)
146 {
147 cout<<"MANAGER -- "<<e.what()<<endl;
148 return 0;
149 }
150 }
151 break;
152 case CLIENT_COMMUNICATION_REP:
153 {
154 fSockets[CLIENT_COMMUNICATION_REP] =
155 new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
156 try
157 {
158 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
159 }
160 catch (const zmq::error_t& e)
161 {
162 cout<<"MANAGER -- "<<e.what()<<endl;
163 return 0;
164 }
165 }
166 break;
167 case EVENTS_SERVER_PUB:
168 {
169 fSockets[EVENTS_SERVER_PUB] =
170 new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
171 try
172 {
173 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
174 }
175 catch (const zmq::error_t& e)
176 {
177 cout<<"MANAGER -- "<<e.what()<<endl;
178 return 0;
179 }
180 }
181 break;
182 case EVENTS_SERVER_SUB:
183 {
184 fSockets[EVENTS_SERVER_SUB] =
185 new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
186 fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
187 try
188 {
189 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
190 }
191 catch (const zmq::error_t& e)
192 {
193 cout<<"MANAGER -- "<<e.what()<<endl;
194 return 0;
195
196 }
197 }
198 break;
199 case XML_PUB:
200 {
201 fSockets[XML_PUB] =
202 new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
203 try
204 {
205 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
206 }
207 catch (const zmq::error_t& e)
208 {
209 cout<<"MANAGER -- "<<e.what()<<endl;
210 return 0;
211 }
212 }
a7f93de6 213 break;
214 case ITS_POINTS_PUB:
215 {
216 fSockets[ITS_POINTS_PUB] =
217 new socket_t(*fContexts[ITS_POINTS_PUB],ZMQ_PUB);
218 try
219 {
220 fSockets[ITS_POINTS_PUB]->bind(Form("tcp://*:%d",fItsPointsServerPort));
221 }
222 catch (const zmq::error_t& e)
223 {
224 cout<<"MANAGER -- "<<e.what()<<endl;
225 return 0;
226 }
227 }
228 break;
229 case ITS_POINTS_SUB:
230 {
231 fSockets[ITS_POINTS_SUB] =
232 new socket_t(*fContexts[ITS_POINTS_SUB],ZMQ_SUB);
233 fSockets[ITS_POINTS_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
234 try
235 {
236 fSockets[ITS_POINTS_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fItsPointsServerPort));
237 }
238 catch (const zmq::error_t& e)
239 {
240 cout<<"MANAGER -- "<<e.what()<<endl;
241 return 0;
242
243 }
244 }
fae81379 245 break;
246 default:break;
247 }
248 return 1;
164d3d29 249}
250
251void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
252{
626a3158 253 //send size of the struct first
254 int numberOfRecords = list.size();
255 message_t message(20);
256 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
efaef546 257 try{
258 fSockets[socket]->send(message);
259 }
260 catch(const zmq::error_t &e)
261 {
262 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
263 }
264 //if(numberOfRecords==0)return;
fae81379 265 message_t *tmpMessage = new message_t();
fae81379 266
efaef546 267 try{
268 fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
269 }
270 catch(const zmq::error_t &e)
271 {
272 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
273 }
626a3158 274 // //prepare message with event's list
275 // char *buffer = reinterpret_cast<char*> (&list[0]);
276 // message_t *reply = new message_t((void*)buffer,
277 // sizeof(serverListStruct)*numberOfRecords,0);
278 // fSockets[socket]->send(*reply);
279 // if(reply){delete reply;}
280
281 message_t reply(sizeof(serverListStruct)*numberOfRecords);
282 memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
283
efaef546 284 try{
285 fSockets[socket]->send(reply);
286 }
287 catch(const zmq::error_t &e)
288 {
289 cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
290 }
fae81379 291 if(tmpMessage){delete tmpMessage;}
164d3d29 292}
293
294void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
295{
626a3158 296 char *buffer = (char*)(request);
297 message_t *requestMessage = new message_t((void*)buffer,
298 sizeof(struct serverRequestStruct)
299 +sizeof(struct listRequestStruct)
300 +sizeof(struct eventStruct),freeBuff);
efaef546 301 try{
302 fSockets[socket]->send(*requestMessage);
303 }
304 catch(const zmq::error_t &e)
305 {
306 cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
307 }
164d3d29 308}
309
310bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
311{
626a3158 312 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
313
314
315 char *buffer = (char*)(request);
316 message_t *requestMessage = new message_t((void*)buffer,
317 sizeof(struct clientRequestStruct),freeBuff);
318
efaef546 319 try{
626a3158 320 fSockets[socket]->send(*requestMessage);
321 }
322 catch (const zmq::error_t& e)
323 {
324 cout<<"MANAGER -- "<<e.what()<<endl;
325 cout<<e.num()<<endl;
326 if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
327
328 CreateSocket(socket);
329 delete requestMessage;
330 return 0;
331
332 }
333 if(timeout>=0)
334 {
335 if(poll (&items[0], 1, timeout)==0)
336 {
337 delete requestMessage;
338 return 0;
339 }
340 }
341 delete requestMessage;
342 return 1;
164d3d29 343}
344
345void AliStorageEventManager::Send(long message,storageSockets socket)
346{
626a3158 347 stringstream streamBuffer;
348 streamBuffer << message;
349 string stringBuffer = streamBuffer.str();
350 char *buffer = (char*)stringBuffer.c_str();
351 message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
fae81379 352
efaef546 353 try{
354 fSockets[socket]->send(*replyMessage);
355 }
356 catch(const zmq::error_t &e)
357 {
358 cout<<"MANAGER -- send long -- "<<e.what()<<endl;
359 }
626a3158 360 delete replyMessage;
361 streamBuffer.str(string());
362 streamBuffer.clear();
164d3d29 363}
364
365void AliStorageEventManager::Send(bool message,storageSockets socket)
366{
626a3158 367 char *buffer;
368 if(message==true)
369 {
370 buffer = (char*)("true");
371 }
372 else
373 {
374 buffer = (char*)("false");
375 }
376 message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
efaef546 377 try{
378 fSockets[socket]->send(*replyMessage);
379 }
380 catch(const zmq::error_t &e)
381 {
382 cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
383 }
626a3158 384 delete replyMessage;
164d3d29 385}
386
387void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
388{
e9ce60b6 389 if(!event){return;}
390
626a3158 391 TMessage tmess(kMESS_OBJECT);
392 tmess.Reset();
393 tmess.WriteObject(event);
e9ce60b6 394 // TMessage::EnableSchemaEvolutionForAll(kTRUE);
626a3158 395
396 int bufsize = tmess.Length();
397 char* buf = (char*) malloc(bufsize * sizeof(char));
398 memcpy(buf, tmess.Buffer(), bufsize);
399
400 message_t message((void*)buf, bufsize, freeBuff);
efaef546 401 try{
402 fSockets[socket]->send(message);
403 }
404 catch(const zmq::error_t &e)
405 {
406 cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
407 }
164d3d29 408}
409
a7f93de6 410void AliStorageEventManager::Send(TFile *file, storageSockets socket)
411{
412 cout<<"sending tfile to socket:"<<endl;
413 TTree *tree;
414
415 if(file)
416 {
417 file->ls();
418 cout<<"1"<<endl;
419 //std::string *dfTitle = new std::string(file->GetListOfKeys()->Last()->GetTitle());
420 string *name = new string(file->GetListOfKeys()->Last()->GetTitle());
421 cout<<"treeTitle:"<<name->data()<<endl;
422
423 tree = (TTree*)((TDirectoryFile*)file->Get(name->data()))->Get("TreeR");
424
425
426 // TDirectoryFile *df = (TDirectoryFile*)file->Get(dfTitle->data());
427 //if(df)
428 //{
429 // cout<<"directory file extracted"<<endl;
430 //tree = (TTree*)df->Get("TreeR");
431 cout<<"2"<<endl;
432 tree->Branch("event",&name);
433 cout<<"branch created:"<<endl;
434 tree->Print();
435 tree->Fill();
436 cout<<"ttree filled with name"<<endl;
437 // delete df;
438 //}
439 tree->Print();
440 }
441 else
442 {
443 tree = NULL;
444 cout<<"file is empty"<<endl;
445 }
446
447 //TMessage::EnableSchemaEvolutionForAll(kTRUE);
448 TMessage tmess(kMESS_OBJECT);
449 tmess.Reset();
450 tmess.WriteObject(tree);
451 cout<<"file written to tmessage"<<endl;
452
453 int bufsize = tmess.Length();
454 char* buf = (char*) malloc(bufsize * sizeof(char));
455 memcpy(buf, tmess.Buffer(), bufsize);
456 cout<<"messaged copied to buffer"<<endl;
457
458 message_t message((void*)buf, bufsize, freeBuff);
459 cout<<"message_t created"<<endl;
460 try{
461 fSockets[socket]->send(message);
462 }
463 catch(const zmq::error_t &e)
464 {
465 cout<<"MANAGER -- send TFile -- "<<e.what()<<endl;
466 }
467 //if(tree){delete tree;}
468}
469
470void AliStorageEventManager::Send(struct recPointsStruct *files, storageSockets socket)
471{
472 for(int i=0;i<10;i++){Send(files->files[i],socket);}
473}
164d3d29 474void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
475{
626a3158 476 cout<<"SENDING AS XML"<<endl;
477 stringstream bufferStream;
478 bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
479 bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
480
481 for(int i=0;i<event->GetNumberOfTracks();i++)
482 {
483 AliESDtrack *track = event->GetTrack(i);
e9ce60b6 484 AliKalmanTrack *ITStrack = track->GetITStrack();
485 const AliTrackPointArray *array = track->GetTrackPointArray();
486
487 bufferStream << "\t<track mass=\""<<track->GetMass()<<"\"";
488 // bufferStream << "\t<track esdpid=\""<<track->GetESDpid();
489 bufferStream << "\t pid=\""<<track->PID()<<"\"";
490 bufferStream << "\t energy=\""<<track->E()<<"\"";
491 bufferStream << "\t volumeID=\""<<array->GetVolumeID()<<"\">" <<endl;
492
493
494
495
626a3158 496
497 if(array)
498 {
499 const float *x = array->GetX();
500 const float *y = array->GetY();
501 const float *z = array->GetZ();
502 int n = array->GetNPoints();
503
504 for(int j=0;j<n;j++)
505 {
e9ce60b6 506 bufferStream <<"\t\t<point volumeID=\""<<array->GetVolumeID()[j]<<"\">"<<endl;
626a3158 507 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
508 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
509 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
510 bufferStream <<"\t\t</point>"<<endl;
511 }
512 }
513 else cout<<"no array"<<endl;
514
515 bufferStream << "\t</track>"<<endl;
516 }
517
518 bufferStream << "</ESD>"<<endl;
519
520 string bufferString = bufferStream.str();
521 message_t message(bufferString.size());
522 memcpy (message.data(), bufferString.data(), bufferString.size());
523
efaef546 524 try{
525 fSockets[socket]->send(message);
526 }
527 catch(const zmq::error_t &e)
528 {
529 cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
530 }
626a3158 531 cout<<"xml sent"<<endl;
164d3d29 532}
533
d2416c53 534vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout)
164d3d29 535{
d2416c53 536 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
537 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector<serverListStruct> emptyVector;return emptyVector;}}
538
626a3158 539 //get size of the incomming message
540 message_t sizeMessage;
efaef546 541
542 try{
543 fSockets[socket]->recv(&sizeMessage);
544 }
545 catch(const zmq::error_t &e)
546 {
547 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
548 }
626a3158 549 int numberOfRecords;
550 istringstream iss(static_cast<char*>(sizeMessage.data()));
551 iss >> numberOfRecords;
552
553 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
554
efaef546 555 try{
556 fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
557 }
558 catch(const zmq::error_t &e)
559 {
560 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
561 }
626a3158 562 //get list of events
563 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
efaef546 564 try{
565 fSockets[socket]->recv(response);
566 }
567 catch(const zmq::error_t &e)
568 {
569 cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
570 }
626a3158 571
572 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
573
574 if (response) {delete response;}
575 return receivedList;
164d3d29 576}
577
fae81379 578AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
164d3d29 579{
626a3158 580 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
f5e8dfd2 581 if(timeout>=0){
582 try{(poll (&items[0], 1, timeout)==0);}
583 catch(const zmq::error_t &e){
584 cout<<"EVENT MANAGER -- GetEvent():"<<e.what()<<endl;
585 return NULL;
586 }
587 }
626a3158 588 message_t* message = new message_t();
589
590 try
591 {
592 fSockets[socket]->recv(message);
593 }
594 catch (const zmq::error_t& e)
595 {
596 cout<<"MANAGER -- "<<e.what()<<endl;
597 return NULL;
598 }
626a3158 599 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
600 message->size()+sizeof(UInt_t),
601 message->data());
602 mess->InitMap();
603 mess->ReadClass();// get first the class stored in message
604 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
605 mess->ResetMap();
606
607 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
608
609 if (data)
610 {
611 data->GetStdContent();
612 if(message){delete message;}
613 return data;
614 }
615 else
616 {
617 if(message){delete message;}
618 return NULL;
619 }
164d3d29 620}
621
a7f93de6 622TFile* AliStorageEventManager::GetFile(storageSockets socket,int timeout)
623{
624 cout<<"get file"<<endl;
625 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
626 if(timeout>=0){
627 try{(poll (&items[0], 1, timeout)==0);}
628 catch(const zmq::error_t &e){
629 cout<<"EVENT MANAGER -- GetFile():"<<e.what()<<endl;
630 return NULL;
631 }
632 }
633 message_t* message = new message_t();
634 cout<<"polling passed"<<endl;
635
636 try
637 {
638 cout<<"waiting for file on socket:"<<socket<<endl;
639 fSockets[socket]->recv(message);
640 }
641 catch (const zmq::error_t& e)
642 {
643 cout<<"MANAGER -- "<<e.what()<<endl;
644 return NULL;
645 }
646 cout<<"createing buffer file"<<endl;
647 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
648 message->size()+sizeof(UInt_t),
649 message->data());
650
651 //TMessage *mess = new TMessage();
652 //mess->SetReadMode();
653
654 mess->InitMap();
655 mess->ReadClass();
656 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
657 mess->ResetMap();
658
659 //mess->ReadBuf(message->data(),message->size()+sizeof(UInt_t));
660
661 cout<<"reading file from buffer"<<endl;
662 TTree* tree = (TTree*)(mess->ReadObjectAny(TTree::Class()));
663 //TFile* data = (TFile*)mess->ReadObject(TFile::Class());
664
665 if(tree)
666 {
667 cout<<"received a tree:"<<endl;
668 tree->Print();
669 std::string *dfTitle = new std::string();
670 tree->SetBranchAddress("event",&dfTitle);
671 tree->GetEntry(0);
672
673 cout<<"setting df's name to:"<<dfTitle->data()<<endl;
674 TDirectoryFile *df = new TDirectoryFile(dfTitle->data(),dfTitle->data());
675 df->Add(tree);
676 cout<<"added tree to directory file"<<endl;
677
678 TFile *file = new TFile();
679 df->Write();
680
681 cout<<"created file:"<<endl;
682 file->ls();
683 if(message){delete message;}
684 if(df){delete df;}
685 if(tree){delete tree;}
686 return file;
687 }
688 else
689 {
690 cout<<"no tree found"<<endl;
691 if(message){delete message;}
692 return NULL;
693 }
694}
695
696struct recPointsStruct* AliStorageEventManager::GetFiles(storageSockets socket,int timeout)
697{
698 struct recPointsStruct *files = new struct recPointsStruct;
699
700 for(int i=0;i<10;i++)
701 {
702 files->files[i] = GetFile(socket,timeout);
703 }
704 return files;
705}
706
164d3d29 707struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
708{
626a3158 709 struct serverRequestStruct *request = new struct serverRequestStruct;
710 message_t *requestMessage = new message_t();
efaef546 711 try{
712 fSockets[socket]->recv(requestMessage);
713 }
714 catch(const zmq::error_t &e)
715 {
716 cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
e9ce60b6 717 request->messageType = -1;
718 return request;
efaef546 719 }
626a3158 720 request = static_cast<struct serverRequestStruct*>(requestMessage->data());
721 return request;
164d3d29 722}
723
186c4b6e 724struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
164d3d29 725{
186c4b6e 726 pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
727 if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
728
626a3158 729 struct clientRequestStruct *request = new struct clientRequestStruct;
730 message_t *requestMessage = new message_t();
efaef546 731 try{
732 fSockets[socket]->recv(requestMessage);
733 }
734 catch(const zmq::error_t &e)
735 {
736 cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
737 }
626a3158 738 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
739 return request;
164d3d29 740}
741
742bool AliStorageEventManager::GetBool(storageSockets socket)
743{
626a3158 744 message_t *response = new message_t();
efaef546 745 try{
746 fSockets[socket]->recv(response);
747 }
748 catch(const zmq::error_t &e)
749 {
750 cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
751 }
626a3158 752 char *result = (char*)response->data();
753
754 if(!strcmp("true",result)){return true;}
755 else{return false;}
164d3d29 756}
757
758long AliStorageEventManager::GetLong(storageSockets socket)
759{
626a3158 760 message_t *responseMessage = new message_t();
efaef546 761 try{
762 fSockets[socket]->recv(responseMessage);
763 }
764 catch(const zmq::error_t &e)
765 {
766 cout<<"MANAGER -- get long -- "<<e.what()<<endl;
767 }
fae81379 768
769 long result = 0;
770
771 if(responseMessage)
772 {
773 result = (long)atoi(static_cast<char*>(responseMessage->data()));
774 delete responseMessage;
775 }
626a3158 776 return result;
164d3d29 777}
778