]> git.uio.no Git - u/mrichter/AliRoot.git/blame - STORAGE/AliStorageEventManager.cxx
Fix
[u/mrichter/AliRoot.git] / STORAGE / AliStorageEventManager.cxx
CommitLineData
5eb34a26 1#include "AliStorageEventManager.h"
2#include "AliNetMessage.h"
3
4#include <iostream>
5#include <sstream>
6
7#include <TList.h>
8#include <TStreamerInfo.h>
9
10#include "zmq.hpp"
11
12using namespace zmq;
13using namespace std;
14
15AliStorageEventManager::AliStorageEventManager(){}
16AliStorageEventManager::~AliStorageEventManager(){}
17
18void __freeBuffer (void *data, void *hint)
19{
20 free(data);
21}
22
23void AliStorageEventManager::Send(AliESDEvent *event, socket_t *socket)
24{
25 AliNetMessage tmess(kMESS_OBJECT);
26 tmess.Reset();
27 tmess.WriteObject(event);
28
29 int bufSize = tmess.BufferSize();
30 char* buf = new char[bufSize];
31 memcpy(buf, (char*)tmess.Buffer(), bufSize);
32
33 message_t message(buf, bufSize, __freeBuffer, NULL);
34 //fwrite(mess.Buffer(), sizeof(char), bufSize, stdout);
35
36 socket->send(message);
37
38
39 //publisher.Send(tmess);
40 /*
41 TMessage tmess(kMESS_OBJECT);
42 tmess.Reset();
43 tmess.WriteObject(event);
44 TMessage::EnableSchemaEvolutionForAll(kTRUE);
45 SendStreamerInfos(&tmess, socket);
46 int bufsize = tmess.Length();
47 char* buf = (char*) malloc(bufsize * sizeof(char));
48 memcpy(buf, tmess.Buffer(), bufsize);
49 zmq::message_t message((void*)buf, bufsize, 0, 0);
50 socket->send(message);*/
51}
52
53void AliStorageEventManager::Send(vector<serverListStruct> list, socket_t *socket)
54{
55 //send size of the struct first
56 int numberOfRecords = list.size();
57 message_t message(20);
58 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
59
60 socket->send(message);
61 if(numberOfRecords==0)return;
62 socket->recv((new message_t));//empty message just to keep req-rep order
63
64 //prepare message with event's list
65 char *buffer = reinterpret_cast<char*> (&list[0]);
66 message_t *reply = new message_t((void*)buffer,
67 sizeof(serverListStruct)*numberOfRecords,0);
68 socket->send(*reply);
69
70 if(reply){delete reply;}
71}
72
73void AliStorageEventManager::Send(struct serverRequestStruct *request,zmq::socket_t *socket)
74{
75 char *buffer = (char*)(request);
76 message_t *requestMessage = new message_t((void*)buffer,
77 sizeof(struct serverRequestStruct)
78 +sizeof(struct listRequestStruct)
79 +sizeof(struct eventStruct),0);
80 socket->send(*requestMessage);
81}
82
83void AliStorageEventManager::Send(struct clientRequestStruct *request,zmq::socket_t *socket)
84{
85 char *buffer = (char*)(request);
86 message_t *requestMessage = new message_t((void*)buffer,
87 sizeof(struct clientRequestStruct),0);
88 socket->send(*requestMessage);
89}
90
91void AliStorageEventManager::Send(long message,zmq::socket_t *socket)
92{
93 stringstream streamBuffer;
94 streamBuffer << message;
95 string stringBuffer = streamBuffer.str();
96 char *buffer = (char*)stringBuffer.c_str();
97 message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
98 socket->send(*replyMessage);
99 delete replyMessage;
100 streamBuffer.str(string());
101 streamBuffer.clear();
102}
103
104void AliStorageEventManager::Send(bool message,zmq::socket_t *socket)
105{
106 char *buffer;
107 if(message==true)
108 {
109 buffer = (char*)("true");
110 }
111 else
112 {
113 buffer = (char*)("false");
114 }
115 message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
116 socket->send(*replyMessage);
117 delete replyMessage;
118}
119
120vector<serverListStruct> AliStorageEventManager::GetServerListVector(socket_t *socket)
121{
122 //get size of the incomming message
123 message_t sizeMessage;
124 socket->recv(&sizeMessage);
125 int numberOfRecords;
126 istringstream iss(static_cast<char*>(sizeMessage.data()));
127 iss >> numberOfRecords;
128
129 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
130
131 socket->send(*(new message_t()));//receive empty message just to keep req-rep order
132
133//get list of events
134 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
135 socket->recv(response);
136
137 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
138
139 return receivedList;
140}
141
142AliESDEvent* AliStorageEventManager::GetEvent(socket_t *socket)
143{
144 message_t *message = new message_t();
145
146 socket->recv(message);
147 int bufSize = (int)message->size();
148
149 char* buf = new char[bufSize];
150 memcpy(buf, (char*)message->data(), bufSize);
151
152 AliNetMessage *mess = new AliNetMessage(buf, bufSize);
153
154 /*
155 message_t* message = RecvStreamerInfos(socket);
156 int64_t more;
157 size_t more_size = sizeof more;
158
159 socket->getsockopt(ZMQ_RCVMORE, &more, &more_size );
160 socket->recv(message);
161 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
162 message->size()+sizeof(UInt_t),
163 message->data());
164 mess->InitMap();
165 mess->ReadClass();// get first the class stored in message
166 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
167 mess->ResetMap();
168
169
170 //message_t *mess;
171 //socket->recv(message);
172 */
173 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
174 if (data)
175 {
176 TTree* tree= new TTree("esdTree", "esdTree");
177 data->WriteToTree(tree);
178 tree-> Fill();
179 AliESDEvent* event= new AliESDEvent();
180 event->ReadFromTree(tree);
181 tree->GetEntry(0);
182 delete data;
183 delete tree;
184 if(message){delete message;}
185 return event;
186 }
187 else
188 {
189 if(message){delete message;}
190 return NULL;
191 }
192}
193
194message_t* AliStorageEventManager::RecvStreamerInfos(socket_t *socket)
195{
196 message_t *message = new message_t;
197 socket->recv(message);
198 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
199 message->size()+2*sizeof(UInt_t),
200 message->data());
201 if(!mess)
202 {
203 if(message){delete message;}
204 return NULL;
205 }
206 mess->InitMap();
207 mess->ReadClass(); // get first the class stored in message
208 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(TMessage));
209 mess->ResetMap();
210 TList *list = (TList*)mess->ReadObjectAny(TList::Class());
211 if(list==0 || list->IsEmpty())
212 {
213 return message;
214 }
215 if(message){delete message;}
216 TIter next(list);
217 TStreamerInfo *info;
218 TObjLink *lnk = list->FirstLink();
219 // First call BuildCheck for regular class
220 while (lnk)
221 {
222 info = (TStreamerInfo*)lnk->GetObject();
223 TObject *element = info->GetElements()->UncheckedAt(0);
224 Bool_t isstl = element && strcmp("This",element->GetName())==0;
225 if (!isstl)
226 {
227 info->BuildCheck();
228 if (gDebug > 0)
229 {
230 Info("RecvStreamerInfos",
231 "importing TStreamerInfo: %s, version = %d",
232 info->GetName(),
233 info->GetClassVersion());
234 }
235 }
236 lnk = lnk->Next();
237 }
238 lnk = list->FirstLink();
239 while (lnk)
240 {
241 info = (TStreamerInfo*)lnk->GetObject();
242 TObject *element = info->GetElements()->UncheckedAt(0);
243 Bool_t isstl = element && strcmp("This",element->GetName())==0;
244 if (isstl)
245 {
246 info->BuildCheck();
247 if (gDebug > 0)
248 {
249 Info("RecvStreamerInfos",
250 "importing TStreamerInfo: %s, version = %d",
251 info->GetName(),
252 info->GetClassVersion());
253 }
254 }
255 lnk = lnk->Next();
256 }
257 if(list){delete list;}
258 if(mess){delete mess;}
259 if(lnk){delete lnk;}
260 return message;
261}
262
263void AliStorageEventManager::SendStreamerInfos(TMessage* mess, socket_t *socket)
264{
265 TList* infos = mess->GetStreamerInfos();
266
267 TIter next(infos);
268 TStreamerInfo *info;
269 TList *minilist = 0;
270 while ((info = (TStreamerInfo*)next()))
271 {
272 //Int_t uid = info->GetNumber();
273 if (!minilist) minilist = new TList();
274
275 minilist->Add(info);
276 }
277
278 if (minilist)
279 {
280 TMessage messinfo(kMESS_STREAMERINFO);
281 messinfo.WriteObject(minilist);
282 delete minilist;
283 if (messinfo.GetStreamerInfos())
284 messinfo.GetStreamerInfos()->Clear();
285
286 int bufsize = messinfo.Length();
287 char* buf = (char*) malloc(bufsize * sizeof(char));
288 memcpy(buf, messinfo.Buffer(), bufsize);
289
290 // send!
291 zmq::message_t message((void*)buf, bufsize, 0, 0);
292
293 if (socket->send(message, ZMQ_SNDMORE))
294 {
295 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
296 }
297 }
298
299 return;
300}
301
302