]> git.uio.no Git - u/mrichter/AliRoot.git/blame - MONITOR/AliRecoServerThread.cxx
Adding base classes for zmq networking and modificiations to event server
[u/mrichter/AliRoot.git] / MONITOR / AliRecoServerThread.cxx
CommitLineData
7e0cf530 1// Author: Mihai Niculescu 2013
2
3/**************************************************************************
4 * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. *
5 * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for *
6 * full copyright notice. *
7 **************************************************************************/
8
bd2715e3 9#include <RVersion.h>
7e0cf530 10#include <stdlib.h>
11
7e0cf530 12#include <TCondition.h>
13#include <TBufferFile.h>
14#include <TMessage.h>
15#include <TObjArray.h>
16#include <TStreamerInfo.h>
17#include <TThread.h>
18
7e0cf530 19#include <AliESDEvent.h>
20#include <AliESDfriend.h>
21#include <AliRawReader.h>
22#include <AliRunLoader.h>
23#include <AliReconstruction.h>
24
db352b46 25#include <AliNetMessage.h>
26#include <AliSocket.h>
27
28#include <zmq.hpp>
29
7e0cf530 30#include "AliRecoServerThread.h"
31
db352b46 32ClassImp(AliRecoServerThread)
7e0cf530 33AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco)
db352b46 34 : AliThreadedSocket(context, AliThreadedSocket::WRITE),
7e0cf530 35 fReco(0),
7e0cf530 36 fCond(0)
37{
7e0cf530 38 fReco = reco;
39}
40
41AliRecoServerThread::~AliRecoServerThread()
42{
43 Stop();
44}
45
db352b46 46Bool_t AliRecoServerThread::Start(const char* endpoint)
7e0cf530 47{
db352b46 48 fHost = endpoint;
7e0cf530 49
db352b46 50 return AliThreadedSocket::Start();
7e0cf530 51}
52
db352b46 53void* AliRecoServerThread::RunThrdWrite(void* arg)
7e0cf530 54{
55 TThread::SetCancelAsynchronous();
56 TThread::SetCancelOn();
57
58 AliRecoServerThread* recoTh = (AliRecoServerThread*)arg;
59
60 const char* host = recoTh->GetHost();
61 zmq::context_t* context = recoTh->GetContext();
db352b46 62 AliReconstruction* reco = recoTh->GetReconstruction();
7e0cf530 63
db352b46 64 // generate a publish socket
65 AliSocket publisher(context, ZMQ_PUB);
66 publisher.Bind(host);
7e0cf530 67
68 if(reco==0) return 0;
69
70 AliESDEvent* event;
71
72 reco->Begin(NULL);
73 if (reco->GetAbort() != TSelector::kContinue) return 0;
74
75 reco->SlaveBegin(NULL);
76 if (reco->GetAbort() != TSelector::kContinue) return 0;
77
78 //******* The loop over events
79 Int_t iEvent = 0;
80 while ( reco->HasNextEventAfter(iEvent) ) {
81 // check if process has enough resources
82 if (!reco->HasEnoughResources(iEvent)) break;
83 Bool_t status = reco->ProcessEvent(iEvent);
84
85 if (status)
86 {
db352b46 87 event = reco->GetESDEvent();
88
89 AliNetMessage tmess(kMESS_OBJECT);
90 tmess.Reset();
91 tmess.WriteObject(event);
92
93 publisher.Send(tmess);
7e0cf530 94
95 sleep(1);
96 }
97 else {
98 reco->Abort("ProcessEvent",TSelector::kAbortFile);
99 }
100
101 reco->CleanProcessedEvent();
102 if(recoTh->Condition()->TimedWaitRelative(500)==0){
103 break;
104 }
105 iEvent++;
106 }
107 reco->SlaveTerminate();
108 if (reco->GetAbort() != TSelector::kContinue) return 0;
109 reco->Terminate();
110 if (reco->GetAbort() != TSelector::kContinue) return 0;
111
112}