]> git.uio.no Git - u/mrichter/AliRoot.git/blob - HLT/BASE/AliHLTMessage.cxx
adding support for EventDoneData to AliHLTSystem
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTMessage.cxx
1 // $Id$
2
3 /** @file   AliHLTMessage.cxx
4     @author Matthias Richter (customization of Root TMessage )
5     @date   
6     @brief  Serialization of Root objects in the ALICE HLT. */
7
8 // This is the original Root TMessage implementation with a few minor
9 // modifications, original revision:
10 // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11 // Author: Fons Rademakers   19/12/96
12
13 /*************************************************************************
14  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
15  * All rights reserved.                                                  *
16  *                                                                       *
17  * For the licensing terms see $ROOTSYS/LICENSE.                         *
18  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
19  *************************************************************************/
20
21 //////////////////////////////////////////////////////////////////////////
22 //                                                                      //
23 // TMessage                                                             //
24 //                                                                      //
25 // Message buffer class used for serializing objects and sending them   //
26 // over a network. This class inherits from TBuffer the basic I/O       //
27 // serializer.                                                          //
28 //                                                                      //
29 //////////////////////////////////////////////////////////////////////////
30
31 #include "AliHLTMessage.h"
32 #include "Bytes.h"
33 #include "TFile.h"
34 #include "TClass.h"
35
36 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
37 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
38 const Int_t kMAXBUF = 0xffffff;
39
40 ClassImp(AliHLTMessage)
41
42 //______________________________________________________________________________
43 AliHLTMessage::AliHLTMessage(UInt_t what) 
44   :
45 # ifdef ROOT_TBufferFile
46   TBufferFile(kWrite),
47 # else
48   TBuffer(kWrite),
49 # endif
50   AliHLTLogging(),
51   fWhat(what),
52   fClass(0),
53   fCompress(0),
54   fBufComp(0),
55   fBufCompCur(0),
56   fCompPos(0)
57   , fBufUncompressed(0)
58 {
59    // Create a AliHLTMessage object for storing objects. The "what" integer
60    // describes the type of message. Predifined ROOT system message types
61    // can be found in MessageTypes.h. Make sure your own message types are
62    // unique from the ROOT defined message types (i.e. 0 - 10000 are
63    // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
64    // will wait for an acknowledgement from the remote side. This makes
65    // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
66    // the message will be compressed in TSocket using the zip algorithm
67    // (only if message is > 256 bytes).
68
69    // space at the beginning of the message reserved for the message length
70    UInt_t   reserved = 0;
71    *this << reserved;
72
73    *this << what;
74
75 }
76
77 const Int_t AliHLTMessage::fgkMinimumSize=30;
78 UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
79
80 //______________________________________________________________________________
81 AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
82   :
83 # if defined(ROOT_TBufferFile)
84   TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
85 # else
86   TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
87 # endif
88   AliHLTLogging(),
89   fWhat(0),
90   fClass(0),
91   fCompress(0),
92   fBufComp(0),
93   fBufCompCur(0),
94   fCompPos(0)
95   , fBufUncompressed(0)
96 {
97    // Create a AliHLTMessage object for reading objects. The objects will be
98    // read from buf. Use the What() method to get the message type.
99
100    // skip space at the beginning of the message reserved for the message length
101    fBufCur += sizeof(UInt_t);
102
103    *this >> fWhat;
104
105    if (fWhat & kMESS_ZIP) {
106       // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
107       fBufComp    = fBuffer;
108       fBufCompCur = fBuffer + bufsize;
109       fBuffer     = 0;
110       Uncompress();
111       // Matthias Sep 2008
112       // NOTE: this is not done in TMessage and will lead to the deletion
113       // of the buffer. This is not allowed in case of HLT where the
114       // buffer is handled by the framework. In general, I think this
115       // is a very bad idea to do it like that in TMessage
116       fBufComp    = NULL;
117       fBufCompCur = 0;
118    }
119
120    if (fWhat == kMESS_OBJECT) {
121       InitMap();
122       fClass = ReadClass();     // get first the class stored in message
123       SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
124       ResetMap();
125    } else {
126       fClass = 0;
127    }
128 }
129
130 //______________________________________________________________________________
131 AliHLTMessage::~AliHLTMessage()
132 {
133    // Clean up compression buffer.
134   Reset();
135 }
136
137 //______________________________________________________________________________
138 void AliHLTMessage::Forward()
139 {
140    // Change a buffer that was received into one that can be send, i.e.
141    // forward a just received message.
142
143    if (IsReading()) {
144       SetWriteMode();
145       SetBufferOffset(fBufSize);
146
147       if (fBufComp) {
148          fCompPos = fBufCur;
149       }
150    }
151 }
152
153 //______________________________________________________________________________
154 void AliHLTMessage::Reset()
155 {
156    // Reset the message buffer so we can use (i.e. fill) it again.
157
158    SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
159    ResetMap();
160
161    if (fBufComp) {
162       delete [] fBufComp;
163       fBufComp    = 0;
164       fBufCompCur = 0;
165       fCompPos    = 0;
166    }
167    if (fBufUncompressed) {
168      delete [] fBufUncompressed;
169      fBufUncompressed=NULL;
170    }
171 }
172
173 //______________________________________________________________________________
174 void AliHLTMessage::SetLength() const
175 {
176    // Set the message length at the beginning of the message buffer.
177
178    if (IsWriting()) {
179       char *buf = Buffer();
180       *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
181
182       if (fBufComp) {
183          buf = fBufComp;
184          *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
185       }
186    }
187 }
188
189 //______________________________________________________________________________
190 void AliHLTMessage::SetWhat(UInt_t what)
191 {
192    // Using this method one can change the message type a-posteriory.
193    // In case you OR "what" with kMESS_ACK, the message will wait for
194    // an acknowledgement from the remote side. This makes the sending
195    // process synchronous.
196
197    fWhat = what;
198
199    char *buf = Buffer();
200    buf += sizeof(UInt_t);   // skip reserved length space
201    tobuf(buf, what);
202
203    if (fBufComp) {
204       buf = fBufComp;
205       buf += sizeof(UInt_t);   // skip reserved length space
206       tobuf(buf, what | kMESS_ZIP);
207    }
208 }
209
210 //______________________________________________________________________________
211 void AliHLTMessage::SetCompressionLevel(Int_t level)
212 {
213    // Set the message compression level. Can be between 0 and 9 with 0
214    // being no compression and 9 maximum compression. In general the default
215    // level of 1 is the best compromise between achieved compression and
216    // cpu time. Compression will only happen when the message is > 256 bytes.
217
218    if (level < 0) level = 0;
219    if (level > 9) level = 9;
220
221    if (level != fCompress && fBufComp) {
222       delete [] fBufComp;
223       fBufComp    = 0;
224       fBufCompCur = 0;
225       fCompPos    = 0;
226    }
227    fCompress = level;
228 }
229
230 //______________________________________________________________________________
231 Int_t AliHLTMessage::Compress()
232 {
233    // Compress the message. The message will only be compressed if the
234    // compression level > 0 and the if the message is > 256 bytes.
235    // Returns -1 in case of error (when compression fails or
236    // when the message increases in size in some pathological cases),
237    // otherwise returns 0.
238
239    if (fCompress == 0) {
240       // no compression specified
241       if (fBufComp) {
242          delete [] fBufComp;
243          fBufComp    = 0;
244          fBufCompCur = 0;
245          fCompPos    = 0;
246       }
247       return 0;
248    }
249
250    if (fBufComp && fCompPos == fBufCur) {
251       // the message was already compressed
252       return 0;
253    }
254
255    // remove any existing compressed buffer before compressing modified message
256    if (fBufComp) {
257       delete [] fBufComp;
258       fBufComp    = 0;
259       fBufCompCur = 0;
260       fCompPos    = 0;
261    }
262
263    if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
264       // this message is too small to be compressed
265       return 0;
266    }
267
268    Int_t hdrlen   = 2*sizeof(UInt_t);
269    Int_t messlen  = Length() - hdrlen;
270    Int_t nbuffers = messlen / kMAXBUF;
271    Int_t chdrlen  = 3*sizeof(UInt_t);   // compressed buffer header length
272    Int_t buflen   = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
273    fBufComp       = new char[buflen];
274    char *messbuf  = Buffer() + hdrlen;
275    char *bufcur   = fBufComp + chdrlen;
276    Int_t noutot   = 0;
277    Int_t nzip     = 0;
278    Int_t nout, bufmax;
279    for (Int_t i = 0; i <= nbuffers; i++) {
280       if (i == nbuffers)
281          bufmax = messlen - nzip;
282       else
283          bufmax = kMAXBUF;
284       R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
285       if (nout == 0 || nout >= messlen) {
286          //this happens when the buffer cannot be compressed
287          delete [] fBufComp;
288          fBufComp    = 0;
289          fBufCompCur = 0;
290          fCompPos    = 0;
291          return -1;
292       }
293       bufcur  += nout;
294       noutot  += nout;
295       messbuf += kMAXBUF;
296       nzip    += kMAXBUF;
297    }
298    fBufCompCur = bufcur;
299    fCompPos    = fBufCur;
300
301    bufcur = fBufComp;
302    tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
303    Int_t what = fWhat | kMESS_ZIP;
304    tobuf(bufcur, what);
305    tobuf(bufcur, Length());    // original uncompressed buffer length
306
307    return 0;
308 }
309
310 //______________________________________________________________________________
311 Int_t AliHLTMessage::Uncompress()
312 {
313    // Uncompress the message. The message will only be uncompressed when
314    // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
315
316    if (!fBufComp || !(fWhat & kMESS_ZIP))
317       return -1;
318
319    Int_t buflen;
320    Int_t hdrlen = 2*sizeof(UInt_t);
321    UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen;
322    frombuf((char *&)bufcur, &buflen);
323    fBuffer  = new char[buflen];
324    fBufUncompressed = fBuffer;
325    fBufSize = buflen;
326    fBufCur  = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
327    fBufMax  = fBuffer + fBufSize;
328    char *messbuf = fBuffer + hdrlen;
329
330    Int_t nin, nout, nbuf;
331    Int_t noutot = 0;
332    while (1) {
333       nin  = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
334       nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
335       R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
336       if (!nout) break;
337       noutot += nout;
338       if (noutot >= buflen - hdrlen) break;
339       bufcur  += nin;
340       messbuf += nout;
341    }
342
343    fWhat &= ~kMESS_ZIP;
344    fCompress = 1;
345
346    return 0;
347 }
348
349 AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
350 {
351   /// Helper function to stream an object into an AliHLTMessage
352   /// The returned instance must be cleaned by the caller
353   ///
354   /// Get the data and data size from the message:
355   ///  first check
356   ///    pMsg->CompLength();
357   ///    pMsg->CompBuffer();
358   ///  if that is NULL
359   ///    pMsg->Length();
360   ///    pMsg->Buffer();
361   ///
362   /// Note: accessing scheme will be change din the future to just have the two
363   ///       latter ones.
364   if (!pSrc) return NULL;
365
366   AliHLTLogging log;
367   AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
368   if (!pMsg) {
369     log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
370     return NULL;
371   }
372
373   pMsg->SetCompressionLevel(compression);
374   pMsg->WriteObject(pSrc);
375   if (pMsg->Length()>0) {
376     // Matthias Sep 2008
377     // NOTE: AliHLTMessage does implement it's own SetLength method
378     // which is not architecture independent. The original SetLength
379     // stores the size always in network byte order.
380     // I'm trying to remember the rational for that, might be that
381     // it was just some lack of knowledge. Want to change this, but
382     // has to be done carefully to be backward compatible.
383     pMsg->SetLength(); // sets the length to the first (reserved) word
384
385     // does nothing if the level is 0
386     pMsg->Compress();
387
388     if (pMsg->CompBuffer()) {
389       pMsg->SetLength(); // set once more to have the byte order
390       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
391     } else {
392       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
393     }
394   }
395   return pMsg;
396 }
397
398 TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
399 {
400    /// Helper function to extract an object from a buffer.
401    /// The returned object must be cleaned by the caller
402   AliHLTLogging log;
403   if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
404     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
405     return NULL;
406   }
407
408   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
409   if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
410       firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
411     AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
412     TClass* objclass=msg.GetClass();
413     TObject* pObject=msg.ReadObject(objclass);
414     if (pObject && objclass) {
415       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
416       return pObject;
417     } else {
418       if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
419     }
420   } else {
421     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
422   }
423   return NULL;
424 }