]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/AliRecoServerThread.cxx
Updated treatment of track labels (Ruben)
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
1 // Author: Mihai Niculescu 2013
2
3 /**************************************************************************
4  * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5  * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for          *
6  * full copyright notice.                                                 *
7  **************************************************************************/
8
9 #include <RVersion.h>
10 #include <stdlib.h>
11
12 #include <zmq.hpp>
13
14 #include <TCondition.h>
15 #include <TBufferFile.h>
16 #include <TMessage.h>
17 #include <TObjArray.h>
18 #include <TStreamerInfo.h>
19 #include <TThread.h>
20
21
22 #include <AliESDEvent.h>
23 #include <AliESDfriend.h>
24 #include <AliRawReader.h>
25 #include <AliRunLoader.h>
26 #include <AliReconstruction.h>
27
28 #include "AliRecoServerThread.h"
29
30 ClassImp(AliRecoServerThread);
31 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
32   : TQObject(),
33                 fContext(0),
34         fReco(0),
35         fHost("tcp://*:5051"),
36     fThread(0),
37     fCond(0)
38 {
39         fContext = context;
40         fReco = reco;
41 }
42
43 AliRecoServerThread::~AliRecoServerThread()
44 {
45         Stop();
46 }
47
48 Bool_t AliRecoServerThread::Start(const char* host)
49 {
50         if(!fThread){
51         fHost = host;
52         fCond = new TCondition(0);
53         fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*)  this );
54         fThread->Run();
55   
56         return kTRUE;
57         }
58         
59         return kFALSE;  
60 }
61
62 Int_t AliRecoServerThread::Stop()
63 {
64         fCond->Signal();
65  
66   return 0;
67 }
68
69 Bool_t AliRecoServerThread::ForceStop()
70 {
71         if(fThread){
72                 fThread->Kill();
73                 fThread->Delete();
74                 fThread=0;
75                 
76                 return kTRUE;
77         }
78         
79         return kFALSE;
80 }
81
82 void AliRecoServerThread::Finished(Int_t status)
83 {
84   Emit("Finished(Int_t)", status);
85 }
86
87 void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
88 {
89         //printf("Sending Streamer Infos....\n");
90
91         // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
92    // in the object in the message is in the fInfos list of the message.
93    // We send only the TStreamerInfos not yet sent on this socket.
94         TList* infos = mess->GetStreamerInfos();
95    
96       TIter next(infos);
97       TStreamerInfo *info;
98       TList *minilist = 0;
99       while ((info = (TStreamerInfo*)next())) {
100          Int_t uid = info->GetNumber();
101          if (!minilist) minilist = new TList();
102          
103          minilist->Add(info);
104       }
105       
106       if (minilist) {
107          TMessage messinfo(kMESS_STREAMERINFO);
108          messinfo.WriteObject(minilist);
109          delete minilist;
110          if (messinfo.GetStreamerInfos())
111             messinfo.GetStreamerInfos()->Clear();
112           
113                                         int bufsize = messinfo.Length();
114                 char* buf = (char*) malloc(bufsize * sizeof(char));
115           memcpy(buf, messinfo.Buffer(), bufsize);
116
117                 // send!
118           zmq::message_t message((void*)buf, bufsize, 0, 0);
119                      
120          if (sock->send(message, ZMQ_SNDMORE))
121             Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
122       }
123
124    return;
125 }
126
127 void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
128 {
129   if(!event) return;
130
131   TMessage tmess(kMESS_OBJECT);
132   tmess.Reset();
133   tmess.WriteObject(event);
134
135   TMessage::EnableSchemaEvolutionForAll(kTRUE);
136   SendStreamerInfos(&tmess, socket);
137
138   int bufsize = tmess.Length();
139   char* buf = (char*) malloc(bufsize * sizeof(char));
140   memcpy(buf, tmess.Buffer(), bufsize);
141
142   // send!
143   zmq::message_t message((void*)buf, bufsize, 0, 0);
144   socket->send(message);
145
146 }
147
148
149 void* AliRecoServerThread::RunThreaded(void* arg)
150 {
151         TThread::SetCancelAsynchronous();
152         TThread::SetCancelOn();
153         
154         AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
155         
156         const char* host = recoTh->GetHost();
157         zmq::context_t* context = recoTh->GetContext();
158         AliReconstruction* reco   = recoTh->GetReconstruction();
159
160         zmq::socket_t publisher(*context, ZMQ_PUB);
161         publisher.bind(host);
162         
163   if(reco==0) return 0;
164   
165   AliESDEvent* event;
166   
167         reco->Begin(NULL);
168   if (reco->GetAbort() != TSelector::kContinue) return 0;
169   
170   reco->SlaveBegin(NULL);
171         if (reco->GetAbort() != TSelector::kContinue) return 0;
172   
173   //******* The loop over events
174     Int_t iEvent = 0;
175     while ( reco->HasNextEventAfter(iEvent) ) {
176       // check if process has enough resources 
177       if (!reco->HasEnoughResources(iEvent)) break;
178       Bool_t status = reco->ProcessEvent(iEvent);
179       
180       if (status)
181       {
182                                 event = reco->GetESDEvent();
183                                 SendEvent(event, &publisher);
184
185                         sleep(1);
186      }
187       else {
188         reco->Abort("ProcessEvent",TSelector::kAbortFile);
189       }
190                 
191       reco->CleanProcessedEvent();
192       if(recoTh->Condition()->TimedWaitRelative(500)==0){
193                                 break;
194                         }                       
195       iEvent++;
196     }
197     reco->SlaveTerminate();
198     if (reco->GetAbort() != TSelector::kContinue) return 0;
199     reco->Terminate();
200     if (reco->GetAbort() != TSelector::kContinue) return 0;
201   
202 }