Removing additional 0MQ dependency
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
CommitLineData
7e0cf530 1// Author: Mihai Niculescu 2013
2
3/**************************************************************************
4 * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5 * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for *
6 * full copyright notice. *
7 **************************************************************************/
8
bd2715e3 9#include <RVersion.h>
7e0cf530 10#include <stdlib.h>
11
12#include <zmq.hpp>
13
14#include <TCondition.h>
15#include <TBufferFile.h>
16#include <TMessage.h>
17#include <TObjArray.h>
18#include <TStreamerInfo.h>
19#include <TThread.h>
20
21
22#include <AliESDEvent.h>
23#include <AliESDfriend.h>
24#include <AliRawReader.h>
25#include <AliRunLoader.h>
26#include <AliReconstruction.h>
27
28#include "AliRecoServerThread.h"
29
30ClassImp(AliRecoServerThread);
31AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
32 : TQObject(),
33 fContext(0),
34 fReco(0),
35 fHost("tcp://*:5051"),
36 fThread(0),
37 fCond(0)
38{
39 fContext = context;
40 fReco = reco;
41}
42
43AliRecoServerThread::~AliRecoServerThread()
44{
45 Stop();
46}
47
48Bool_t AliRecoServerThread::Start(const char* host)
49{
50 if(!fThread){
51 fHost = host;
52 fCond = new TCondition(0);
53 fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*) this );
54 fThread->Run();
55
56 return kTRUE;
57 }
58
59 return kFALSE;
60}
61
62Int_t AliRecoServerThread::Stop()
63{
64 fCond->Signal();
65
66 return 0;
67}
68
69Bool_t AliRecoServerThread::ForceStop()
70{
71 if(fThread){
72 fThread->Kill();
73 fThread->Delete();
74 fThread=0;
75
76 return kTRUE;
77 }
78
79 return kFALSE;
80}
81
82void AliRecoServerThread::Finished(Int_t status)
83{
84 Emit("Finished(Int_t)", status);
85}
86
87void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
88{
89 //printf("Sending Streamer Infos....\n");
90
91 // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
92 // in the object in the message is in the fInfos list of the message.
93 // We send only the TStreamerInfos not yet sent on this socket.
94 TList* infos = mess->GetStreamerInfos();
95
96 TIter next(infos);
97 TStreamerInfo *info;
98 TList *minilist = 0;
99 while ((info = (TStreamerInfo*)next())) {
100 Int_t uid = info->GetNumber();
101 if (!minilist) minilist = new TList();
102
103 minilist->Add(info);
104 }
105
106 if (minilist) {
107 TMessage messinfo(kMESS_STREAMERINFO);
108 messinfo.WriteObject(minilist);
109 delete minilist;
110 if (messinfo.GetStreamerInfos())
111 messinfo.GetStreamerInfos()->Clear();
7b037834 112
113 int bufsize = messinfo.Length();
7e0cf530 114 char* buf = (char*) malloc(bufsize * sizeof(char));
115 memcpy(buf, messinfo.Buffer(), bufsize);
116
117 // send!
118 zmq::message_t message((void*)buf, bufsize, 0, 0);
119
120 if (sock->send(message, ZMQ_SNDMORE))
121 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
122 }
123
124 return;
125}
126
127void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
128{
129 if(!event) return;
130
131 TMessage tmess(kMESS_OBJECT);
132 tmess.Reset();
133 tmess.WriteObject(event);
134
135 TMessage::EnableSchemaEvolutionForAll(kTRUE);
136 SendStreamerInfos(&tmess, socket);
137
7e0cf530 138 int bufsize = tmess.Length();
139 char* buf = (char*) malloc(bufsize * sizeof(char));
140 memcpy(buf, tmess.Buffer(), bufsize);
141
142 // send!
143 zmq::message_t message((void*)buf, bufsize, 0, 0);
144 socket->send(message);
145
146}
147
148
149void* AliRecoServerThread::RunThreaded(void* arg)
150{
151 TThread::SetCancelAsynchronous();
152 TThread::SetCancelOn();
153
154 AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
155
156 const char* host = recoTh->GetHost();
157 zmq::context_t* context = recoTh->GetContext();
158 AliReconstruction* reco = recoTh->GetReconstruction();
159
160 zmq::socket_t publisher(*context, ZMQ_PUB);
161 publisher.bind(host);
162
163 if(reco==0) return 0;
164
165 AliESDEvent* event;
166
167 reco->Begin(NULL);
168 if (reco->GetAbort() != TSelector::kContinue) return 0;
169
170 reco->SlaveBegin(NULL);
171 if (reco->GetAbort() != TSelector::kContinue) return 0;
172
173 //******* The loop over events
174 Int_t iEvent = 0;
175 while ( reco->HasNextEventAfter(iEvent) ) {
176 // check if process has enough resources
177 if (!reco->HasEnoughResources(iEvent)) break;
178 Bool_t status = reco->ProcessEvent(iEvent);
179
180 if (status)
181 {
182 event = reco->GetESDEvent();
183 SendEvent(event, &publisher);
184
185 sleep(1);
186 }
187 else {
188 reco->Abort("ProcessEvent",TSelector::kAbortFile);
189 }
190
191 reco->CleanProcessedEvent();
192 if(recoTh->Condition()->TimedWaitRelative(500)==0){
193 break;
194 }
195 iEvent++;
196 }
197 reco->SlaveTerminate();
198 if (reco->GetAbort() != TSelector::kContinue) return 0;
199 reco->Terminate();
200 if (reco->GetAbort() != TSelector::kContinue) return 0;
201
202}