]>
Commit | Line | Data |
---|---|---|
7e0cf530 | 1 | // Author: Mihai Niculescu 2013 |
2 | ||
3 | /************************************************************************** | |
4 | * Copyright(c) 1998-2013, ALICE Experiment at CERN, all rights reserved. * | |
5 | * See http://aliceinfo.cern.ch/Offline/AliRoot/License.html for * | |
6 | * full copyright notice. * | |
7 | **************************************************************************/ | |
8 | ||
bd2715e3 | 9 | #include <RVersion.h> |
7e0cf530 | 10 | #include <stdlib.h> |
11 | ||
7e0cf530 | 12 | #include <TCondition.h> |
13 | #include <TBufferFile.h> | |
14 | #include <TMessage.h> | |
15 | #include <TObjArray.h> | |
16 | #include <TStreamerInfo.h> | |
17 | #include <TThread.h> | |
18 | ||
7e0cf530 | 19 | #include <AliESDEvent.h> |
20 | #include <AliESDfriend.h> | |
21 | #include <AliRawReader.h> | |
22 | #include <AliRunLoader.h> | |
23 | #include <AliReconstruction.h> | |
24 | ||
db352b46 | 25 | #include <AliNetMessage.h> |
26 | #include <AliSocket.h> | |
27 | ||
28 | #include <zmq.hpp> | |
29 | ||
7e0cf530 | 30 | #include "AliRecoServerThread.h" |
31 | ||
db352b46 | 32 | ClassImp(AliRecoServerThread) |
7e0cf530 | 33 | AliRecoServerThread::AliRecoServerThread(zmq::context_t *context, AliReconstruction* reco) |
f05dd9ed | 34 | : AliThreadedSocket(context, AliThreadedSocket::kWRITE), |
7e0cf530 | 35 | fReco(0), |
7e0cf530 | 36 | fCond(0) |
37 | { | |
7e0cf530 | 38 | fReco = reco; |
39 | } | |
40 | ||
41 | AliRecoServerThread::~AliRecoServerThread() | |
42 | { | |
43 | Stop(); | |
44 | } | |
45 | ||
db352b46 | 46 | Bool_t AliRecoServerThread::Start(const char* endpoint) |
7e0cf530 | 47 | { |
db352b46 | 48 | fHost = endpoint; |
7e0cf530 | 49 | |
db352b46 | 50 | return AliThreadedSocket::Start(); |
7e0cf530 | 51 | } |
52 | ||
f05dd9ed | 53 | void AliRecoServerThread::RunThrdWrite() |
7e0cf530 | 54 | { |
55 | TThread::SetCancelAsynchronous(); | |
56 | TThread::SetCancelOn(); | |
57 | ||
db352b46 | 58 | // generate a publish socket |
f05dd9ed | 59 | AliSocket publisher(fContext, ZMQ_PUB); |
60 | publisher.Bind(fHost); | |
7e0cf530 | 61 | |
f05dd9ed | 62 | if(fReco==0) return; |
7e0cf530 | 63 | |
64 | AliESDEvent* event; | |
65 | ||
f05dd9ed | 66 | fReco->Begin(NULL); |
67 | if (fReco->GetAbort() != TSelector::kContinue) return; | |
7e0cf530 | 68 | |
f05dd9ed | 69 | fReco->SlaveBegin(NULL); |
70 | if (fReco->GetAbort() != TSelector::kContinue) return; | |
7e0cf530 | 71 | |
72 | //******* The loop over events | |
73 | Int_t iEvent = 0; | |
f05dd9ed | 74 | while ( fReco->HasNextEventAfter(iEvent) ) { |
7e0cf530 | 75 | // check if process has enough resources |
f05dd9ed | 76 | if (!fReco->HasEnoughResources(iEvent)) break; |
77 | Bool_t status = fReco->ProcessEvent(iEvent); | |
7e0cf530 | 78 | |
79 | if (status) | |
80 | { | |
f05dd9ed | 81 | event = fReco->GetESDEvent(); |
db352b46 | 82 | |
83 | AliNetMessage tmess(kMESS_OBJECT); | |
84 | tmess.Reset(); | |
85 | tmess.WriteObject(event); | |
86 | ||
87 | publisher.Send(tmess); | |
7e0cf530 | 88 | |
89 | sleep(1); | |
90 | } | |
91 | else { | |
f05dd9ed | 92 | fReco->Abort("ProcessEvent",TSelector::kAbortFile); |
7e0cf530 | 93 | } |
94 | ||
f05dd9ed | 95 | fReco->CleanProcessedEvent(); |
96 | if(fCond->TimedWaitRelative(500)==0){ | |
7e0cf530 | 97 | break; |
98 | } | |
99 | iEvent++; | |
100 | } | |
f05dd9ed | 101 | fReco->SlaveTerminate(); |
102 | if (fReco->GetAbort() != TSelector::kContinue) return; | |
103 | fReco->Terminate(); | |
104 | if (fReco->GetAbort() != TSelector::kContinue) return; | |
7e0cf530 | 105 | |
106 | } |