Adding base classes for zmq networking and modifications to event server
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
1 // Author: Mihai Niculescu 2013
2
3 /**************************************************************************
4  * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5  * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for          *
6  * full copyright notice.                                                 *
7  **************************************************************************/
8
9 #include <RVersion.h>
10 #include <stdlib.h>
11
12 #include <TCondition.h>
13 #include <TBufferFile.h>
14 #include <TMessage.h>
15 #include <TObjArray.h>
16 #include <TStreamerInfo.h>
17 #include <TThread.h>
18
19 #include <AliESDEvent.h>
20 #include <AliESDfriend.h>
21 #include <AliRawReader.h>
22 #include <AliRunLoader.h>
23 #include <AliReconstruction.h>
24
25 #include <AliNetMessage.h>
26 #include <AliSocket.h>
27
28 #include <zmq.hpp>
29
30 #include "AliRecoServerThread.h"
31
32 ClassImp(AliRecoServerThread)
33 AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
34   : AliThreadedSocket(context, AliThreadedSocket::WRITE),
35         fReco(0),
36     fCond(0)
37 {
38         fReco = reco;
39 }
40
41 AliRecoServerThread::~AliRecoServerThread()
42 {
43         Stop();
44 }
45
46 Bool_t AliRecoServerThread::Start(const char* endpoint)
47 {
48         fHost = endpoint;
49         
50         return AliThreadedSocket::Start();
51 }
52
53 void* AliRecoServerThread::RunThrdWrite(void* arg)
54 {
55         TThread::SetCancelAsynchronous();
56         TThread::SetCancelOn();
57         
58         AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
59         
60         const char* host = recoTh->GetHost();
61         zmq::context_t* context = recoTh->GetContext();
62         AliReconstruction* reco = recoTh->GetReconstruction();
63
64  // generate a publish socket
65         AliSocket publisher(context, ZMQ_PUB);
66         publisher.Bind(host);
67         
68   if(reco==0) return 0;
69   
70   AliESDEvent* event;
71   
72         reco->Begin(NULL);
73   if (reco->GetAbort() != TSelector::kContinue) return 0;
74   
75   reco->SlaveBegin(NULL);
76         if (reco->GetAbort() != TSelector::kContinue) return 0;
77   
78   //******* The loop over events
79     Int_t iEvent = 0;
80     while ( reco->HasNextEventAfter(iEvent) ) {
81       // check if process has enough resources 
82       if (!reco->HasEnoughResources(iEvent)) break;
83       Bool_t status = reco->ProcessEvent(iEvent);
84       
85       if (status)
86       {
87                     event = reco->GetESDEvent();
88                     
89         AliNetMessage tmess(kMESS_OBJECT);
90                         tmess.Reset();
91                         tmess.WriteObject(event);
92                                 
93                                 publisher.Send(tmess);
94
95                         sleep(1);
96      }
97       else {
98         reco->Abort("ProcessEvent",TSelector::kAbortFile);
99       }
100                 
101       reco->CleanProcessedEvent();
102       if(recoTh->Condition()->TimedWaitRelative(500)==0){
103                                 break;
104                         }                       
105       iEvent++;
106     }
107     reco->SlaveTerminate();
108     if (reco->GetAbort() != TSelector::kContinue) return 0;
109     reco->Terminate();
110     if (reco->GetAbort() != TSelector::kContinue) return 0;
111   
112 }