1 // Author: Mihai Niculescu 2013
3 /**************************************************************************
4 * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5 * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for *
6 * full copyright notice. *
7 **************************************************************************/
14 #include <TCondition.h>
15 #include <TBufferFile.h>
17 #include <TObjArray.h>
18 #include <TStreamerInfo.h>
22 #include <AliESDEvent.h>
23 #include <AliESDfriend.h>
24 #include <AliRawReader.h>
25 #include <AliRunLoader.h>
26 #include <AliReconstruction.h>
28 #include "AliRecoServerThread.h"
30 ClassImp(AliRecoServerThread);
31 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
35 fHost("tcp://*:5051"),
43 AliRecoServerThread::~AliRecoServerThread()
48 Bool_t AliRecoServerThread::Start(const char* host)
52 fCond = new TCondition(0);
53 fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*) this );
62 Int_t AliRecoServerThread::Stop()
69 Bool_t AliRecoServerThread::ForceStop()
82 void AliRecoServerThread::Finished(Int_t status)
84 Emit("Finished(Int_t)", status);
87 void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
89 //printf("Sending Streamer Infos....\n");
91 // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
92 // in the object in the message is in the fInfos list of the message.
93 // We send only the TStreamerInfos not yet sent on this socket.
94 TList* infos = mess->GetStreamerInfos();
99 while ((info = (TStreamerInfo*)next())) {
100 Int_t uid = info->GetNumber();
101 if (!minilist) minilist = new TList();
107 TMessage messinfo(kMESS_STREAMERINFO);
108 messinfo.WriteObject(minilist);
110 if (messinfo.GetStreamerInfos())
111 messinfo.GetStreamerInfos()->Clear();
113 int bufsize = messinfo.Length();
114 char* buf = (char*) malloc(bufsize * sizeof(char));
115 memcpy(buf, messinfo.Buffer(), bufsize);
118 zmq::message_t message((void*)buf, bufsize, 0, 0);
120 if (sock->send(message, ZMQ_SNDMORE))
121 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
127 void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
131 TMessage tmess(kMESS_OBJECT);
133 tmess.WriteObject(event);
135 TMessage::EnableSchemaEvolutionForAll(kTRUE);
136 SendStreamerInfos(&tmess, socket);
138 int bufsize = tmess.Length();
139 char* buf = (char*) malloc(bufsize * sizeof(char));
140 memcpy(buf, tmess.Buffer(), bufsize);
143 zmq::message_t message((void*)buf, bufsize, 0, 0);
144 socket->send(message);
149 void* AliRecoServerThread::RunThreaded(void* arg)
151 TThread::SetCancelAsynchronous();
152 TThread::SetCancelOn();
154 AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
156 const char* host = recoTh->GetHost();
157 zmq::context_t* context = recoTh->GetContext();
158 AliReconstruction* reco = recoTh->GetReconstruction();
160 zmq::socket_t publisher(*context, ZMQ_PUB);
161 publisher.bind(host);
163 if(reco==0) return 0;
168 if (reco->GetAbort() != TSelector::kContinue) return 0;
170 reco->SlaveBegin(NULL);
171 if (reco->GetAbort() != TSelector::kContinue) return 0;
173 //******* The loop over events
175 while ( reco->HasNextEventAfter(iEvent) ) {
176 // check if process has enough resources
177 if (!reco->HasEnoughResources(iEvent)) break;
178 Bool_t status = reco->ProcessEvent(iEvent);
182 event = reco->GetESDEvent();
183 SendEvent(event, &publisher);
188 reco->Abort("ProcessEvent",TSelector::kAbortFile);
191 reco->CleanProcessedEvent();
192 if(recoTh->Condition()->TimedWaitRelative(500)==0){
197 reco->SlaveTerminate();
198 if (reco->GetAbort() != TSelector::kContinue) return 0;
200 if (reco->GetAbort() != TSelector::kContinue) return 0;