- added new task for pi0 reconstruction using purely calorimeter clusters
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
1 // Author: Mihai Niculescu 2013
2
3 /**************************************************************************
4  * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5  * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for          *
6  * full copyright notice.                                                 *
7  **************************************************************************/
8
9 #include <RVersion.h>
10 #include <stdlib.h>
11
12 #include <TCondition.h>
13 #include <TBufferFile.h>
14 #include <TMessage.h>
15 #include <TObjArray.h>
16 #include <TStreamerInfo.h>
17 #include <TThread.h>
18
19 #include <AliESDEvent.h>
20 #include <AliESDfriend.h>
21 #include <AliRawReader.h>
22 #include <AliRunLoader.h>
23 #include <AliReconstruction.h>
24
25 #include <AliNetMessage.h>
26 #include <AliSocket.h>
27
28 #include <zmq.hpp>
29
30 #include "AliRecoServerThread.h"
31
32 ClassImp(AliRecoServerThread)
33 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
34   : AliThreadedSocket(context, AliThreadedSocket::kWRITE),
35         fReco(0),
36     fCond(0)
37 {
38         fReco = reco;
39 }
40
41 AliRecoServerThread::~AliRecoServerThread()
42 {
43         Wait();
44         Stop();
45 }
46
47 Bool_t AliRecoServerThread::Start(const char* endpoint)
48 {
49         fHost = endpoint;
50         
51         return AliThreadedSocket::Start();
52 }
53
54 void AliRecoServerThread::RunThrdWrite()
55 {
56         TThread::SetCancelAsynchronous();
57         TThread::SetCancelOn();
58         
59  // generate a publish socket
60         AliSocket publisher(fContext, ZMQ_PUB);
61         publisher.Bind(fHost);
62         
63   if(fReco==0) return;
64   
65   AliESDEvent* event;
66   
67         fReco->Begin(NULL);
68   if (fReco->GetAbort() != TSelector::kContinue) return;
69   
70   fReco->SlaveBegin(NULL);
71         if (fReco->GetAbort() != TSelector::kContinue) return;
72   
73   //******* The loop over events
74     Int_t iEvent = 0;
75     while ( fReco->HasNextEventAfter(iEvent) ) {
76       // check if process has enough resources 
77       if (!fReco->HasEnoughResources(iEvent)) break;
78       Bool_t status = fReco->ProcessEvent(iEvent);
79       
80       if (status)
81       {
82                     event = fReco->GetESDEvent();
83                     
84         AliNetMessage tmess(kMESS_OBJECT);
85                         tmess.Reset();
86                         tmess.WriteObject(event);
87                                 
88                                 publisher.Send(tmess);
89
90                         sleep(1);
91      }
92       else {
93         fReco->Abort("ProcessEvent",TSelector::kAbortFile);
94       }
95                 
96       fReco->CleanProcessedEvent();
97       /*
98       if(fCond->TimedWaitRelative(500)==0){
99                                 break;
100                                 }                       */
101       iEvent++;
102     }
103     fReco->SlaveTerminate();
104     if (fReco->GetAbort() != TSelector::kContinue) return;
105     fReco->Terminate();
106     if (fReco->GetAbort() != TSelector::kContinue) return;
107   
108 }