]> git.uio.no Git - u/mrichter/AliRoot.git/blame - HLT/BASE/AliHLTMessage.cxx
updated from the running version in the ACR
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTMessage.cxx
CommitLineData
a655eae3 1// $Id$
2
3/** @file AliHLTMessage.cxx
4 @author Matthias Richter (customization of Root TMessage )
5 @date
6 @brief Serialization of Root objects in the ALICE HLT. */
7
8// This is the original Root TMessage implementation with a few minor
9// modifications, original revision:
10// root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11// Author: Fons Rademakers 19/12/96
12
13/*************************************************************************
14 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
15 * All rights reserved. *
16 * *
17 * For the licensing terms see $ROOTSYS/LICENSE. *
18 * For the list of contributors see $ROOTSYS/README/CREDITS. *
19 *************************************************************************/
20
21//////////////////////////////////////////////////////////////////////////
22// //
23// TMessage //
24// //
25// Message buffer class used for serializing objects and sending them //
26// over a network. This class inherits from TBuffer the basic I/O //
27// serializer. //
28// //
29//////////////////////////////////////////////////////////////////////////
30
31#include "AliHLTMessage.h"
32#include "Bytes.h"
33#include "TFile.h"
e5701dcf 34#include "TClass.h"
a655eae3 35
36extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
37extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
38const Int_t kMAXBUF = 0xffffff;
39
a655eae3 40ClassImp(AliHLTMessage)
41
42//______________________________________________________________________________
dfaa582a 43AliHLTMessage::AliHLTMessage(UInt_t what)
44 :
45# ifdef ROOT_TBufferFile
46 TBufferFile(kWrite),
47# else
48 TBuffer(kWrite),
49# endif
c61a7285 50 AliHLTLogging(),
51 fWhat(what),
52 fClass(0),
53 fCompress(0),
54 fBufComp(0),
55 fBufCompCur(0),
56 fCompPos(0)
0634add1 57 , fBufUncompressed(0)
a655eae3 58{
59 // Create a AliHLTMessage object for storing objects. The "what" integer
60 // describes the type of message. Predifined ROOT system message types
61 // can be found in MessageTypes.h. Make sure your own message types are
62 // unique from the ROOT defined message types (i.e. 0 - 10000 are
63 // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
64 // will wait for an acknowledgement from the remote side. This makes
65 // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
66 // the message will be compressed in TSocket using the zip algorithm
67 // (only if message is > 256 bytes).
68
69 // space at the beginning of the message reserved for the message length
70 UInt_t reserved = 0;
71 *this << reserved;
72
a655eae3 73 *this << what;
74
a655eae3 75}
76
5cdaceed 77const Int_t AliHLTMessage::fgkMinimumSize=30;
78UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
79
a655eae3 80//______________________________________________________________________________
dfaa582a 81AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
82 :
ba4b0dd7 83# if defined(ROOT_TBufferFile)
5cdaceed 84 TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
dfaa582a 85# else
5cdaceed 86 TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
dfaa582a 87# endif
c61a7285 88 AliHLTLogging(),
89 fWhat(0),
90 fClass(0),
91 fCompress(0),
92 fBufComp(0),
93 fBufCompCur(0),
94 fCompPos(0)
0634add1 95 , fBufUncompressed(0)
a655eae3 96{
97 // Create a AliHLTMessage object for reading objects. The objects will be
98 // read from buf. Use the What() method to get the message type.
99
100 // skip space at the beginning of the message reserved for the message length
101 fBufCur += sizeof(UInt_t);
102
103 *this >> fWhat;
104
a655eae3 105 if (fWhat & kMESS_ZIP) {
106 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
107 fBufComp = fBuffer;
108 fBufCompCur = fBuffer + bufsize;
109 fBuffer = 0;
110 Uncompress();
d896b5c8 111 // Matthias Sep 2008
112 // NOTE: this is not done in TMessage and will lead to the deletion
113 // of the buffer. This is not allowed in case of HLT where the
114 // buffer is handled by the framework. In general, I think this
115 // is a very bad idea to do it like that in TMessage
116 fBufComp = NULL;
117 fBufCompCur = 0;
a655eae3 118 }
119
120 if (fWhat == kMESS_OBJECT) {
121 InitMap();
122 fClass = ReadClass(); // get first the class stored in message
123 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
124 ResetMap();
125 } else {
126 fClass = 0;
127 }
128}
129
130//______________________________________________________________________________
131AliHLTMessage::~AliHLTMessage()
132{
133 // Clean up compression buffer.
0634add1 134 Reset();
a655eae3 135}
136
137//______________________________________________________________________________
138void AliHLTMessage::Forward()
139{
140 // Change a buffer that was received into one that can be send, i.e.
141 // forward a just received message.
142
143 if (IsReading()) {
144 SetWriteMode();
145 SetBufferOffset(fBufSize);
146
147 if (fBufComp) {
148 fCompPos = fBufCur;
149 }
150 }
151}
152
153//______________________________________________________________________________
154void AliHLTMessage::Reset()
155{
156 // Reset the message buffer so we can use (i.e. fill) it again.
157
158 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
159 ResetMap();
160
161 if (fBufComp) {
162 delete [] fBufComp;
163 fBufComp = 0;
164 fBufCompCur = 0;
165 fCompPos = 0;
166 }
0634add1 167 if (fBufUncompressed) {
168 delete [] fBufUncompressed;
169 fBufUncompressed=NULL;
170 }
a655eae3 171}
172
173//______________________________________________________________________________
174void AliHLTMessage::SetLength() const
175{
176 // Set the message length at the beginning of the message buffer.
177
178 if (IsWriting()) {
179 char *buf = Buffer();
180 *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
181
182 if (fBufComp) {
183 buf = fBufComp;
d896b5c8 184 *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
a655eae3 185 }
186 }
187}
188
189//______________________________________________________________________________
190void AliHLTMessage::SetWhat(UInt_t what)
191{
192 // Using this method one can change the message type a-posteriory.
193 // In case you OR "what" with kMESS_ACK, the message will wait for
194 // an acknowledgement from the remote side. This makes the sending
195 // process synchronous.
196
197 fWhat = what;
198
199 char *buf = Buffer();
200 buf += sizeof(UInt_t); // skip reserved length space
201 tobuf(buf, what);
202
203 if (fBufComp) {
204 buf = fBufComp;
205 buf += sizeof(UInt_t); // skip reserved length space
206 tobuf(buf, what | kMESS_ZIP);
207 }
208}
209
210//______________________________________________________________________________
211void AliHLTMessage::SetCompressionLevel(Int_t level)
212{
213 // Set the message compression level. Can be between 0 and 9 with 0
214 // being no compression and 9 maximum compression. In general the default
215 // level of 1 is the best compromise between achieved compression and
216 // cpu time. Compression will only happen when the message is > 256 bytes.
217
218 if (level < 0) level = 0;
219 if (level > 9) level = 9;
220
221 if (level != fCompress && fBufComp) {
222 delete [] fBufComp;
223 fBufComp = 0;
224 fBufCompCur = 0;
225 fCompPos = 0;
226 }
227 fCompress = level;
228}
229
230//______________________________________________________________________________
231Int_t AliHLTMessage::Compress()
232{
233 // Compress the message. The message will only be compressed if the
234 // compression level > 0 and the if the message is > 256 bytes.
235 // Returns -1 in case of error (when compression fails or
236 // when the message increases in size in some pathological cases),
237 // otherwise returns 0.
238
239 if (fCompress == 0) {
240 // no compression specified
241 if (fBufComp) {
242 delete [] fBufComp;
243 fBufComp = 0;
244 fBufCompCur = 0;
245 fCompPos = 0;
246 }
247 return 0;
248 }
249
250 if (fBufComp && fCompPos == fBufCur) {
251 // the message was already compressed
252 return 0;
253 }
254
255 // remove any existing compressed buffer before compressing modified message
256 if (fBufComp) {
257 delete [] fBufComp;
258 fBufComp = 0;
259 fBufCompCur = 0;
260 fCompPos = 0;
261 }
262
263 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
264 // this message is too small to be compressed
265 return 0;
266 }
267
268 Int_t hdrlen = 2*sizeof(UInt_t);
269 Int_t messlen = Length() - hdrlen;
270 Int_t nbuffers = messlen / kMAXBUF;
271 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
272 Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
273 fBufComp = new char[buflen];
274 char *messbuf = Buffer() + hdrlen;
275 char *bufcur = fBufComp + chdrlen;
276 Int_t noutot = 0;
277 Int_t nzip = 0;
278 Int_t nout, bufmax;
279 for (Int_t i = 0; i <= nbuffers; i++) {
280 if (i == nbuffers)
281 bufmax = messlen - nzip;
282 else
283 bufmax = kMAXBUF;
284 R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
285 if (nout == 0 || nout >= messlen) {
286 //this happens when the buffer cannot be compressed
287 delete [] fBufComp;
288 fBufComp = 0;
289 fBufCompCur = 0;
290 fCompPos = 0;
291 return -1;
292 }
293 bufcur += nout;
294 noutot += nout;
295 messbuf += kMAXBUF;
296 nzip += kMAXBUF;
297 }
298 fBufCompCur = bufcur;
299 fCompPos = fBufCur;
300
301 bufcur = fBufComp;
302 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
303 Int_t what = fWhat | kMESS_ZIP;
304 tobuf(bufcur, what);
305 tobuf(bufcur, Length()); // original uncompressed buffer length
306
307 return 0;
308}
309
310//______________________________________________________________________________
311Int_t AliHLTMessage::Uncompress()
312{
313 // Uncompress the message. The message will only be uncompressed when
314 // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
315
316 if (!fBufComp || !(fWhat & kMESS_ZIP))
317 return -1;
318
319 Int_t buflen;
320 Int_t hdrlen = 2*sizeof(UInt_t);
321 UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen;
322 frombuf((char *&)bufcur, &buflen);
323 fBuffer = new char[buflen];
0634add1 324 fBufUncompressed = fBuffer;
a655eae3 325 fBufSize = buflen;
326 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
327 fBufMax = fBuffer + fBufSize;
328 char *messbuf = fBuffer + hdrlen;
329
330 Int_t nin, nout, nbuf;
331 Int_t noutot = 0;
332 while (1) {
333 nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
334 nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
335 R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
336 if (!nout) break;
337 noutot += nout;
338 if (noutot >= buflen - hdrlen) break;
339 bufcur += nin;
340 messbuf += nout;
341 }
342
343 fWhat &= ~kMESS_ZIP;
344 fCompress = 1;
345
346 return 0;
347}
348
e5701dcf 349AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
350{
351 /// Helper function to stream an object into an AliHLTMessage
352 /// The returned instance must be cleaned by the caller
353 ///
354 /// Get the data and data size from the message:
355 /// first check
356 /// pMsg->CompLength();
357 /// pMsg->CompBuffer();
358 /// if that is NULL
359 /// pMsg->Length();
360 /// pMsg->Buffer();
361 ///
362 /// Note: accessing scheme will be change din the future to just have the two
363 /// latter ones.
364 if (!pSrc) return NULL;
365
366 AliHLTLogging log;
367 AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
368 if (!pMsg) {
369 log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
370 return NULL;
371 }
372
373 pMsg->SetCompressionLevel(compression);
374 pMsg->WriteObject(pSrc);
375 if (pMsg->Length()>0) {
376 // Matthias Sep 2008
377 // NOTE: AliHLTMessage does implement it's own SetLength method
378 // which is not architecture independent. The original SetLength
379 // stores the size always in network byte order.
380 // I'm trying to remember the rational for that, might be that
381 // it was just some lack of knowledge. Want to change this, but
382 // has to be done carefully to be backward compatible.
383 pMsg->SetLength(); // sets the length to the first (reserved) word
384
385 // does nothing if the level is 0
386 pMsg->Compress();
387
388 if (pMsg->CompBuffer()) {
389 pMsg->SetLength(); // set once more to have the byte order
390 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
391 } else {
392 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
393 }
394 }
395 return pMsg;
396}
397
398TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
399{
400 /// Helper function to extract an object from a buffer.
401 /// The returned object must be cleaned by the caller
402 AliHLTLogging log;
403 if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
404 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
405 return NULL;
406 }
407
408 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
409 if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
410 firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
411 AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
412 TClass* objclass=msg.GetClass();
413 TObject* pObject=msg.ReadObject(objclass);
414 if (pObject && objclass) {
415 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
416 return pObject;
417 } else {
418 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
419 }
420 } else {
421 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
422 }
423 return NULL;
424}