]>
Commit | Line | Data |
---|---|---|
1 | #include <zmq.hpp> | |
2 | ||
3 | #include <TThread.h> | |
4 | ||
5 | #include "AliNetMessage.h" | |
6 | #include "AliSocket.h" | |
7 | #include "AliThreadedSocket.h" | |
8 | ||
9 | ClassImp(AliThreadedSocket) | |
10 | AliThreadedSocket::AliThreadedSocket(zmq::context_t *context, EOpenMode mode) | |
11 | : TQObject(), | |
12 | fThread(0), | |
13 | fContext(context), | |
14 | fOpenMode(mode) | |
15 | { | |
16 | ||
17 | } | |
18 | ||
19 | AliThreadedSocket::~AliThreadedSocket() | |
20 | { | |
21 | Wait(); | |
22 | Stop(); | |
23 | } | |
24 | ||
25 | Bool_t AliThreadedSocket::Start() | |
26 | { | |
27 | if(!fThread){ | |
28 | fThread = new TThread("AliThreadedSocket", (void(*) (void *) ) &Dispatch, (void*) this ); | |
29 | ||
30 | if(fThread->Run()==0){ | |
31 | Emit("Started()"); | |
32 | return kTRUE; | |
33 | } | |
34 | } | |
35 | ||
36 | return kFALSE; | |
37 | } | |
38 | ||
39 | Bool_t AliThreadedSocket::Stop() | |
40 | { | |
41 | if(fThread){ | |
42 | fThread->Delete(); | |
43 | fThread=0; | |
44 | } | |
45 | ||
46 | Emit("Stopped()"); | |
47 | return kTRUE; | |
48 | } | |
49 | ||
50 | Bool_t AliThreadedSocket::Kill() | |
51 | { | |
52 | if(fThread){ | |
53 | if(fThread->Kill()!=0) return kFALSE; | |
54 | fThread->Delete(); | |
55 | fThread=0; | |
56 | ||
57 | Emit("Stopped()"); | |
58 | return kTRUE; | |
59 | } | |
60 | } | |
61 | ||
62 | void AliThreadedSocket::Continue() | |
63 | { | |
64 | ||
65 | } | |
66 | ||
67 | void AliThreadedSocket::Wait() | |
68 | { | |
69 | if(fThread && fThread->GetState()==TThread::kRunningState) | |
70 | { | |
71 | fThread->Join(); | |
72 | } | |
73 | } | |
74 | ||
75 | zmq::context_t* AliThreadedSocket::GetContext() const | |
76 | { | |
77 | return fContext; | |
78 | } | |
79 | ||
80 | TThread* AliThreadedSocket::GetThread() const | |
81 | { | |
82 | return fThread; | |
83 | } | |
84 | ||
85 | void AliThreadedSocket::Started() | |
86 | { | |
87 | Emit("Started()"); | |
88 | } | |
89 | ||
90 | void AliThreadedSocket::Stopped() | |
91 | { | |
92 | Emit("Stopped()"); | |
93 | } | |
94 | ||
95 | void AliThreadedSocket::RunThrdRead() | |
96 | { | |
97 | AliNetMessage* mess=0; | |
98 | AliSocket sock(fContext, ZMQ_SUB); | |
99 | ||
100 | do{ | |
101 | sock.Recv(mess); | |
102 | } | |
103 | while(mess==0); | |
104 | ||
105 | Stopped(); | |
106 | } | |
107 | ||
108 | void AliThreadedSocket::RunThrdWrite() | |
109 | { | |
110 | AliNetMessage* mess=0; | |
111 | AliSocket sock(fContext, ZMQ_PUB); | |
112 | ||
113 | do{ | |
114 | sock.Send(*mess); | |
115 | } | |
116 | while(1); | |
117 | ||
118 | Stopped(); | |
119 | } |