]>
Commit | Line | Data |
---|---|---|
db352b46 | 1 | #include <cstring> |
2 | ||
3 | #include <zmq.hpp> | |
4 | ||
5 | #include "AliNetMessage.h" | |
6 | #include "AliSocket.h" | |
7 | ||
// Deallocation callback with the signature zmq::message_t expects for
// caller-provided buffers.
//
// Contract: `data` must point to a buffer obtained from `new char[]` (or be
// NULL). It is therefore released with `delete[]`; pairing `new[]` with
// `free()` is undefined behaviour.
//
//   data - start of the buffer to release
//   hint - opaque user pointer supplied when the message was created; unused
void __freeBuffer (void *data, void *hint)
{
  (void)hint; // silence unused-parameter warnings
  delete[] static_cast<char*>(data);
}
12 | ||
13 | ||
14 | ClassImp(AliSocket); | |
15 | AliSocket::AliSocket(zmq::context_t *context,int type) | |
16 | : TObject(), | |
17 | fContext(context) | |
18 | { | |
19 | fSocket = new zmq::socket_t(*context,type); | |
20 | } | |
21 | ||
22 | AliSocket::~AliSocket() | |
23 | { | |
24 | fSocket->close(); | |
25 | } | |
26 | ||
27 | void AliSocket::Bind(const char* endpoint) | |
28 | { | |
29 | fEndPoint = endpoint; | |
30 | ||
31 | fSocket->bind(endpoint); | |
32 | } | |
33 | ||
34 | void AliSocket::Connect(const char* endpoint) | |
35 | { | |
36 | fEndPoint = endpoint; | |
37 | ||
38 | fSocket->connect(endpoint); | |
39 | } | |
40 | ||
41 | void AliSocket::Subscribe(const char* filter) | |
42 | { | |
43 | fSocket->setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter) ); | |
44 | } | |
45 | ||
46 | bool AliSocket::Recv(AliNetMessage *&mess, int flags) | |
47 | { | |
48 | zmq::message_t message; | |
49 | ||
50 | if(!fSocket->recv(&message, flags)) | |
51 | return false; | |
52 | ||
53 | int bufSize = (int)message.size(); | |
54 | ||
55 | // buffer will be adopted by AliNetMessage, no need to free it | |
56 | char* buf = new char[bufSize]; | |
57 | memcpy(buf, (char*)message.data(), bufSize); | |
58 | ||
59 | mess = new AliNetMessage(buf, bufSize); | |
60 | ||
61 | return true; | |
62 | } | |
63 | ||
64 | bool AliSocket::Send(AliNetMessage &mess, int flags) | |
65 | { | |
66 | //NOTE: this is already done by AliNetMessage, should we do this too? | |
67 | // send length of the message | |
68 | int bufSize = mess.BufferSize(); | |
69 | ||
70 | // we need to copy it elsewhere because zmq takes ownership of the buffer data | |
71 | char* buf = new char[bufSize]; | |
72 | memcpy(buf, (char*)mess.Buffer(), bufSize); | |
73 | ||
74 | zmq::message_t message(buf, bufSize, __freeBuffer, NULL); | |
75 | ||
76 | //fwrite(mess.Buffer(), sizeof(char), bufSize, stdout); | |
77 | ||
78 | return fSocket->send(message, flags); | |
79 | } |