git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/AliRecoServerThread.cxx
Event server (Mihai)
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
1 // Author: Mihai Niculescu 2013
2
3 /**************************************************************************
4  * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5  * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for          *
6  * full copyright notice.                                                 *
7  **************************************************************************/
8
9 #include <stdlib.h>
10
11 #include <zmq.hpp>
12
13 #include <TCondition.h>
14 #include <TBufferFile.h>
15 #include <TMessage.h>
16 #include <TObjArray.h>
17 #include <TStreamerInfo.h>
18 #include <TThread.h>
19
20
21 #include <AliESDEvent.h>
22 #include <AliESDfriend.h>
23 #include <AliRawReader.h>
24 #include <AliRunLoader.h>
25 #include <AliReconstruction.h>
26
27 #include "AliRecoServerThread.h"
28
29 ClassImp(AliRecoServerThread);
30 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
31   : TQObject(),
32                 fContext(0),
33         fReco(0),
34         fHost("tcp://*:5051"),
35     fThread(0),
36     fCond(0)
37 {
38         fContext = context;
39         fReco = reco;
40 }
41
42 AliRecoServerThread::~AliRecoServerThread()
43 {
44         Stop();
45 }
46
47 Bool_t AliRecoServerThread::Start(const char* host)
48 {
49         if(!fThread){
50         fHost = host;
51         fCond = new TCondition(0);
52         fThread = new TThread("AliRecoServerThread", (void(*) (void *) ) &RunThreaded, (void*)  this );
53         fThread->Run();
54   
55         return kTRUE;
56         }
57         
58         return kFALSE;  
59 }
60
61 Int_t AliRecoServerThread::Stop()
62 {
63         fCond->Signal();
64  
65   return 0;
66 }
67
68 Bool_t AliRecoServerThread::ForceStop()
69 {
70         if(fThread){
71                 fThread->Kill();
72                 fThread->Delete();
73                 fThread=0;
74                 
75                 return kTRUE;
76         }
77         
78         return kFALSE;
79 }
80
81 void AliRecoServerThread::Finished(Int_t status)
82 {
83   Emit("Finished(Int_t)", status);
84 }
85
86 void AliRecoServerThread::SendStreamerInfos(TMessage* mess, zmq::socket_t *sock)
87 {
88         //printf("Sending Streamer Infos....\n");
89
90         // Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
91    // in the object in the message is in the fInfos list of the message.
92    // We send only the TStreamerInfos not yet sent on this socket.
93         TList* infos = mess->GetStreamerInfos();
94    
95       TIter next(infos);
96       TStreamerInfo *info;
97       TList *minilist = 0;
98       while ((info = (TStreamerInfo*)next())) {
99          Int_t uid = info->GetNumber();
100          if (!minilist) minilist = new TList();
101          
102          minilist->Add(info);
103       }
104       
105       if (minilist) {
106          TMessage messinfo(kMESS_STREAMERINFO);
107          messinfo.WriteObject(minilist);
108          delete minilist;
109          if (messinfo.GetStreamerInfos())
110             messinfo.GetStreamerInfos()->Clear();
111            
112            messinfo.SetLength();
113            
114           int bufsize = messinfo.Length();
115                 char* buf = (char*) malloc(bufsize * sizeof(char));
116           memcpy(buf, messinfo.Buffer(), bufsize);
117
118                 // send!
119           zmq::message_t message((void*)buf, bufsize, 0, 0);
120                      
121          if (sock->send(message, ZMQ_SNDMORE))
122             Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
123       }
124
125    return;
126 }
127
128 void AliRecoServerThread::SendEvent(AliESDEvent* event, zmq::socket_t* socket)
129 {
130   if(!event) return;
131
132   TMessage tmess(kMESS_OBJECT);
133   tmess.Reset();
134   tmess.WriteObject(event);
135
136   TMessage::EnableSchemaEvolutionForAll(kTRUE);
137   SendStreamerInfos(&tmess, socket);
138
139   tmess.SetLength();
140
141   int bufsize = tmess.Length();
142   char* buf = (char*) malloc(bufsize * sizeof(char));
143   memcpy(buf, tmess.Buffer(), bufsize);
144
145   // send!
146   zmq::message_t message((void*)buf, bufsize, 0, 0);
147   socket->send(message);
148
149 }
150
151
152 void* AliRecoServerThread::RunThreaded(void* arg)
153 {
154         TThread::SetCancelAsynchronous();
155         TThread::SetCancelOn();
156         
157         AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
158         
159         const char* host = recoTh->GetHost();
160         zmq::context_t* context = recoTh->GetContext();
161         AliReconstruction* reco   = recoTh->GetReconstruction();
162
163         zmq::socket_t publisher(*context, ZMQ_PUB);
164         publisher.bind(host);
165         
166   if(reco==0) return 0;
167   
168   AliESDEvent* event;
169   
170         reco->Begin(NULL);
171   if (reco->GetAbort() != TSelector::kContinue) return 0;
172   
173   reco->SlaveBegin(NULL);
174         if (reco->GetAbort() != TSelector::kContinue) return 0;
175   
176   //******* The loop over events
177     Int_t iEvent = 0;
178     while ( reco->HasNextEventAfter(iEvent) ) {
179       // check if process has enough resources 
180       if (!reco->HasEnoughResources(iEvent)) break;
181       Bool_t status = reco->ProcessEvent(iEvent);
182       
183       if (status)
184       {
185                                 event = reco->GetESDEvent();
186                                 SendEvent(event, &publisher);
187
188                         sleep(1);
189      }
190       else {
191         reco->Abort("ProcessEvent",TSelector::kAbortFile);
192       }
193                 
194       reco->CleanProcessedEvent();
195       if(recoTh->Condition()->TimedWaitRelative(500)==0){
196                                 break;
197                         }                       
198       iEvent++;
199     }
200     reco->SlaveTerminate();
201     if (reco->GetAbort() != TSelector::kContinue) return 0;
202     reco->Terminate();
203     if (reco->GetAbort() != TSelector::kContinue) return 0;
204   
205 }