]> git.uio.no Git - u/mrichter/AliRoot.git/blob - STORAGE/AliStorageEventManager.cxx
Untriggered DEtaDPhi: corrections to macros for the train
[u/mrichter/AliRoot.git] / STORAGE / AliStorageEventManager.cxx
#include "AliStorageEventManager.h"
#include "AliNetMessage.h"

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>

#include <TList.h>
#include <TStreamerInfo.h>

#include "zmq.hpp"
11
12 using namespace zmq;
13 using namespace std;
14
15 AliStorageEventManager::AliStorageEventManager(){}
16 AliStorageEventManager::~AliStorageEventManager(){}
17
// Deallocation callback with the (data, hint) signature expected by the
// zero-copy zmq::message_t constructor.
//
// data - buffer to release; it is passed to free(), so it has to come from
//        malloc()/calloc()/realloc() (or be a null pointer).
// hint - ignored.
void __freeBuffer (void *data, void *hint)
{
    (void)hint; // not needed: the buffer pointer alone is enough to release it
    ::free(data);
}
22
23 void AliStorageEventManager::Send(AliESDEvent *event, socket_t *socket)
24 {
25         AliNetMessage tmess(kMESS_OBJECT);
26         tmess.Reset();
27         tmess.WriteObject(event);
28
29         int bufSize = tmess.BufferSize();
30         char* buf = new char[bufSize];
31         memcpy(buf, (char*)tmess.Buffer(), bufSize);
32
33         message_t message(buf, bufSize, __freeBuffer, NULL);
34         //fwrite(mess.Buffer(), sizeof(char), bufSize, stdout);
35         
36         socket->send(message);
37
38         
39         //publisher.Send(tmess);
40                         /*
41         TMessage tmess(kMESS_OBJECT);
42         tmess.Reset();
43         tmess.WriteObject(event);
44         TMessage::EnableSchemaEvolutionForAll(kTRUE);
45         SendStreamerInfos(&tmess, socket);
46         int bufsize = tmess.Length();
47         char* buf = (char*) malloc(bufsize * sizeof(char));
48         memcpy(buf, tmess.Buffer(), bufsize);
49         zmq::message_t message((void*)buf, bufsize, 0, 0);
50         socket->send(message);*/
51 }
52
53 void AliStorageEventManager::Send(vector<serverListStruct> list, socket_t *socket)
54 {
55         //send size of the struct first
56         int numberOfRecords = list.size();
57         message_t message(20);
58         snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
59
60         socket->send(message);
61         if(numberOfRecords==0)return;
62         socket->recv((new message_t));//empty message just to keep req-rep order
63
64         //prepare message with event's list
65         char *buffer = reinterpret_cast<char*> (&list[0]);
66         message_t *reply = new message_t((void*)buffer,
67                               sizeof(serverListStruct)*numberOfRecords,0);
68         socket->send(*reply);
69
70         if(reply){delete reply;}
71 }
72
73 void AliStorageEventManager::Send(struct serverRequestStruct *request,zmq::socket_t *socket)
74 {
75         char *buffer = (char*)(request);
76         message_t *requestMessage = new message_t((void*)buffer,
77                                            sizeof(struct serverRequestStruct)
78                                            +sizeof(struct listRequestStruct)
79                                            +sizeof(struct eventStruct),0);      
80         socket->send(*requestMessage);
81 }
82
83 void AliStorageEventManager::Send(struct clientRequestStruct *request,zmq::socket_t *socket)
84 {
85         char *buffer = (char*)(request);
86         message_t *requestMessage = new message_t((void*)buffer,
87                                            sizeof(struct clientRequestStruct),0);       
88         socket->send(*requestMessage);
89 }
90
91 void AliStorageEventManager::Send(long message,zmq::socket_t *socket)
92 {
93         stringstream streamBuffer;
94         streamBuffer << message;
95         string stringBuffer = streamBuffer.str();
96         char *buffer = (char*)stringBuffer.c_str();
97         message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
98         socket->send(*replyMessage);
99         delete replyMessage;
100         streamBuffer.str(string());
101         streamBuffer.clear();
102 }
103
104 void AliStorageEventManager::Send(bool message,zmq::socket_t *socket)
105 {
106         char *buffer;
107         if(message==true)
108         {
109                 buffer = (char*)("true");
110         }
111         else
112         {
113                 buffer = (char*)("false");
114         }
115         message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
116         socket->send(*replyMessage);
117         delete replyMessage;
118 }
119
120 vector<serverListStruct> AliStorageEventManager::GetServerListVector(socket_t *socket)
121 {
122         //get size of the incomming message
123         message_t sizeMessage;
124         socket->recv(&sizeMessage);
125         int numberOfRecords;
126         istringstream iss(static_cast<char*>(sizeMessage.data()));
127         iss >> numberOfRecords;
128
129         if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
130         
131         socket->send(*(new message_t()));//receive empty message just to keep req-rep order
132         
133 //get list of events
134         message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
135         socket->recv(response);
136         
137         vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
138
139         return receivedList;
140 }
141
142 AliESDEvent* AliStorageEventManager::GetEvent(socket_t *socket)
143 {
144         message_t *message = new message_t();
145         
146         socket->recv(message);
147         int bufSize = (int)message->size();
148                 
149         char* buf = new char[bufSize];
150         memcpy(buf, (char*)message->data(), bufSize);
151         
152         AliNetMessage *mess = new AliNetMessage(buf, bufSize);
153         
154         /*
155         message_t* message = RecvStreamerInfos(socket);
156         int64_t more;
157         size_t more_size = sizeof more;
158     
159         socket->getsockopt(ZMQ_RCVMORE, &more, &more_size );
160         socket->recv(message);
161         TBufferFile *mess = new TBufferFile(TBuffer::kRead,
162                                             message->size()+sizeof(UInt_t),
163                                             message->data());
164         mess->InitMap();
165         mess->ReadClass();// get first the class stored in message
166         mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
167         mess->ResetMap();
168         
169
170         //message_t *mess;
171         //socket->recv(message);
172         */
173         AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
174         if (data)
175         {
176                 TTree* tree= new TTree("esdTree", "esdTree");
177                 data->WriteToTree(tree);
178                 tree-> Fill();
179                 AliESDEvent* event= new AliESDEvent();
180                 event->ReadFromTree(tree);
181                 tree->GetEntry(0);
182                 delete data;
183                 delete tree;
184                 if(message){delete message;}
185                 return event;                   
186         }
187         else
188         {
189                 if(message){delete message;}
190                 return NULL;
191         }
192 }
193
194 message_t* AliStorageEventManager::RecvStreamerInfos(socket_t *socket)
195 {
196         message_t *message = new message_t;
197         socket->recv(message);
198         TBufferFile *mess = new TBufferFile(TBuffer::kRead,
199                                             message->size()+2*sizeof(UInt_t),
200                                             message->data());
201         if(!mess)
202         {
203                 if(message){delete message;}
204                 return NULL;
205         }
206         mess->InitMap();
207         mess->ReadClass();     // get first the class stored in message
208         mess->SetBufferOffset(sizeof(UInt_t) + sizeof(TMessage));
209         mess->ResetMap();
210         TList *list = (TList*)mess->ReadObjectAny(TList::Class());
211         if(list==0 || list->IsEmpty())
212         {
213                 return message;
214         }
215         if(message){delete message;}
216         TIter next(list);
217         TStreamerInfo *info;
218         TObjLink *lnk = list->FirstLink();
219         // First call BuildCheck for regular class
220         while (lnk)
221         {
222                 info = (TStreamerInfo*)lnk->GetObject();
223                 TObject *element = info->GetElements()->UncheckedAt(0);
224                 Bool_t isstl = element && strcmp("This",element->GetName())==0;
225                 if (!isstl)
226                 {
227                         info->BuildCheck();
228                         if (gDebug > 0)
229                         {
230                                 Info("RecvStreamerInfos",
231                                      "importing TStreamerInfo: %s, version = %d",
232                                      info->GetName(),
233                                      info->GetClassVersion());
234                         }
235                 }
236                 lnk = lnk->Next();
237         }
238         lnk = list->FirstLink();
239         while (lnk)
240         {
241                 info = (TStreamerInfo*)lnk->GetObject();
242                 TObject *element = info->GetElements()->UncheckedAt(0);
243                 Bool_t isstl = element && strcmp("This",element->GetName())==0;
244                 if (isstl)
245                 {
246                         info->BuildCheck();
247                         if (gDebug > 0)
248                         {
249                                 Info("RecvStreamerInfos",
250                                      "importing TStreamerInfo: %s, version = %d",
251                                      info->GetName(),
252                                      info->GetClassVersion());
253                         }
254                 }
255                 lnk = lnk->Next();
256         }
257         if(list){delete list;}
258         if(mess){delete mess;}
259         if(lnk){delete lnk;}
260         return message;
261 }
262
263 void AliStorageEventManager::SendStreamerInfos(TMessage* mess, socket_t *socket)
264 {
265         TList* infos = mess->GetStreamerInfos();
266    
267         TIter next(infos);
268         TStreamerInfo *info;
269         TList *minilist = 0;
270         while ((info = (TStreamerInfo*)next()))
271         {
272                 //Int_t uid = info->GetNumber();
273                 if (!minilist) minilist = new TList();
274          
275                 minilist->Add(info);
276         }
277       
278         if (minilist)
279         {
280                 TMessage messinfo(kMESS_STREAMERINFO);
281                 messinfo.WriteObject(minilist);
282                 delete minilist;
283                 if (messinfo.GetStreamerInfos())
284                         messinfo.GetStreamerInfos()->Clear();
285           
286                 int bufsize = messinfo.Length();
287                 char* buf = (char*) malloc(bufsize * sizeof(char));
288                 memcpy(buf, messinfo.Buffer(), bufsize);
289
290                 // send!
291                 zmq::message_t message((void*)buf, bufsize, 0, 0);
292                      
293                 if (socket->send(message, ZMQ_SNDMORE))
294                 {
295                         Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
296                 }
297         }
298
299         return;
300 }
301
302