Fixing file after rebasing master
[u/mrichter/AliRoot.git] / MONITOR / MONITORzmq / AliStorageEventManager.cxx
1 #include "AliStorageEventManager.h"
2
3 #include <iostream>
4 #include <sstream>
5 #include <fstream>
6
7 #include <TList.h>
8 #include <TStreamerInfo.h>
9 #include <TThread.h>
10
11 #include "zmq.hpp"
12
13 #include "AliESDEvent.h"
14 #include "AliESDtrack.h"
15 #include "AliTrackPointArray.h"
16 #include "AliESDfriendTrack.h"
17 #include "AliExternalTrackParam.h"
18 #include "AliTrackerBase.h"
19 #include "AliTracker.h"
20
21 using namespace zmq;
22 using namespace std;
23
24 AliStorageEventManager *AliStorageEventManager::fManagerInstance = 0;
25
26 AliStorageEventManager::AliStorageEventManager()
27 {
28     //read config file
29     TThread::Lock();
30     ifstream configFile (GetConfigFilePath());
31     
32     if (configFile.is_open())
33     {
34         string line;
35         int from,to;
36         while(configFile.good())
37         {
38             getline(configFile,line);
39             from = line.find("\"")+1;
40             to = line.find_last_of("\"");
41             if(line.find("STORAGE_SERVER=")==0)
42             {
43                 fStorageServer=line.substr(from,to-from);
44             }
45             else if(line.find("EVENT_SERVER=")==0)
46             {
47                 fEventServer=line.substr(from,to-from);
48             }
49             else if(line.find("STORAGE_SERVER_PORT=")==0)
50             {
51                 fStorageServerPort=atoi(line.substr(from,to-from).c_str());
52             }
53             else if(line.find("EVENT_SERVER_PORT=")==0)
54             {
55                 fEventServerPort=atoi(line.substr(from,to-from).c_str());
56             }
57             else if(line.find("STORAGE_CLIENT_PORT=")==0)
58             {
59                 fStorageClientPort=atoi(line.substr(from,to-from).c_str());
60             }
61             else if(line.find("XML_SERVER_PORT=")==0)
62             {
63                 fXmlServerPort=atoi(line.substr(from,to-from).c_str());
64             }
65             else if(line.find("ITS_POINTS_SERVER_PORT=")==0)
66             {
67                 fItsPointsServerPort=atoi(line.substr(from,to-from).c_str());
68                 cout<<"ITS port is:"<<fItsPointsServerPort<<endl;
69             }
70         }
71         if(configFile.eof()){configFile.clear();}
72         configFile.close();
73     }
74     else{cout<<"EVENT MANAGER -- Unable to open config file"<<endl;}
75     TThread::UnLock();
76     
77     for(int i=0;i<NUMBER_OF_SOCKETS;i++){fContexts[i] = new context_t();}
78 }
79 AliStorageEventManager::~AliStorageEventManager()
80 {
81     if(fManagerInstance){delete fManagerInstance;fManagerInstance=0;}
82 }
83
84 AliStorageEventManager* AliStorageEventManager::GetEventManagerInstance()
85 {
86     TThread::Lock();
87     if(fManagerInstance==0)
88     {
89         fManagerInstance = new AliStorageEventManager();
90     }
91     TThread::UnLock();
92     return fManagerInstance;
93 }
94
95
96 void freeBuff (void *data, void *hint)
97 {
98     //  free(data);
99 }
100
101 bool AliStorageEventManager::CreateSocket(storageSockets socket)
102 {
103     cout<<"Creating socket:"<<socket<<endl;
104     
105     switch(socket)
106     {
107         case SERVER_COMMUNICATION_REQ:
108         {
109             fSockets[SERVER_COMMUNICATION_REQ] =
110             new socket_t(*fContexts[SERVER_COMMUNICATION_REQ],ZMQ_REQ);
111             try
112             {
113                 fSockets[SERVER_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(),fStorageServerPort));
114             }
115             catch (const zmq::error_t& e)
116             {
117                 cout<<"MANAGER -- "<<e.what()<<endl;
118                 return 0;
119             }
120         }
121             break;
122         case SERVER_COMMUNICATION_REP:
123         {
124             fSockets[SERVER_COMMUNICATION_REP] =
125             new socket_t(*fContexts[SERVER_COMMUNICATION_REP],ZMQ_REP);
126             try
127             {
128                 fSockets[SERVER_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageServerPort));
129             }
130             catch (const zmq::error_t& e)
131             {
132                 cout<<"MANAGER -- "<<e.what()<<endl;
133                 return 0;
134             }
135         }
136             break;
137         case CLIENT_COMMUNICATION_REQ:
138         {
139             fSockets[CLIENT_COMMUNICATION_REQ] =
140             new socket_t(*fContexts[CLIENT_COMMUNICATION_REQ],ZMQ_REQ);
141             try
142             {
143                 fSockets[CLIENT_COMMUNICATION_REQ]->connect(Form("tcp://%s:%d",fStorageServer.c_str(), fStorageClientPort));
144             }
145             catch (const zmq::error_t& e)
146             {
147                 cout<<"MANAGER -- "<<e.what()<<endl;
148                 return 0;
149             }
150         }
151             break;
152         case CLIENT_COMMUNICATION_REP:
153         {
154             fSockets[CLIENT_COMMUNICATION_REP] =
155             new socket_t(*fContexts[CLIENT_COMMUNICATION_REP],ZMQ_REP);
156             try
157             {
158                 fSockets[CLIENT_COMMUNICATION_REP]->bind(Form("tcp://*:%d",fStorageClientPort));
159             }
160             catch (const zmq::error_t& e)
161             {
162                 cout<<"MANAGER -- "<<e.what()<<endl;
163                 return 0;
164             }
165         }
166             break;
167         case EVENTS_SERVER_PUB:
168         {
169             fSockets[EVENTS_SERVER_PUB] =
170             new socket_t(*fContexts[EVENTS_SERVER_PUB],ZMQ_PUB);
171             try
172             {
173                 fSockets[EVENTS_SERVER_PUB]->bind(Form("tcp://*:%d",fEventServerPort));
174             }
175             catch (const zmq::error_t& e)
176             {
177                 cout<<"MANAGER -- "<<e.what()<<endl;
178                 return 0;
179             }
180         }
181             break;
182         case EVENTS_SERVER_SUB:
183         {
184             fSockets[EVENTS_SERVER_SUB] =
185             new socket_t(*fContexts[EVENTS_SERVER_SUB],ZMQ_SUB);
186             fSockets[EVENTS_SERVER_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
187             try
188             {
189                 fSockets[EVENTS_SERVER_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fEventServerPort));
190             }
191             catch (const zmq::error_t& e)
192             {
193                 cout<<"MANAGER -- "<<e.what()<<endl;
194                 return 0;
195                 
196             }
197         }
198             break;
199         case XML_PUB:
200         {
201             fSockets[XML_PUB] =
202             new socket_t(*fContexts[XML_PUB],ZMQ_PUB);
203             try
204             {
205                 fSockets[XML_PUB]->bind(Form("tcp://*:%d",fXmlServerPort));
206             }
207             catch (const zmq::error_t& e)
208             {
209                 cout<<"MANAGER -- "<<e.what()<<endl;
210                 return 0;
211             }
212         }
213         break;
214     case ITS_POINTS_PUB:
215         {
216             fSockets[ITS_POINTS_PUB] =
217             new socket_t(*fContexts[ITS_POINTS_PUB],ZMQ_PUB);
218             try
219             {
220                 fSockets[ITS_POINTS_PUB]->bind(Form("tcp://*:%d",fItsPointsServerPort));
221             }
222             catch (const zmq::error_t& e)
223             {
224                 cout<<"MANAGER -- "<<e.what()<<endl;
225                 return 0;
226             }
227         }
228         break;
229     case ITS_POINTS_SUB:
230         {
231             fSockets[ITS_POINTS_SUB] =
232             new socket_t(*fContexts[ITS_POINTS_SUB],ZMQ_SUB);
233             fSockets[ITS_POINTS_SUB]->setsockopt(ZMQ_SUBSCRIBE,"",0);
234             try
235             {
236               fSockets[ITS_POINTS_SUB]->connect(Form("tcp://%s:%d",fEventServer.c_str(),fItsPointsServerPort));
237             }
238             catch (const zmq::error_t& e)
239             {
240                 cout<<"MANAGER -- "<<e.what()<<endl;
241                 return 0;
242                 
243             }
244         }
245             break;
246         default:break;
247     }
248     return 1;
249 }
250
251 void AliStorageEventManager::Send(vector<serverListStruct> list,storageSockets socket)
252 {
253     //send size of the struct first
254     int numberOfRecords = list.size();
255     message_t message(20);
256     snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
257     try{
258         fSockets[socket]->send(message);
259     }
260     catch(const zmq::error_t &e)
261     {
262         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
263     }
264     //if(numberOfRecords==0)return;
265     message_t *tmpMessage = new message_t();
266     
267     try{
268         fSockets[socket]->recv(tmpMessage);//empty message just to keep req-rep order
269     }
270     catch(const zmq::error_t &e)
271     {
272         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
273     }
274     // //prepare message with event's list
275     // char *buffer = reinterpret_cast<char*> (&list[0]);
276     // message_t *reply = new message_t((void*)buffer,
277     //                sizeof(serverListStruct)*numberOfRecords,0);
278     // fSockets[socket]->send(*reply);
279     // if(reply){delete reply;}
280     
281     message_t reply(sizeof(serverListStruct)*numberOfRecords);
282     memcpy(reply.data(), reinterpret_cast<const char*> (&list[0]), sizeof(serverListStruct)*numberOfRecords);
283     
284     try{
285         fSockets[socket]->send(reply);
286     }
287     catch(const zmq::error_t &e)
288     {
289         cout<<"MANAGER -- send vector -- "<<e.what()<<endl;
290     }
291     if(tmpMessage){delete tmpMessage;}
292 }
293
294 void AliStorageEventManager::Send(struct serverRequestStruct *request,storageSockets socket)
295 {
296     char *buffer = (char*)(request);
297     message_t *requestMessage = new message_t((void*)buffer,
298                                               sizeof(struct serverRequestStruct)
299                                               +sizeof(struct listRequestStruct)
300                                               +sizeof(struct eventStruct),freeBuff);
301     try{
302         fSockets[socket]->send(*requestMessage);
303     }
304     catch(const zmq::error_t &e)
305     {
306         cout<<"MANAGER -- send serverRequestStruct -- "<<e.what()<<endl;
307     }
308 }
309
310 bool AliStorageEventManager::Send(struct clientRequestStruct *request,storageSockets socket,int timeout)
311 {
312     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
313     
314     
315     char *buffer = (char*)(request);
316     message_t *requestMessage = new message_t((void*)buffer,
317                                               sizeof(struct clientRequestStruct),freeBuff);
318     
319     try{
320         fSockets[socket]->send(*requestMessage);
321     }
322     catch (const zmq::error_t& e)
323     {
324         cout<<"MANAGER -- "<<e.what()<<endl;
325         cout<<e.num()<<endl;
326         if(fSockets[socket]){delete fSockets[socket];fSockets[socket]=0;}
327         
328         CreateSocket(socket);
329         delete requestMessage;
330         return 0;
331         
332     }
333     if(timeout>=0)
334     {
335         if(poll (&items[0], 1, timeout)==0)
336         {
337             delete requestMessage;
338             return 0;
339         }
340     }
341     delete requestMessage;
342     return 1;
343 }
344
345 void AliStorageEventManager::Send(long message,storageSockets socket)
346 {
347     stringstream streamBuffer;
348     streamBuffer << message;
349     string stringBuffer = streamBuffer.str();
350     char *buffer = (char*)stringBuffer.c_str();
351     message_t *replyMessage = new message_t((void*)buffer,sizeof(stringBuffer),freeBuff);
352     
353     try{
354         fSockets[socket]->send(*replyMessage);
355     }
356     catch(const zmq::error_t &e)
357     {
358         cout<<"MANAGER -- send long -- "<<e.what()<<endl;
359     }
360     delete replyMessage;
361     streamBuffer.str(string());
362     streamBuffer.clear();
363 }
364
365 void AliStorageEventManager::Send(bool message,storageSockets socket)
366 {
367     char *buffer;
368     if(message==true)
369     {
370         buffer = (char*)("true");
371     }
372     else
373     {
374         buffer = (char*)("false");
375     }
376     message_t *replyMessage = new message_t((void*)buffer,sizeof(buffer),freeBuff);
377     try{
378         fSockets[socket]->send(*replyMessage);
379     }
380     catch(const zmq::error_t &e)
381     {
382         cout<<"MANAGER -- send bool -- "<<e.what()<<endl;
383     }
384     delete replyMessage;
385 }
386
387 void AliStorageEventManager::Send(AliESDEvent *event, storageSockets socket)
388 {
389   if(!event){return;}
390
391     TMessage tmess(kMESS_OBJECT);
392     tmess.Reset();
393     tmess.WriteObject(event);
394     //  TMessage::EnableSchemaEvolutionForAll(kTRUE);
395     
396     int bufsize = tmess.Length();
397     char* buf = (char*) malloc(bufsize * sizeof(char));
398     memcpy(buf, tmess.Buffer(), bufsize);
399     
400     message_t message((void*)buf, bufsize, freeBuff);
401     try{
402         fSockets[socket]->send(message);
403     }
404     catch(const zmq::error_t &e)
405     {
406         cout<<"MANAGER -- send AliESDEvent -- "<<e.what()<<endl;
407     }
408 }
409
410 void AliStorageEventManager::Send(TFile *file, storageSockets socket)
411 {
412   cout<<"sending tfile to socket:"<<endl;
413   TTree *tree;
414
415   if(file)
416     {
417       file->ls();
418       cout<<"1"<<endl;
419       //std::string *dfTitle = new std::string(file->GetListOfKeys()->Last()->GetTitle());
420       string *name = new string(file->GetListOfKeys()->Last()->GetTitle());
421       cout<<"treeTitle:"<<name->data()<<endl;
422       
423       tree = (TTree*)((TDirectoryFile*)file->Get(name->data()))->Get("TreeR");
424
425
426         // TDirectoryFile *df = (TDirectoryFile*)file->Get(dfTitle->data());
427         //if(df)
428         //{
429         //        cout<<"directory file extracted"<<endl;
430         //tree = (TTree*)df->Get("TreeR");
431         cout<<"2"<<endl;
432       tree->Branch("event",&name);
433       cout<<"branch created:"<<endl;
434       tree->Print();
435       tree->Fill();
436       cout<<"ttree filled with name"<<endl;
437           // delete df;
438           //}
439       tree->Print();
440     }
441   else
442     {
443       tree = NULL;
444       cout<<"file is empty"<<endl;
445     }
446
447   //TMessage::EnableSchemaEvolutionForAll(kTRUE);
448   TMessage tmess(kMESS_OBJECT);
449   tmess.Reset();
450   tmess.WriteObject(tree);
451   cout<<"file written to tmessage"<<endl;
452  
453   int bufsize = tmess.Length();
454   char* buf = (char*) malloc(bufsize * sizeof(char));
455   memcpy(buf, tmess.Buffer(), bufsize);
456   cout<<"messaged copied to buffer"<<endl;
457       
458   message_t message((void*)buf, bufsize, freeBuff);
459   cout<<"message_t created"<<endl;
460   try{
461     fSockets[socket]->send(message);
462   }
463   catch(const zmq::error_t &e)
464     {
465       cout<<"MANAGER -- send TFile -- "<<e.what()<<endl;
466     }
467   //if(tree){delete tree;}
468 }
469
470 void AliStorageEventManager::Send(struct recPointsStruct *files, storageSockets socket)
471 {
472   for(int i=0;i<10;i++){Send(files->files[i],socket);}
473 }
474 void AliStorageEventManager::SendAsXml(AliESDEvent *event,storageSockets socket)
475 {
476     cout<<"SENDING AS XML"<<endl;
477     stringstream bufferStream;
478     bufferStream << "<?xml version=\"1.0\" encoding=\"utf-8\"?>"<<endl;
479     bufferStream << "<ESD xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">" << endl;
480     
481     for(int i=0;i<event->GetNumberOfTracks();i++)
482     {
483         AliESDtrack *track = event->GetTrack(i);
484         AliKalmanTrack *ITStrack = track->GetITStrack();
485         const AliTrackPointArray *array = track->GetTrackPointArray();
486       
487         bufferStream << "\t<track mass=\""<<track->GetMass()<<"\"";
488         //      bufferStream << "\t<track esdpid=\""<<track->GetESDpid();
489         bufferStream << "\t pid=\""<<track->PID()<<"\"";
490         bufferStream << "\t energy=\""<<track->E()<<"\"";
491         bufferStream << "\t volumeID=\""<<array->GetVolumeID()<<"\">" <<endl;
492
493         
494
495       
496         
497         if(array)
498         {
499             const float *x = array->GetX();
500             const float *y = array->GetY();
501             const float *z = array->GetZ();
502             int n = array->GetNPoints();
503             
504             for(int j=0;j<n;j++)
505             {
506               bufferStream <<"\t\t<point volumeID=\""<<array->GetVolumeID()[j]<<"\">"<<endl;
507                 bufferStream <<"\t\t\t<x>"<< x[j] <<"</x>"<<endl;
508                 bufferStream <<"\t\t\t<y>"<< y[j] <<"</y>"<<endl;
509                 bufferStream <<"\t\t\t<z>"<< z[j] <<"</z>"<<endl;
510                 bufferStream <<"\t\t</point>"<<endl;
511             }
512         }
513         else cout<<"no array"<<endl;
514         
515         bufferStream << "\t</track>"<<endl;
516     }
517     
518     bufferStream << "</ESD>"<<endl;
519     
520     string bufferString = bufferStream.str();
521     message_t message(bufferString.size());
522     memcpy (message.data(), bufferString.data(), bufferString.size());
523     
524     try{
525         fSockets[socket]->send(message);
526     }
527     catch(const zmq::error_t &e)
528     {
529         cout<<"MANAGER -- send send xml -- "<<e.what()<<endl;
530     }
531     cout<<"xml sent"<<endl;
532 }
533
534 vector<serverListStruct> AliStorageEventManager::GetServerListVector(storageSockets socket, int timeout)
535 {
536   pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
537   if(timeout>=0){if(poll (&items[0], 1, timeout)==0){vector<serverListStruct> emptyVector;return emptyVector;}}
538
539     //get size of the incomming message
540     message_t sizeMessage;
541     
542     try{
543         fSockets[socket]->recv(&sizeMessage);
544     }
545     catch(const zmq::error_t &e)
546     {
547         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
548     }
549     int numberOfRecords;
550     istringstream iss(static_cast<char*>(sizeMessage.data()));
551     iss >> numberOfRecords;
552     
553     if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
554     
555     try{
556         fSockets[socket]->send(*(new message_t()));//receive empty message just to keep req-rep order
557     }
558     catch(const zmq::error_t &e)
559     {
560         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
561     }
562     //get list of events
563     message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
564     try{
565         fSockets[socket]->recv(response);
566     }
567     catch(const zmq::error_t &e)
568     {
569         cout<<"MANAGER -- get vector -- "<<e.what()<<endl;
570     }
571     
572     vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
573     
574     if (response) {delete response;}
575     return receivedList;
576 }
577
578 AliESDEvent* AliStorageEventManager::GetEvent(storageSockets socket,int timeout)
579 {
580     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
581     if(timeout>=0){
582         try{(poll (&items[0], 1, timeout)==0);}
583         catch(const zmq::error_t &e){
584           cout<<"EVENT MANAGER -- GetEvent():"<<e.what()<<endl;
585             return NULL;
586           }
587     }
588     message_t* message = new message_t();
589     
590     try
591     {
592         fSockets[socket]->recv(message);
593     }
594     catch (const zmq::error_t& e)
595     {
596         cout<<"MANAGER -- "<<e.what()<<endl;
597         return NULL;
598     }
599     TBufferFile *mess = new TBufferFile(TBuffer::kRead,
600                                         message->size()+sizeof(UInt_t),
601                                         message->data());
602     mess->InitMap();
603     mess->ReadClass();// get first the class stored in message
604     mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
605     mess->ResetMap();
606     
607     AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
608     
609     if (data)
610     {
611         data->GetStdContent();
612         if(message){delete message;}
613         return data;
614     }
615     else
616     {
617         if(message){delete message;}
618         return NULL;
619     }
620 }
621
622 TFile* AliStorageEventManager::GetFile(storageSockets socket,int timeout)
623 {
624   cout<<"get file"<<endl;
625     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
626     if(timeout>=0){
627         try{(poll (&items[0], 1, timeout)==0);}
628         catch(const zmq::error_t &e){
629           cout<<"EVENT MANAGER -- GetFile():"<<e.what()<<endl;
630             return NULL;
631           }
632     }
633     message_t* message = new message_t();
634     cout<<"polling passed"<<endl;
635
636     try
637     {
638       cout<<"waiting for file on socket:"<<socket<<endl;
639         fSockets[socket]->recv(message);
640     }
641     catch (const zmq::error_t& e)
642     {
643         cout<<"MANAGER -- "<<e.what()<<endl;
644         return NULL;
645     }
646     cout<<"createing buffer file"<<endl;
647     TBufferFile *mess = new TBufferFile(TBuffer::kRead,
648                                         message->size()+sizeof(UInt_t),
649                                         message->data());
650
651     //TMessage *mess = new TMessage();
652     //mess->SetReadMode();
653    
654     mess->InitMap();
655     mess->ReadClass();
656     mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
657     mess->ResetMap();
658
659     //mess->ReadBuf(message->data(),message->size()+sizeof(UInt_t));
660     
661     cout<<"reading file from buffer"<<endl;
662     TTree* tree = (TTree*)(mess->ReadObjectAny(TTree::Class()));
663     //TFile* data = (TFile*)mess->ReadObject(TFile::Class());
664
665     if(tree)
666       {
667         cout<<"received a tree:"<<endl;
668         tree->Print();
669         std::string *dfTitle = new std::string();
670         tree->SetBranchAddress("event",&dfTitle);
671         tree->GetEntry(0);
672
673         cout<<"setting df's name to:"<<dfTitle->data()<<endl;
674         TDirectoryFile *df = new TDirectoryFile(dfTitle->data(),dfTitle->data());
675         df->Add(tree);
676         cout<<"added tree to directory file"<<endl;
677         
678         TFile *file = new TFile();
679         df->Write();
680
681         cout<<"created file:"<<endl;
682         file->ls();
683         if(message){delete message;}
684         if(df){delete df;}
685         if(tree){delete tree;}
686         return file;
687       }
688     else
689       {
690         cout<<"no tree found"<<endl;
691         if(message){delete message;}
692         return NULL;
693       }
694 }
695
696 struct recPointsStruct* AliStorageEventManager::GetFiles(storageSockets socket,int timeout)
697 {
698   struct recPointsStruct *files = new struct recPointsStruct;
699
700   for(int i=0;i<10;i++)
701     {
702       files->files[i] = GetFile(socket,timeout);
703     }
704   return files;
705 }
706
707 struct serverRequestStruct* AliStorageEventManager::GetServerStruct(storageSockets socket)
708 {
709     struct serverRequestStruct *request = new struct serverRequestStruct;
710     message_t *requestMessage = new message_t();
711     try{
712         fSockets[socket]->recv(requestMessage);
713     }
714     catch(const zmq::error_t &e)
715     {
716         cout<<"MANAGER -- get serverRequestStruct -- "<<e.what()<<endl;
717         request->messageType = -1;
718         return request;
719     }
720     request = static_cast<struct serverRequestStruct*>(requestMessage->data());
721     return request;
722 }
723
724 struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
725 {
726     pollitem_t items[1] =  {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
727     if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
728     
729     struct clientRequestStruct *request = new struct clientRequestStruct;
730     message_t *requestMessage = new message_t();
731     try{
732         fSockets[socket]->recv(requestMessage);
733     }
734     catch(const zmq::error_t &e)
735     {
736         cout<<"MANAGER -- get clientRequestStruct -- "<<e.what()<<endl;
737     }
738     request = static_cast<struct clientRequestStruct*>(requestMessage->data());
739     return request;
740 }
741
742 bool AliStorageEventManager::GetBool(storageSockets socket)
743 {
744     message_t *response = new message_t();
745     try{
746         fSockets[socket]->recv(response);
747     }
748     catch(const zmq::error_t &e)
749     {
750         cout<<"MANAGER -- get bool -- "<<e.what()<<endl;
751     }
752     char *result = (char*)response->data();
753     
754     if(!strcmp("true",result)){return true;}
755     else{return false;}
756 }
757
758 long AliStorageEventManager::GetLong(storageSockets socket)
759 {
760     message_t *responseMessage = new message_t();
761     try{
762         fSockets[socket]->recv(responseMessage);
763     }
764     catch(const zmq::error_t &e)
765     {
766         cout<<"MANAGER -- get long -- "<<e.what()<<endl;
767     }
768     
769     long result = 0;
770     
771     if(responseMessage)
772     {
773         result = (long)atoi(static_cast<char*>(responseMessage->data()));
774         delete responseMessage;
775     }
776     return result;
777 }
778