]>
Commit | Line | Data |
---|---|---|
db352b46 | 1 | #include <zmq.hpp> |
2 | ||
3 | #include <TThread.h> | |
4 | ||
5 | #include "AliNetMessage.h" | |
6 | #include "AliSocket.h" | |
7 | #include "AliThreadedSocket.h" | |
8 | ||
9 | ClassImp(AliThreadedSocket) | |
10 | AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode) | |
11 | : TQObject(), | |
12 | fThread(0), | |
13 | fContext(context), | |
14 | fOpenMode(mode) | |
15 | { | |
16 | ||
17 | } | |
18 | ||
19 | AliThreadedSocket::~AliThreadedSocket() | |
20 | { | |
f05dd9ed | 21 | Wait(); |
db352b46 | 22 | Stop(); |
23 | } | |
24 | ||
25 | Bool_t AliThreadedSocket::Start() | |
26 | { | |
27 | if(!fThread){ | |
f05dd9ed | 28 | fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &Dispatch, (void*) this ); |
29 | ||
db352b46 | 30 | if(fThread->Run()==0){ |
31 | Emit("Started()"); | |
32 | return kTRUE; | |
33 | } | |
34 | } | |
35 | ||
36 | return kFALSE; | |
37 | } | |
38 | ||
39 | Bool_t AliThreadedSocket::Stop() | |
40 | { | |
f05dd9ed | 41 | if(fThread){ |
42 | fThread->Delete(); | |
43 | fThread=0; | |
44 | } | |
45 | ||
db352b46 | 46 | Emit("Stopped()"); |
47 | return kTRUE; | |
48 | } | |
49 | ||
50 | Bool_t AliThreadedSocket::Kill() | |
51 | { | |
52 | if(fThread){ | |
53 | if(fThread->Kill()!=0) return kFALSE; | |
54 | fThread->Delete(); | |
55 | fThread=0; | |
56 | ||
57 | Emit("Stopped()"); | |
58 | return kTRUE; | |
59 | } | |
60 | } | |
61 | ||
62 | void AliThreadedSocket::Continue() | |
63 | { | |
64 | ||
65 | } | |
66 | ||
f05dd9ed | 67 | void AliThreadedSocket::Wait() |
68 | { | |
69 | if(fThread && fThread->GetState()==TThread::kRunningState) | |
70 | { | |
71 | fThread->Join(); | |
72 | } | |
73 | } | |
74 | ||
db352b46 | 75 | zmq::context_t* AliThreadedSocket::GetContext() const |
76 | { | |
77 | return fContext; | |
78 | } | |
79 | ||
80 | TThread* AliThreadedSocket::GetThread() const | |
81 | { | |
82 | return fThread; | |
83 | } | |
84 | ||
85 | void AliThreadedSocket::Started() | |
86 | { | |
87 | Emit("Started()"); | |
88 | } | |
89 | ||
90 | void AliThreadedSocket::Stopped() | |
91 | { | |
92 | Emit("Stopped()"); | |
93 | } | |
94 | ||
f05dd9ed | 95 | void AliThreadedSocket::RunThrdRead() |
db352b46 | 96 | { |
97 | AliNetMessage* mess=0; | |
f05dd9ed | 98 | AliSocket sock(fContext, ZMQ_SUB); |
db352b46 | 99 | |
100 | do{ | |
101 | sock.Recv(mess); | |
102 | } | |
103 | while(mess==0); | |
104 | ||
f05dd9ed | 105 | Stopped(); |
db352b46 | 106 | } |
107 | ||
f05dd9ed | 108 | void AliThreadedSocket::RunThrdWrite() |
db352b46 | 109 | { |
110 | AliNetMessage* mess=0; | |
f05dd9ed | 111 | AliSocket sock(fContext, ZMQ_PUB); |
db352b46 | 112 | |
113 | do{ | |
114 | sock.Send(*mess); | |
115 | } | |
116 | while(1); | |
117 | ||
f05dd9ed | 118 | Stopped(); |
db352b46 | 119 | } |