]> git.uio.no Git - u/mrichter/AliRoot.git/blob - HLT/BASE/AliHLTMessage.cxx
75cc551c02a9d38887bbea1f950474890479620b
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTMessage.cxx
1 // $Id$
2
3 /** @file   AliHLTMessage.cxx
4     @author Matthias Richter (customization of Root TMessage )
5     @date   
6     @brief  Serialization of Root objects in the ALICE HLT. */
7
8 // This is the original Root TMessage implementation with a few minor
9 // modifications, original revision:
10 // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11 // Author: Fons Rademakers   19/12/96
12
13 /*************************************************************************
14  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
15  * All rights reserved.                                                  *
16  *                                                                       *
17  * For the licensing terms see $ROOTSYS/LICENSE.                         *
18  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
19  *************************************************************************/
20
21 //////////////////////////////////////////////////////////////////////////
22 //                                                                      //
23 // TMessage                                                             //
24 //                                                                      //
25 // Message buffer class used for serializing objects and sending them   //
26 // over a network. This class inherits from TBuffer the basic I/O       //
27 // serializer.                                                          //
28 //                                                                      //
29 //////////////////////////////////////////////////////////////////////////
30
31 #include "AliHLTMessage.h"
32 #include "TVirtualStreamerInfo.h"
33 #include "Bytes.h"
34 #include "TFile.h"
35 #include "TProcessID.h"
36 #include "TClass.h"
37
38 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
39 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
40 const Int_t kMAXBUF = 0xffffff;
41
42 Bool_t AliHLTMessage::fgEvolution = kFALSE;
43
44 ClassImp(AliHLTMessage)
45
46 //______________________________________________________________________________
47 AliHLTMessage::AliHLTMessage(UInt_t what) 
48   :
49 # ifdef ROOT_TBufferFile
50   TBufferFile(kWrite),
51 # else
52   TBuffer(kWrite),
53 # endif
54   AliHLTLogging(),
55   fWhat(what),
56   fClass(0),
57   fCompress(0),
58   fBufComp(0),
59   fBufCompCur(0),
60   fCompPos(0)
61   , fBufUncompressed(0)
62   , fBitsPIDs(0)
63   , fInfos(NULL)
64   , fEvolution(kFALSE)
65 {
66    // Create a AliHLTMessage object for storing objects. The "what" integer
67    // describes the type of message. Predifined ROOT system message types
68    // can be found in MessageTypes.h. Make sure your own message types are
69    // unique from the ROOT defined message types (i.e. 0 - 10000 are
70    // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
71    // will wait for an acknowledgement from the remote side. This makes
72    // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
73    // the message will be compressed in TSocket using the zip algorithm
74    // (only if message is > 256 bytes).
75
76    // space at the beginning of the message reserved for the message length
77    UInt_t   reserved = 0;
78    *this << reserved;
79
80    *this << what;
81
82    SetBit(kCannotHandleMemberWiseStreaming);
83 }
84
85 const Int_t AliHLTMessage::fgkMinimumSize=30;
86 UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
87
88 //______________________________________________________________________________
89 AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
90   :
91 # if defined(ROOT_TBufferFile)
92   TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
93 # else
94   TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
95 # endif
96   AliHLTLogging(),
97   fWhat(0),
98   fClass(0),
99   fCompress(0),
100   fBufComp(0),
101   fBufCompCur(0),
102   fCompPos(0)
103   , fBufUncompressed(0)
104   , fBitsPIDs(0)
105   , fInfos(NULL)
106   , fEvolution(kFALSE)
107 {
108    // Create a AliHLTMessage object for reading objects. The objects will be
109    // read from buf. Use the What() method to get the message type.
110
111    // skip space at the beginning of the message reserved for the message length
112    fBufCur += sizeof(UInt_t);
113
114    *this >> fWhat;
115
116    if (fWhat & kMESS_ZIP) {
117       // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
118       fBufComp    = fBuffer;
119       fBufCompCur = fBuffer + bufsize;
120       fBuffer     = 0;
121       Uncompress();
122       // Matthias Sep 2008
123       // NOTE: this is not done in TMessage and will lead to the deletion
124       // of the buffer. This is not allowed in case of HLT where the
125       // buffer is handled by the framework. In general, I think this
126       // is a very bad idea to do it like that in TMessage
127       fBufComp    = NULL;
128       fBufCompCur = 0;
129    }
130
131    if (fWhat == kMESS_OBJECT) {
132       InitMap();
133       fClass = ReadClass();     // get first the class stored in message
134       SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
135       ResetMap();
136    } else {
137       fClass = 0;
138    }
139 }
140
141 //______________________________________________________________________________
142 AliHLTMessage::~AliHLTMessage()
143 {
144    // Clean up compression buffer.
145   Reset();
146 }
147
148 //______________________________________________________________________________
149 void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
150 {
151    // Static function enabling or disabling the automatic schema evolution.
152    // By default schema evolution support is off.
153
154    fgEvolution = enable;
155 }
156
157 //______________________________________________________________________________
158 Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
159 {
160    // Static function returning status of global schema evolution.
161
162    return fgEvolution;
163 }
164
165 //______________________________________________________________________________
166 void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
167 {
168    // Force writing the TStreamerInfo to the message.
169
170    if (fgEvolution || fEvolution) {
171       if (!fInfos) fInfos = new TList();
172       fInfos->Add(info);
173    }
174 }
175
176 //______________________________________________________________________________
177 void AliHLTMessage::Forward()
178 {
179    // Change a buffer that was received into one that can be send, i.e.
180    // forward a just received message.
181
182    if (IsReading()) {
183       SetWriteMode();
184       SetBufferOffset(fBufSize);
185       SetBit(kCannotHandleMemberWiseStreaming);
186
187       if (fBufComp) {
188          fCompPos = fBufCur;
189       }
190    }
191 }
192
193 //______________________________________________________________________________
194 void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
195 {
196    // Increment level.
197
198    TBufferFile::IncrementLevel(info);
199
200    if (fgEvolution || fEvolution) {
201       if (!fInfos) fInfos = new TList();
202       fInfos->Add(info);
203    }
204 }
205
206 //______________________________________________________________________________
207 void AliHLTMessage::Reset()
208 {
209    // Reset the message buffer so we can use (i.e. fill) it again.
210
211    SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
212    ResetMap();
213
214    if (fBufComp) {
215       delete [] fBufComp;
216       fBufComp    = 0;
217       fBufCompCur = 0;
218       fCompPos    = 0;
219    }
220    if (fBufUncompressed) {
221      delete [] fBufUncompressed;
222      fBufUncompressed=NULL;
223    }
224 }
225
226 //______________________________________________________________________________
227 void AliHLTMessage::SetLength() const
228 {
229    // Set the message length at the beginning of the message buffer.
230
231    if (IsWriting()) {
232       char *buf = Buffer();
233       *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
234
235       if (fBufComp) {
236          buf = fBufComp;
237          *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
238       }
239    }
240 }
241
242 //______________________________________________________________________________
243 void AliHLTMessage::SetWhat(UInt_t what)
244 {
245    // Using this method one can change the message type a-posteriory.
246    // In case you OR "what" with kMESS_ACK, the message will wait for
247    // an acknowledgement from the remote side. This makes the sending
248    // process synchronous.
249
250    fWhat = what;
251
252    char *buf = Buffer();
253    buf += sizeof(UInt_t);   // skip reserved length space
254    tobuf(buf, what);
255
256    if (fBufComp) {
257       buf = fBufComp;
258       buf += sizeof(UInt_t);   // skip reserved length space
259       tobuf(buf, what | kMESS_ZIP);
260    }
261 }
262
263 //______________________________________________________________________________
264 void AliHLTMessage::SetCompressionLevel(Int_t level)
265 {
266    // Set the message compression level. Can be between 0 and 9 with 0
267    // being no compression and 9 maximum compression. In general the default
268    // level of 1 is the best compromise between achieved compression and
269    // cpu time. Compression will only happen when the message is > 256 bytes.
270
271    if (level < 0) level = 0;
272    if (level > 9) level = 9;
273
274    if (level != fCompress && fBufComp) {
275       delete [] fBufComp;
276       fBufComp    = 0;
277       fBufCompCur = 0;
278       fCompPos    = 0;
279    }
280    fCompress = level;
281 }
282
283 //______________________________________________________________________________
284 Int_t AliHLTMessage::Compress()
285 {
286    // Compress the message. The message will only be compressed if the
287    // compression level > 0 and the if the message is > 256 bytes.
288    // Returns -1 in case of error (when compression fails or
289    // when the message increases in size in some pathological cases),
290    // otherwise returns 0.
291
292    if (fCompress == 0) {
293       // no compression specified
294       if (fBufComp) {
295          delete [] fBufComp;
296          fBufComp    = 0;
297          fBufCompCur = 0;
298          fCompPos    = 0;
299       }
300       return 0;
301    }
302
303    if (fBufComp && fCompPos == fBufCur) {
304       // the message was already compressed
305       return 0;
306    }
307
308    // remove any existing compressed buffer before compressing modified message
309    if (fBufComp) {
310       delete [] fBufComp;
311       fBufComp    = 0;
312       fBufCompCur = 0;
313       fCompPos    = 0;
314    }
315
316    if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
317       // this message is too small to be compressed
318       return 0;
319    }
320
321    Int_t hdrlen   = 2*sizeof(UInt_t);
322    Int_t messlen  = Length() - hdrlen;
323    Int_t nbuffers = messlen / kMAXBUF;
324    Int_t chdrlen  = 3*sizeof(UInt_t);   // compressed buffer header length
325    Int_t buflen   = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
326    fBufComp       = new char[buflen];
327    char *messbuf  = Buffer() + hdrlen;
328    char *bufcur   = fBufComp + chdrlen;
329    Int_t noutot   = 0;
330    Int_t nzip     = 0;
331    Int_t nout, bufmax;
332    for (Int_t i = 0; i <= nbuffers; i++) {
333       if (i == nbuffers)
334          bufmax = messlen - nzip;
335       else
336          bufmax = kMAXBUF;
337       R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
338       if (nout == 0 || nout >= messlen) {
339          //this happens when the buffer cannot be compressed
340          delete [] fBufComp;
341          fBufComp    = 0;
342          fBufCompCur = 0;
343          fCompPos    = 0;
344          return -1;
345       }
346       bufcur  += nout;
347       noutot  += nout;
348       messbuf += kMAXBUF;
349       nzip    += kMAXBUF;
350    }
351    fBufCompCur = bufcur;
352    fCompPos    = fBufCur;
353
354    bufcur = fBufComp;
355    tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
356    Int_t what = fWhat | kMESS_ZIP;
357    tobuf(bufcur, what);
358    tobuf(bufcur, Length());    // original uncompressed buffer length
359
360    return 0;
361 }
362
363 //______________________________________________________________________________
364 Int_t AliHLTMessage::Uncompress()
365 {
366    // Uncompress the message. The message will only be uncompressed when
367    // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
368
369    if (!fBufComp || !(fWhat & kMESS_ZIP))
370       return -1;
371
372    Int_t buflen;
373    Int_t hdrlen = 2*sizeof(UInt_t);
374    UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen;
375    frombuf((char *&)bufcur, &buflen);
376    fBuffer  = new char[buflen];
377    fBufUncompressed = fBuffer;
378    fBufSize = buflen;
379    fBufCur  = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
380    fBufMax  = fBuffer + fBufSize;
381    char *messbuf = fBuffer + hdrlen;
382
383    Int_t nin, nout, nbuf;
384    Int_t noutot = 0;
385    while (1) {
386       nin  = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
387       nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
388       R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
389       if (!nout) break;
390       noutot += nout;
391       if (noutot >= buflen - hdrlen) break;
392       bufcur  += nin;
393       messbuf += nout;
394    }
395
396    fWhat &= ~kMESS_ZIP;
397    fCompress = 1;
398
399    return 0;
400 }
401
402 //______________________________________________________________________________
403 void AliHLTMessage::WriteObject(const TObject *obj)
404 {
405    // Write object to message buffer.
406    // When support for schema evolution is enabled the list of TStreamerInfo
407    // used to stream this object is kept in fInfos. This information is used
408    // by TSocket::Send that sends this list through the socket. This list is in
409    // turn used by TSocket::Recv to store the TStreamerInfo objects in the
410    // relevant TClass in case the TClass does not know yet about a particular
411    // class version. This feature is implemented to support clients and servers
412    // with either different ROOT versions or different user classes versions.
413
414    if (fgEvolution || fEvolution) {
415       if (fInfos)
416          fInfos->Clear();
417       else
418          fInfos = new TList();
419    }
420
421    fBitsPIDs.ResetAllBits();
422    WriteObjectAny(obj, TObject::Class());
423 }
424
425 //______________________________________________________________________________
426 UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
427 {
428    // Check if the ProcessID pid is already in the message.
429    // If not, then:
430    //   - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
431    //   - mark bit uid+1 where uid id the uid of the ProcessID
432
433    if (fBitsPIDs.TestBitNumber(0)) return 0;
434    if (!pid)
435       pid = TProcessID::GetPID();
436    if (!pid) return 0;
437    fBitsPIDs.SetBitNumber(0);
438    UInt_t uid = pid->GetUniqueID();
439    fBitsPIDs.SetBitNumber(uid+1);
440    return 1;
441 }
442
443 AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
444 {
445   /// Helper function to stream an object into an AliHLTMessage
446   /// The returned instance must be cleaned by the caller
447   ///
448   /// Get the data and data size from the message:
449   ///  first check
450   ///    pMsg->CompLength();
451   ///    pMsg->CompBuffer();
452   ///  if that is NULL
453   ///    pMsg->Length();
454   ///    pMsg->Buffer();
455   ///
456   /// Note: accessing scheme will be change din the future to just have the two
457   ///       latter ones.
458   if (!pSrc) return NULL;
459
460   AliHLTLogging log;
461   AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
462   if (!pMsg) {
463     log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
464     return NULL;
465   }
466
467   pMsg->SetCompressionLevel(compression);
468   pMsg->WriteObject(pSrc);
469   if (pMsg->Length()>0) {
470     // Matthias Sep 2008
471     // NOTE: AliHLTMessage does implement it's own SetLength method
472     // which is not architecture independent. The original SetLength
473     // stores the size always in network byte order.
474     // I'm trying to remember the rational for that, might be that
475     // it was just some lack of knowledge. Want to change this, but
476     // has to be done carefully to be backward compatible.
477     pMsg->SetLength(); // sets the length to the first (reserved) word
478
479     // does nothing if the level is 0
480     pMsg->Compress();
481
482     if (pMsg->CompBuffer()) {
483       pMsg->SetLength(); // set once more to have the byte order
484       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
485     } else {
486       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
487     }
488   }
489   return pMsg;
490 }
491
492 TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
493 {
494    /// Helper function to extract an object from a buffer.
495    /// The returned object must be cleaned by the caller
496   AliHLTLogging log;
497   if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
498     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
499     return NULL;
500   }
501
502   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
503   if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
504       firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
505     AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
506     TClass* objclass=msg.GetClass();
507     TObject* pObject=msg.ReadObject(objclass);
508     if (pObject && objclass) {
509       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
510       return pObject;
511     } else {
512       if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
513     }
514   } else {
515     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
516   }
517   return NULL;
518 }