]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/AliThreadedSocket.cxx
EVGEN depends on HepMC and HepMCParser
[u/mrichter/AliRoot.git] / MONITOR / AliThreadedSocket.cxx
1 #include <zmq.hpp>
2
3 #include <TThread.h>
4
5 #include "AliNetMessage.h"
6 #include "AliSocket.h"
7 #include "AliThreadedSocket.h"
8
9 ClassImp(AliThreadedSocket)
10 AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode)
11         : TQObject(),
12         fThread(0),
13         fContext(context),
14         fOpenMode(mode)
15 {
16
17 }
18
19 AliThreadedSocket::~AliThreadedSocket()
20 {
21         Wait();
22         Stop();
23 }
24
25 Bool_t AliThreadedSocket::Start()
26 {
27         if(!fThread){
28                 fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &Dispatch, (void*)  this );
29   
30         if(fThread->Run()==0){ 
31                 Emit("Started()");
32                 return kTRUE; 
33         }
34         }
35         
36         return kFALSE;
37 }
38
39 Bool_t AliThreadedSocket::Stop()
40 {
41         if(fThread){
42                 fThread->Delete();
43                 fThread=0;
44         }
45
46         Emit("Stopped()");
47         return kTRUE;
48 }
49
50 Bool_t AliThreadedSocket::Kill()
51 {
52         if(fThread){
53                 if(fThread->Kill()!=0) return kFALSE;
54                 fThread->Delete();
55                 fThread=0;
56                 
57                 Emit("Stopped()");
58                 return kTRUE;
59         }
60 }
61
62 void AliThreadedSocket::Continue()
63 {
64         
65 }
66
67 void AliThreadedSocket::Wait()
68 {
69         if(fThread && fThread->GetState()==TThread::kRunningState)
70         {
71                 fThread->Join();
72         }
73 }
74
75 zmq::context_t* AliThreadedSocket::GetContext() const
76 {
77         return fContext;
78 }
79
80 TThread* AliThreadedSocket::GetThread() const
81 {
82         return fThread;
83 }
84
85 void AliThreadedSocket::Started()
86 {
87         Emit("Started()");
88 }
89
90 void AliThreadedSocket::Stopped()
91 {
92         Emit("Stopped()");
93 }
94
95 void AliThreadedSocket::RunThrdRead()
96 {
97         AliNetMessage* mess=0;
98         AliSocket sock(fContext, ZMQ_SUB);
99                 
100         do{
101                 sock.Recv(mess);
102         }
103         while(mess==0);
104         
105         Stopped();
106 }
107
108 void AliThreadedSocket::RunThrdWrite()
109 {
110         AliNetMessage* mess=0;
111         AliSocket sock(fContext, ZMQ_PUB);
112         
113         do{
114                 sock.Send(*mess);
115         }
116         while(1);
117         
118         Stopped();
119 }