]>
Commit | Line | Data |
---|---|---|
a655eae3 | 1 | // $Id$ |
2 | ||
3 | /** @file AliHLTMessage.cxx | |
4 | @author Matthias Richter (customization of Root TMessage ) | |
5 | @date | |
6 | @brief Serialization of Root objects in the ALICE HLT. */ | |
7 | ||
8 | // This is the original Root TMessage implementation with a few minor | |
9 | // modifications, original revision: | |
10 | // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun | |
11 | // Author: Fons Rademakers 19/12/96 | |
12 | ||
13 | /************************************************************************* | |
14 | * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. * | |
15 | * All rights reserved. * | |
16 | * * | |
17 | * For the licensing terms see $ROOTSYS/LICENSE. * | |
18 | * For the list of contributors see $ROOTSYS/README/CREDITS. * | |
19 | *************************************************************************/ | |
20 | ||
21 | ////////////////////////////////////////////////////////////////////////// | |
22 | // // | |
23 | // TMessage // | |
24 | // // | |
25 | // Message buffer class used for serializing objects and sending them // | |
26 | // over a network. This class inherits from TBuffer the basic I/O // | |
27 | // serializer. // | |
28 | // // | |
29 | ////////////////////////////////////////////////////////////////////////// | |
30 | ||
31 | #include "AliHLTMessage.h" | |
32 | #include "Bytes.h" | |
33 | #include "TFile.h" | |
34 | ||
35 | extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout); | |
36 | extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout); | |
37 | const Int_t kMAXBUF = 0xffffff; | |
38 | ||
a655eae3 | 39 | ClassImp(AliHLTMessage) |
40 | ||
41 | //______________________________________________________________________________ | |
dfaa582a | 42 | AliHLTMessage::AliHLTMessage(UInt_t what) |
43 | : | |
44 | # ifdef ROOT_TBufferFile | |
45 | TBufferFile(kWrite), | |
46 | # else | |
47 | TBuffer(kWrite), | |
48 | # endif | |
c61a7285 | 49 | AliHLTLogging(), |
50 | fWhat(what), | |
51 | fClass(0), | |
52 | fCompress(0), | |
53 | fBufComp(0), | |
54 | fBufCompCur(0), | |
55 | fCompPos(0) | |
0634add1 | 56 | , fBufUncompressed(0) |
a655eae3 | 57 | { |
58 | // Create a AliHLTMessage object for storing objects. The "what" integer | |
59 | // describes the type of message. Predifined ROOT system message types | |
60 | // can be found in MessageTypes.h. Make sure your own message types are | |
61 | // unique from the ROOT defined message types (i.e. 0 - 10000 are | |
62 | // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message | |
63 | // will wait for an acknowledgement from the remote side. This makes | |
64 | // the sending process synchronous. In case you OR "what" with kMESS_ZIP, | |
65 | // the message will be compressed in TSocket using the zip algorithm | |
66 | // (only if message is > 256 bytes). | |
67 | ||
68 | // space at the beginning of the message reserved for the message length | |
69 | UInt_t reserved = 0; | |
70 | *this << reserved; | |
71 | ||
a655eae3 | 72 | *this << what; |
73 | ||
a655eae3 | 74 | } |
75 | ||
5cdaceed | 76 | const Int_t AliHLTMessage::fgkMinimumSize=30; |
77 | UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0}; | |
78 | ||
a655eae3 | 79 | //______________________________________________________________________________ |
dfaa582a | 80 | AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize) |
81 | : | |
ba4b0dd7 | 82 | # if defined(ROOT_TBufferFile) |
5cdaceed | 83 | TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0), |
dfaa582a | 84 | # else |
5cdaceed | 85 | TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0), |
dfaa582a | 86 | # endif |
c61a7285 | 87 | AliHLTLogging(), |
88 | fWhat(0), | |
89 | fClass(0), | |
90 | fCompress(0), | |
91 | fBufComp(0), | |
92 | fBufCompCur(0), | |
93 | fCompPos(0) | |
0634add1 | 94 | , fBufUncompressed(0) |
a655eae3 | 95 | { |
96 | // Create a AliHLTMessage object for reading objects. The objects will be | |
97 | // read from buf. Use the What() method to get the message type. | |
98 | ||
99 | // skip space at the beginning of the message reserved for the message length | |
100 | fBufCur += sizeof(UInt_t); | |
101 | ||
102 | *this >> fWhat; | |
103 | ||
a655eae3 | 104 | if (fWhat & kMESS_ZIP) { |
105 | // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress | |
106 | fBufComp = fBuffer; | |
107 | fBufCompCur = fBuffer + bufsize; | |
108 | fBuffer = 0; | |
109 | Uncompress(); | |
d896b5c8 | 110 | // Matthias Sep 2008 |
111 | // NOTE: this is not done in TMessage and will lead to the deletion | |
112 | // of the buffer. This is not allowed in case of HLT where the | |
113 | // buffer is handled by the framework. In general, I think this | |
114 | // is a very bad idea to do it like that in TMessage | |
115 | fBufComp = NULL; | |
116 | fBufCompCur = 0; | |
a655eae3 | 117 | } |
118 | ||
119 | if (fWhat == kMESS_OBJECT) { | |
120 | InitMap(); | |
121 | fClass = ReadClass(); // get first the class stored in message | |
122 | SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat)); | |
123 | ResetMap(); | |
124 | } else { | |
125 | fClass = 0; | |
126 | } | |
127 | } | |
128 | ||
129 | //______________________________________________________________________________ | |
130 | AliHLTMessage::~AliHLTMessage() | |
131 | { | |
132 | // Clean up compression buffer. | |
0634add1 | 133 | Reset(); |
a655eae3 | 134 | } |
135 | ||
136 | //______________________________________________________________________________ | |
137 | void AliHLTMessage::Forward() | |
138 | { | |
139 | // Change a buffer that was received into one that can be send, i.e. | |
140 | // forward a just received message. | |
141 | ||
142 | if (IsReading()) { | |
143 | SetWriteMode(); | |
144 | SetBufferOffset(fBufSize); | |
145 | ||
146 | if (fBufComp) { | |
147 | fCompPos = fBufCur; | |
148 | } | |
149 | } | |
150 | } | |
151 | ||
152 | //______________________________________________________________________________ | |
153 | void AliHLTMessage::Reset() | |
154 | { | |
155 | // Reset the message buffer so we can use (i.e. fill) it again. | |
156 | ||
157 | SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat)); | |
158 | ResetMap(); | |
159 | ||
160 | if (fBufComp) { | |
161 | delete [] fBufComp; | |
162 | fBufComp = 0; | |
163 | fBufCompCur = 0; | |
164 | fCompPos = 0; | |
165 | } | |
0634add1 | 166 | if (fBufUncompressed) { |
167 | delete [] fBufUncompressed; | |
168 | fBufUncompressed=NULL; | |
169 | } | |
a655eae3 | 170 | } |
171 | ||
172 | //______________________________________________________________________________ | |
173 | void AliHLTMessage::SetLength() const | |
174 | { | |
175 | // Set the message length at the beginning of the message buffer. | |
176 | ||
177 | if (IsWriting()) { | |
178 | char *buf = Buffer(); | |
179 | *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t)); | |
180 | ||
181 | if (fBufComp) { | |
182 | buf = fBufComp; | |
d896b5c8 | 183 | *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t)); |
a655eae3 | 184 | } |
185 | } | |
186 | } | |
187 | ||
188 | //______________________________________________________________________________ | |
189 | void AliHLTMessage::SetWhat(UInt_t what) | |
190 | { | |
191 | // Using this method one can change the message type a-posteriory. | |
192 | // In case you OR "what" with kMESS_ACK, the message will wait for | |
193 | // an acknowledgement from the remote side. This makes the sending | |
194 | // process synchronous. | |
195 | ||
196 | fWhat = what; | |
197 | ||
198 | char *buf = Buffer(); | |
199 | buf += sizeof(UInt_t); // skip reserved length space | |
200 | tobuf(buf, what); | |
201 | ||
202 | if (fBufComp) { | |
203 | buf = fBufComp; | |
204 | buf += sizeof(UInt_t); // skip reserved length space | |
205 | tobuf(buf, what | kMESS_ZIP); | |
206 | } | |
207 | } | |
208 | ||
209 | //______________________________________________________________________________ | |
210 | void AliHLTMessage::SetCompressionLevel(Int_t level) | |
211 | { | |
212 | // Set the message compression level. Can be between 0 and 9 with 0 | |
213 | // being no compression and 9 maximum compression. In general the default | |
214 | // level of 1 is the best compromise between achieved compression and | |
215 | // cpu time. Compression will only happen when the message is > 256 bytes. | |
216 | ||
217 | if (level < 0) level = 0; | |
218 | if (level > 9) level = 9; | |
219 | ||
220 | if (level != fCompress && fBufComp) { | |
221 | delete [] fBufComp; | |
222 | fBufComp = 0; | |
223 | fBufCompCur = 0; | |
224 | fCompPos = 0; | |
225 | } | |
226 | fCompress = level; | |
227 | } | |
228 | ||
229 | //______________________________________________________________________________ | |
230 | Int_t AliHLTMessage::Compress() | |
231 | { | |
232 | // Compress the message. The message will only be compressed if the | |
233 | // compression level > 0 and the if the message is > 256 bytes. | |
234 | // Returns -1 in case of error (when compression fails or | |
235 | // when the message increases in size in some pathological cases), | |
236 | // otherwise returns 0. | |
237 | ||
238 | if (fCompress == 0) { | |
239 | // no compression specified | |
240 | if (fBufComp) { | |
241 | delete [] fBufComp; | |
242 | fBufComp = 0; | |
243 | fBufCompCur = 0; | |
244 | fCompPos = 0; | |
245 | } | |
246 | return 0; | |
247 | } | |
248 | ||
249 | if (fBufComp && fCompPos == fBufCur) { | |
250 | // the message was already compressed | |
251 | return 0; | |
252 | } | |
253 | ||
254 | // remove any existing compressed buffer before compressing modified message | |
255 | if (fBufComp) { | |
256 | delete [] fBufComp; | |
257 | fBufComp = 0; | |
258 | fBufCompCur = 0; | |
259 | fCompPos = 0; | |
260 | } | |
261 | ||
262 | if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) { | |
263 | // this message is too small to be compressed | |
264 | return 0; | |
265 | } | |
266 | ||
267 | Int_t hdrlen = 2*sizeof(UInt_t); | |
268 | Int_t messlen = Length() - hdrlen; | |
269 | Int_t nbuffers = messlen / kMAXBUF; | |
270 | Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length | |
271 | Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers); | |
272 | fBufComp = new char[buflen]; | |
273 | char *messbuf = Buffer() + hdrlen; | |
274 | char *bufcur = fBufComp + chdrlen; | |
275 | Int_t noutot = 0; | |
276 | Int_t nzip = 0; | |
277 | Int_t nout, bufmax; | |
278 | for (Int_t i = 0; i <= nbuffers; i++) { | |
279 | if (i == nbuffers) | |
280 | bufmax = messlen - nzip; | |
281 | else | |
282 | bufmax = kMAXBUF; | |
283 | R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout); | |
284 | if (nout == 0 || nout >= messlen) { | |
285 | //this happens when the buffer cannot be compressed | |
286 | delete [] fBufComp; | |
287 | fBufComp = 0; | |
288 | fBufCompCur = 0; | |
289 | fCompPos = 0; | |
290 | return -1; | |
291 | } | |
292 | bufcur += nout; | |
293 | noutot += nout; | |
294 | messbuf += kMAXBUF; | |
295 | nzip += kMAXBUF; | |
296 | } | |
297 | fBufCompCur = bufcur; | |
298 | fCompPos = fBufCur; | |
299 | ||
300 | bufcur = fBufComp; | |
301 | tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t))); | |
302 | Int_t what = fWhat | kMESS_ZIP; | |
303 | tobuf(bufcur, what); | |
304 | tobuf(bufcur, Length()); // original uncompressed buffer length | |
305 | ||
306 | return 0; | |
307 | } | |
308 | ||
309 | //______________________________________________________________________________ | |
310 | Int_t AliHLTMessage::Uncompress() | |
311 | { | |
312 | // Uncompress the message. The message will only be uncompressed when | |
313 | // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise. | |
314 | ||
315 | if (!fBufComp || !(fWhat & kMESS_ZIP)) | |
316 | return -1; | |
317 | ||
318 | Int_t buflen; | |
319 | Int_t hdrlen = 2*sizeof(UInt_t); | |
320 | UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen; | |
321 | frombuf((char *&)bufcur, &buflen); | |
322 | fBuffer = new char[buflen]; | |
0634add1 | 323 | fBufUncompressed = fBuffer; |
a655eae3 | 324 | fBufSize = buflen; |
325 | fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat); | |
326 | fBufMax = fBuffer + fBufSize; | |
327 | char *messbuf = fBuffer + hdrlen; | |
328 | ||
329 | Int_t nin, nout, nbuf; | |
330 | Int_t noutot = 0; | |
331 | while (1) { | |
332 | nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16)); | |
333 | nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16); | |
334 | R__unzip(&nin, bufcur, &nbuf, messbuf, &nout); | |
335 | if (!nout) break; | |
336 | noutot += nout; | |
337 | if (noutot >= buflen - hdrlen) break; | |
338 | bufcur += nin; | |
339 | messbuf += nout; | |
340 | } | |
341 | ||
342 | fWhat &= ~kMESS_ZIP; | |
343 | fCompress = 1; | |
344 | ||
345 | return 0; | |
346 | } | |
347 |