1 #include "AliStorageEventManager.h"
2 #include "AliNetMessage.h"
8 #include <TStreamerInfo.h>
// Constructor and destructor of AliStorageEventManager; both bodies are empty.
15 AliStorageEventManager::AliStorageEventManager(){}
16 AliStorageEventManager::~AliStorageEventManager(){}
// Callback taking (data, hint). Send(AliESDEvent*, socket_t*) passes it to the
// message_t constructor together with a buffer allocated with new char[].
// NOTE(review): the body is not visible in this view — confirm it releases
// `data` with delete[] to match that allocation.
// NOTE(review): identifiers containing a double underscore are reserved for the
// C++ implementation; consider renaming.
18 void __freeBuffer (void *data, void *hint)
// Serializes `event` into an AliNetMessage and sends the resulting bytes over
// `socket` as one message.
// NOTE(review): this view omits some lines of the function (the embedded
// numbering has gaps); the comments cover only the visible statements.
23 void AliStorageEventManager::Send(AliESDEvent *event, socket_t *socket)
25 AliNetMessage tmess(kMESS_OBJECT);
27 tmess.WriteObject(event);
// Copy the serialized bytes into a new[] buffer that the outgoing message uses.
// NOTE(review): the size taken here is BufferSize(), whereas the disabled
// variant further down uses Length() — confirm which one is the number of
// bytes actually written.
29 int bufSize = tmess.BufferSize();
30 char* buf = new char[bufSize];
31 memcpy(buf, (char*)tmess.Buffer(), bufSize);
// __freeBuffer is handed over as the release callback for `buf` (hint is NULL).
33 message_t message(buf, bufSize, __freeBuffer, NULL);
34 //fwrite(mess.Buffer(), sizeof(char), bufSize, stdout);
// The result of send() is not checked.
36 socket->send(message);
// NOTE(review): the remaining lines end with `*/`, so they look like a disabled
// alternative implementation (TMessage + SendStreamerInfos + malloc'ed buffer);
// its opening `/*` is not visible in this view.
39 //publisher.Send(tmess);
41 TMessage tmess(kMESS_OBJECT);
43 tmess.WriteObject(event);
44 TMessage::EnableSchemaEvolutionForAll(kTRUE);
45 SendStreamerInfos(&tmess, socket);
46 int bufsize = tmess.Length();
47 char* buf = (char*) malloc(bufsize * sizeof(char));
48 memcpy(buf, tmess.Buffer(), bufsize);
49 zmq::message_t message((void*)buf, bufsize, 0, 0);
50 socket->send(message);*/
// Sends a list of serverListStruct records over `socket`: first the record
// count as decimal text, then (if non-empty) the raw bytes of the records.
// `list` is taken by value, so the caller's vector is copied on every call.
// NOTE(review): some lines of this function are not visible in this view.
53 void AliStorageEventManager::Send(vector<serverListStruct> list, socket_t *socket)
55 //send size of the struct first
56 int numberOfRecords = list.size();
// The count travels as a NUL-terminated decimal string in a 20-byte message.
57 message_t message(20);
58 snprintf ((char *)message.data(), 20 ,"%d",numberOfRecords);
60 socket->send(message);
// Nothing more is sent for an empty list.
61 if(numberOfRecords==0)return;
// NOTE(review): the message_t allocated here is never stored in a variable, so
// it cannot be deleted afterwards — this leaks one message_t per call.
62 socket->recv((new message_t));//empty message just to keep req-rep order
64 //prepare message with event's list
// The message is built directly over the vector's own storage, with 0 passed
// as the third constructor argument.
// NOTE(review): presumably this is the zero-copy constructor with no release
// callback — confirm the data is not used after `list` is destroyed.
65 char *buffer = reinterpret_cast<char*> (&list[0]);
66 message_t *reply = new message_t((void*)buffer,
67 sizeof(serverListStruct)*numberOfRecords,0);
// The null check is redundant: plain new does not return null, and deleting a
// null pointer is a no-op.
70 if(reply){delete reply;}
// Sends the bytes starting at `request` over `socket` as one message.
// NOTE(review): the size passed is the sum of sizeof(serverRequestStruct),
// sizeof(listRequestStruct) and sizeof(eventStruct), although `request` points
// at a serverRequestStruct — confirm the memory behind `request` is at least
// that large (the struct definitions are not visible here).
// NOTE(review): requestMessage is allocated with new and no matching delete is
// visible in this view.
73 void AliStorageEventManager::Send(struct serverRequestStruct *request,zmq::socket_t *socket)
75 char *buffer = (char*)(request);
76 message_t *requestMessage = new message_t((void*)buffer,
77 sizeof(struct serverRequestStruct)
78 +sizeof(struct listRequestStruct)
79 +sizeof(struct eventStruct),0);
80 socket->send(*requestMessage);
// Sends the raw bytes of `*request` (sizeof(clientRequestStruct) of them) over
// `socket` as one message.
// NOTE(review): requestMessage is allocated with new and no matching delete is
// visible in this view.
83 void AliStorageEventManager::Send(struct clientRequestStruct *request,zmq::socket_t *socket)
85 char *buffer = (char*)(request);
86 message_t *requestMessage = new message_t((void*)buffer,
87 sizeof(struct clientRequestStruct),0);
88 socket->send(*requestMessage);
// Formats `message` as decimal text and sends it over `socket`.
91 void AliStorageEventManager::Send(long message,zmq::socket_t *socket)
93 stringstream streamBuffer;
94 streamBuffer << message;
95 string stringBuffer = streamBuffer.str();
96 char *buffer = (char*)stringBuffer.c_str();
// NOTE(review): the size passed is sizeof(long), not the length of the text in
// stringBuffer. Numbers with more digits than sizeof(long) would be cut short,
// and shorter ones make the message span bytes past the string's terminator —
// confirm the intended size (e.g. stringBuffer.size()+1).
// NOTE(review): the message refers to the local string's storage, and
// replyMessage is allocated with new with no delete visible in this view.
97 message_t *replyMessage = new message_t((void*)buffer,sizeof(long),0);
98 socket->send(*replyMessage);
// Resets the local stream; it is a local object, so this appears to have no
// lasting effect.
100 streamBuffer.str(string());
101 streamBuffer.clear();
// Sends a boolean over `socket` as text, using the literal "true" or "false".
// NOTE(review): the declaration of `buffer` and the condition selecting between
// the two assignments are not visible in this view.
104 void AliStorageEventManager::Send(bool message,zmq::socket_t *socket)
109 buffer = (char*)("true");
113 buffer = (char*)("false");
// NOTE(review): the size passed is sizeof(char*) — the size of a pointer, not
// the length of the literal. Where pointers are 8 bytes this exceeds both
// "true" (5 bytes with terminator) and "false" (6 bytes) — confirm the intent.
// NOTE(review): replyMessage is allocated with new and no matching delete is
// visible in this view.
115 message_t *replyMessage = new message_t((void*)buffer,sizeof(char*),0);
116 socket->send(*replyMessage);
// Receives a list of serverListStruct records from `socket`: first a message
// carrying the record count as text, then a message carrying the raw records.
// This mirrors the visible steps of Send(vector<serverListStruct>, socket_t*).
// NOTE(review): the declaration of numberOfRecords, the handling after the
// empty-list message and the return statement are not visible in this view.
120 vector<serverListStruct> AliStorageEventManager::GetServerListVector(socket_t *socket)
122 //get size of the incoming message
123 message_t sizeMessage;
124 socket->recv(&sizeMessage);
// Parse the count from the message payload, which is read as a C string; the
// sender above writes it with snprintf into a 20-byte buffer, so it is
// NUL-terminated when it comes from that sender.
126 istringstream iss(static_cast<char*>(sizeMessage.data()));
127 iss >> numberOfRecords;
129 if(numberOfRecords==0){cout<<"MANAGER -- list is empty"<<endl;}
// NOTE(review): the message_t allocated here is never stored, so it cannot be
// deleted — this leaks one message_t per call.
131 socket->send(*(new message_t()));//send empty message just to keep req-rep order
// NOTE(review): `response` is allocated with new and no matching delete is
// visible in this view.
134 message_t *response = new message_t(sizeof(serverListStruct)*numberOfRecords);
135 socket->recv(response);
// Copy numberOfRecords structs out of the received payload into a vector.
// NOTE(review): no visible line checks response->size() against the count
// announced by the peer before reading.
137 vector<serverListStruct> receivedList(static_cast<serverListStruct*>(response->data()), static_cast<serverListStruct*>(response->data()) + numberOfRecords);
// Receives one message from `socket`, reads an AliESDEvent out of it and
// rebuilds a fresh AliESDEvent through a temporary TTree.
// NOTE(review): many lines are not visible in this view. `message` and `mess`
// are each declared twice below, so the second group (RecvStreamerInfos /
// TBufferFile) is presumably inside a disabled region whose delimiters are
// hidden — confirm against the full file.
142 AliESDEvent* AliStorageEventManager::GetEvent(socket_t *socket)
144 message_t *message = new message_t();
146 socket->recv(message);
147 int bufSize = (int)message->size();
// Copy the payload into a new[] buffer and wrap it in an AliNetMessage.
// NOTE(review): no delete[] of `buf` is visible; presumably AliNetMessage
// takes ownership of it — confirm.
149 char* buf = new char[bufSize];
150 memcpy(buf, (char*)message->data(), bufSize);
152 AliNetMessage *mess = new AliNetMessage(buf, bufSize);
155 message_t* message = RecvStreamerInfos(socket);
157 size_t more_size = sizeof more;
159 socket->getsockopt(ZMQ_RCVMORE, &more, &more_size );
160 socket->recv(message);
161 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
162 message->size()+sizeof(UInt_t),
165 mess->ReadClass();// get first the class stored in message
166 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(kMESS_OBJECT));
171 //socket->recv(message);
173 AliESDEvent* data = (AliESDEvent*)(mess->ReadObjectAny(AliESDEvent::Class()));
// Round-trip the event through a TTree named "esdTree" into a new AliESDEvent.
// NOTE(review): `tree` is allocated with new and no delete of it (or of `data`
// and `mess`) is visible in this view.
176 TTree* tree= new TTree("esdTree", "esdTree");
177 data->WriteToTree(tree);
179 AliESDEvent* event= new AliESDEvent();
180 event->ReadFromTree(tree);
// NOTE(review): `message` is deleted at two places; presumably these sit on
// different branches whose conditions are not visible here. The null checks
// are redundant, since deleting a null pointer is a no-op.
184 if(message){delete message;}
189 if(message){delete message;}
// Receives a message from `socket`, reads a TList of TStreamerInfo objects out
// of it and walks that list twice, logging each imported TStreamerInfo.
// NOTE(review): many lines are not visible in this view (loop headers, the
// remaining TBufferFile arguments, the branch conditions and returns).
194 message_t* AliStorageEventManager::RecvStreamerInfos(socket_t *socket)
196 message_t *message = new message_t;
197 socket->recv(message);
198 TBufferFile *mess = new TBufferFile(TBuffer::kRead,
199 message->size()+2*sizeof(UInt_t),
203 if(message){delete message;}
207 mess->ReadClass(); // get first the class stored in message
// NOTE(review): the offset here uses sizeof(TMessage), i.e. the size of the
// TMessage class, whereas the analogous call in GetEvent uses
// sizeof(kMESS_OBJECT) — confirm this is intended.
208 mess->SetBufferOffset(sizeof(UInt_t) + sizeof(TMessage));
210 TList *list = (TList*)mess->ReadObjectAny(TList::Class());
// A missing or empty list takes a separate path that deletes `message`.
211 if(list==0 || list->IsEmpty())
215 if(message){delete message;}
218 TObjLink *lnk = list->FirstLink();
219 // First call BuildCheck for regular class
222 info = (TStreamerInfo*)lnk->GetObject();
// `isstl` is true when the first element of the streamer info is named "This".
223 TObject *element = info->GetElements()->UncheckedAt(0);
224 Bool_t isstl = element && strcmp("This",element->GetName())==0;
230 Info("RecvStreamerInfos",
231 "importing TStreamerInfo: %s, version = %d",
233 info->GetClassVersion());
// Second walk over the list, restarting from the first link, with the same
// "This" test. NOTE(review): presumably this pass handles the entries skipped
// by the first one — the selecting conditions are not visible here.
238 lnk = list->FirstLink();
241 info = (TStreamerInfo*)lnk->GetObject();
242 TObject *element = info->GetElements()->UncheckedAt(0);
243 Bool_t isstl = element && strcmp("This",element->GetName())==0;
249 Info("RecvStreamerInfos",
250 "importing TStreamerInfo: %s, version = %d",
252 info->GetClassVersion());
// Release the list and the read buffer; the null checks are redundant, since
// deleting a null pointer is a no-op.
257 if(list){delete list;}
258 if(mess){delete mess;}
// Collects TStreamerInfo entries from mess->GetStreamerInfos() into a list,
// serializes that list into a kMESS_STREAMERINFO TMessage and sends it over
// `socket` with the ZMQ_SNDMORE flag.
// NOTE(review): several lines (iterator setup, loop body, the end of the
// function) are not visible in this view.
263 void AliStorageEventManager::SendStreamerInfos(TMessage* mess, socket_t *socket)
265 TList* infos = mess->GetStreamerInfos();
270 while ((info = (TStreamerInfo*)next()))
272 //Int_t uid = info->GetNumber();
// minilist is created lazily, on the first info encountered.
273 if (!minilist) minilist = new TList();
280 TMessage messinfo(kMESS_STREAMERINFO);
281 messinfo.WriteObject(minilist);
// Clear the streamer infos that messinfo itself holds, when it has any.
283 if (messinfo.GetStreamerInfos())
284 messinfo.GetStreamerInfos()->Clear();
// Copy the serialized bytes (Length() of them) into a malloc'ed buffer.
286 int bufsize = messinfo.Length();
287 char* buf = (char*) malloc(bufsize * sizeof(char));
288 memcpy(buf, messinfo.Buffer(), bufsize);
// NOTE(review): 0 is passed where Send(AliESDEvent*, ...) passes a release
// callback, and no free(buf) is visible in this view — confirm `buf` is
// released somewhere.
291 zmq::message_t message((void*)buf, bufsize, 0, 0);
// NOTE(review): the warning fires when send() returns a truthy value — confirm
// that truthy means failure for this socket_t API, and not success.
293 if (socket->send(message, ZMQ_SNDMORE))
295 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");