]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/AliThreadedSocket.cxx
remove unnecessary TStopWatch
[u/mrichter/AliRoot.git] / MONITOR / AliThreadedSocket.cxx
1 #include <zmq.hpp>
2
3 #include <TThread.h>
4
5 #include "AliNetMessage.h"
6 #include "AliSocket.h"
7 #include "AliThreadedSocket.h"
8
9 ClassImp(AliThreadedSocket)
10 AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode)
11         : TQObject(),
12         fThread(0),
13         fContext(context),
14         fOpenMode(mode)
15 {
16
17 }
18
19 AliThreadedSocket::~AliThreadedSocket()
20 {
21         Stop();
22 }
23
24 Bool_t AliThreadedSocket::Start()
25 {
26         if(!fThread){
27                 if(fOpenMode==READ)
28                 fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdRead, (void*)  this );
29         else
30                 fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &RunThrdWrite,(void*)  this );
31                 
32         if(fThread->Run()==0){ 
33                 Emit("Started()");
34                 return kTRUE; 
35         }
36         }
37         
38         return kFALSE;
39 }
40
41 Bool_t AliThreadedSocket::Stop()
42 {
43         Emit("Stopped()");
44         return kTRUE;
45 }
46
47 Bool_t AliThreadedSocket::Kill()
48 {
49         if(fThread){
50                 if(fThread->Kill()!=0) return kFALSE;
51                 fThread->Delete();
52                 fThread=0;
53                 
54                 Emit("Stopped()");
55                 return kTRUE;
56         }
57 }
58
59 void AliThreadedSocket::Continue()
60 {
61         
62 }
63
64 zmq::context_t* AliThreadedSocket::GetContext() const
65 {
66         return fContext;
67 }
68
69 TThread* AliThreadedSocket::GetThread() const
70 {
71         return fThread;
72 }
73
74 void AliThreadedSocket::Started()
75 {
76         Emit("Started()");
77 }
78
79 void AliThreadedSocket::Stopped()
80 {
81         Emit("Stopped()");
82 }
83
84 void* AliThreadedSocket::RunThrdRead(void* arg)
85 {
86         AliNetMessage* mess=0;
87         AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
88         zmq::context_t* context = thsock->GetContext();
89         
90         AliSocket sock(context, ZMQ_SUB);
91                 
92         do{
93                 sock.Recv(mess);
94         }
95         while(mess==0);
96         
97         thsock->Stopped();
98 }
99
100 void* AliThreadedSocket::RunThrdWrite(void* arg)
101 {
102         AliNetMessage* mess=0;
103         AliThreadedSocket* thsock = (AliThreadedSocket*)arg;
104         zmq::context_t* context = thsock->GetContext();
105         
106         AliSocket sock(context, ZMQ_PUB);
107         
108         do{
109                 sock.Send(*mess);
110         }
111         while(1);
112         
113         thsock->Emit("Stopped()");
114 }