1 // Author: Mihai Niculescu 2013
3 /**************************************************************************
4 * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5 * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for *
6 * full copyright notice. *
7 **************************************************************************/
13 #include <TCondition.h>
14 #include <TBufferFile.h>
16 #include <TObjArray.h>
17 #include <TStreamerInfo.h>
21 #include <AliESDEvent.h>
22 #include <AliESDfriend.h>
23 #include <AliRawReader.h>
24 #include <AliRunLoader.h>
25 #include <AliReconstruction.h>
27 #include "AliRecoServerThread.h"
29 ClassImp(AliRecoServerThread);
30 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
34 fHost("tcp://*:5051"),
42 AliRecoServerThread::~AliRecoServerThread()
47 Bool_t AliRecoServerThread::Start(const char* host)
51 fCond = new TCondition(0);
52 fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*) this );
61 Int_t AliRecoServerThread::Stop()
68 Bool_t AliRecoServerThread::ForceStop()
81 void AliRecoServerThread::Finished(Int_t status)
83 Emit("Finished(Int_t)", status);
86 void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
88 //printf("Sending Streamer Infos....\n");
90 // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
91 // in the object in the message is in the fInfos list of the message.
92 // We send only the TStreamerInfos not yet sent on this socket.
93 TList* infos = mess->GetStreamerInfos();
98 while ((info = (TStreamerInfo*)next())) {
99 Int_t uid = info->GetNumber();
100 if (!minilist) minilist = new TList();
106 TMessage messinfo(kMESS_STREAMERINFO);
107 messinfo.WriteObject(minilist);
109 if (messinfo.GetStreamerInfos())
110 messinfo.GetStreamerInfos()->Clear();
112 messinfo.SetLength();
114 int bufsize = messinfo.Length();
115 char* buf = (char*) malloc(bufsize * sizeof(char));
116 memcpy(buf, messinfo.Buffer(), bufsize);
119 zmq::message_t message((void*)buf, bufsize, 0, 0);
121 if (sock->send(message, ZMQ_SNDMORE))
122 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
128 void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
132 TMessage tmess(kMESS_OBJECT);
134 tmess.WriteObject(event);
136 TMessage::EnableSchemaEvolutionForAll(kTRUE);
137 SendStreamerInfos(&tmess, socket);
141 int bufsize = tmess.Length();
142 char* buf = (char*) malloc(bufsize * sizeof(char));
143 memcpy(buf, tmess.Buffer(), bufsize);
146 zmq::message_t message((void*)buf, bufsize, 0, 0);
147 socket->send(message);
152 void* AliRecoServerThread::RunThreaded(void* arg)
154 TThread::SetCancelAsynchronous();
155 TThread::SetCancelOn();
157 AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
159 const char* host = recoTh->GetHost();
160 zmq::context_t* context = recoTh->GetContext();
161 AliReconstruction* reco = recoTh->GetReconstruction();
163 zmq::socket_t publisher(*context, ZMQ_PUB);
164 publisher.bind(host);
166 if(reco==0) return 0;
171 if (reco->GetAbort() != TSelector::kContinue) return 0;
173 reco->SlaveBegin(NULL);
174 if (reco->GetAbort() != TSelector::kContinue) return 0;
176 //******* The loop over events
178 while ( reco->HasNextEventAfter(iEvent) ) {
179 // check if process has enough resources
180 if (!reco->HasEnoughResources(iEvent)) break;
181 Bool_t status = reco->ProcessEvent(iEvent);
185 event = reco->GetESDEvent();
186 SendEvent(event, &publisher);
191 reco->Abort("ProcessEvent",TSelector::kAbortFile);
194 reco->CleanProcessedEvent();
195 if(recoTh->Condition()->TimedWaitRelative(500)==0){
200 reco->SlaveTerminate();
201 if (reco->GetAbort() != TSelector::kContinue) return 0;
203 if (reco->GetAbort() != TSelector::kContinue) return 0;