]> git.uio.no Git - u/mrichter/AliRoot.git/blob - HLT/BASE/AliHLTMessage.cxx
adding USE_DLOPEN option for HLT
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTMessage.cxx
1 // $Id$
2
3 /** @file   AliHLTMessage.cxx
4     @author Matthias Richter (customization of Root TMessage )
5     @date   
6     @brief  Serialization of Root objects in the ALICE HLT. */
7
8 // This is the original Root TMessage implementation with a few minor
9 // modifications, original revision:
10 // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11 // Author: Fons Rademakers   19/12/96
12
13 /*************************************************************************
14  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
15  * All rights reserved.                                                  *
16  *                                                                       *
17  * For the licensing terms see $ROOTSYS/LICENSE.                         *
18  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
19  *************************************************************************/
20
21 //////////////////////////////////////////////////////////////////////////
22 //                                                                      //
23 // TMessage                                                             //
24 //                                                                      //
25 // Message buffer class used for serializing objects and sending them   //
26 // over a network. This class inherits from TBuffer the basic I/O       //
27 // serializer.                                                          //
28 //                                                                      //
29 //////////////////////////////////////////////////////////////////////////
30
31 #include "AliHLTMessage.h"
32 #include "TVirtualStreamerInfo.h"
33 #include "Bytes.h"
34 #include "TFile.h"
35 #include "TProcessID.h"
36 #include "TClass.h"
37
38 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
39 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
40 const Int_t kMAXBUF = 0xffffff;
41
42 Bool_t AliHLTMessage::fgEvolution = kFALSE;
43
44 ClassImp(AliHLTMessage)
45
46 //______________________________________________________________________________
47 AliHLTMessage::AliHLTMessage(UInt_t what) 
48   :
49 # ifdef ROOT_TBufferFile
50   TBufferFile(kWrite),
51 # else
52   TBuffer(kWrite),
53 # endif
54   AliHLTLogging(),
55   fWhat(what),
56   fClass(0),
57   fCompress(0),
58   fBufComp(0),
59   fBufCompCur(0),
60   fCompPos(0)
61   , fBufUncompressed(0)
62   , fBitsPIDs(0)
63   , fInfos(NULL)
64   , fEvolution(kFALSE)
65 {
66    // Create a AliHLTMessage object for storing objects. The "what" integer
67    // describes the type of message. Predifined ROOT system message types
68    // can be found in MessageTypes.h. Make sure your own message types are
69    // unique from the ROOT defined message types (i.e. 0 - 10000 are
70    // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
71    // will wait for an acknowledgement from the remote side. This makes
72    // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
73    // the message will be compressed in TSocket using the zip algorithm
74    // (only if message is > 256 bytes).
75
76    // space at the beginning of the message reserved for the message length
77    UInt_t   reserved = 0;
78    *this << reserved;
79
80    *this << what;
81
82    SetBit(kCannotHandleMemberWiseStreaming);
83 }
84
85 const Int_t AliHLTMessage::fgkMinimumSize=30;
86 UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
87
88 //______________________________________________________________________________
89 AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
90   :
91 # if defined(ROOT_TBufferFile)
92   TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
93 # else
94   TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
95 # endif
96   AliHLTLogging(),
97   fWhat(0),
98   fClass(0),
99   fCompress(0),
100   fBufComp(0),
101   fBufCompCur(0),
102   fCompPos(0)
103   , fBufUncompressed(0)
104   , fBitsPIDs(0)
105   , fInfos(NULL)
106   , fEvolution(kFALSE)
107 {
108    // Create a AliHLTMessage object for reading objects. The objects will be
109    // read from buf. Use the What() method to get the message type.
110
111    // skip space at the beginning of the message reserved for the message length
112    fBufCur += sizeof(UInt_t);
113
114    *this >> fWhat;
115
116    if (fWhat & kMESS_ZIP) {
117       // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
118       fBufComp    = fBuffer;
119       fBufCompCur = fBuffer + bufsize;
120       fBuffer     = 0;
121       Uncompress();
122       // Matthias Sep 2008
123       // NOTE: this is not done in TMessage and will lead to the deletion
124       // of the buffer. This is not allowed in case of HLT where the
125       // buffer is handled by the framework. In general, I think this
126       // is a very bad idea to do it like that in TMessage
127       fBufComp    = NULL;
128       fBufCompCur = 0;
129    }
130
131    if (fWhat == kMESS_OBJECT) {
132       InitMap();
133       fClass = ReadClass();     // get first the class stored in message
134       SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
135       ResetMap();
136    } else {
137       fClass = 0;
138    }
139 }
140
141 //______________________________________________________________________________
142 AliHLTMessage::~AliHLTMessage()
143 {
144    // Clean up compression buffer.
145   Reset();
146 }
147
148 //______________________________________________________________________________
149 void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
150 {
151    // Static function enabling or disabling the automatic schema evolution.
152    // By default schema evolution support is off.
153
154    fgEvolution = enable;
155 }
156
157 //______________________________________________________________________________
158 Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
159 {
160    // Static function returning status of global schema evolution.
161
162    return fgEvolution;
163 }
164
165 //______________________________________________________________________________
166 void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
167 {
168    // Force writing the TStreamerInfo to the message.
169
170    if (fgEvolution || fEvolution) {
171       if (!fInfos) fInfos = new TList();
172       if (fInfos->FindObject(info->GetName())==NULL) {
173         fInfos->Add(info);
174       }
175    }
176 }
177
178 //______________________________________________________________________________
179 void AliHLTMessage::Forward()
180 {
181    // Change a buffer that was received into one that can be send, i.e.
182    // forward a just received message.
183
184    if (IsReading()) {
185       SetWriteMode();
186       SetBufferOffset(fBufSize);
187       SetBit(kCannotHandleMemberWiseStreaming);
188
189       if (fBufComp) {
190          fCompPos = fBufCur;
191       }
192    }
193 }
194
195 //______________________________________________________________________________
196 void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
197 {
198    // Increment level.
199
200    TBufferFile::IncrementLevel(info);
201
202    if (!info) return;
203    if (fgEvolution || fEvolution) {
204       if (!fInfos) fInfos = new TList();
205
206       // add the streamer info, but only once
207       // this assumes that there is only one version
208       if (fInfos->FindObject(info->GetName())==NULL) {
209         fInfos->Add(info);
210       }
211    }
212 }
213
214 //______________________________________________________________________________
215 void AliHLTMessage::Reset()
216 {
217    // Reset the message buffer so we can use (i.e. fill) it again.
218
219    SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
220    ResetMap();
221
222    if (fBufComp) {
223       delete [] fBufComp;
224       fBufComp    = 0;
225       fBufCompCur = 0;
226       fCompPos    = 0;
227    }
228    if (fBufUncompressed) {
229      delete [] fBufUncompressed;
230      fBufUncompressed=NULL;
231    }
232 }
233
234 //______________________________________________________________________________
235 void AliHLTMessage::SetLength() const
236 {
237    // Set the message length at the beginning of the message buffer.
238
239    if (IsWriting()) {
240       char *buf = Buffer();
241       *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
242
243       if (fBufComp) {
244          buf = fBufComp;
245          *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
246       }
247    }
248 }
249
250 //______________________________________________________________________________
251 void AliHLTMessage::SetWhat(UInt_t what)
252 {
253    // Using this method one can change the message type a-posteriory.
254    // In case you OR "what" with kMESS_ACK, the message will wait for
255    // an acknowledgement from the remote side. This makes the sending
256    // process synchronous.
257
258    fWhat = what;
259
260    char *buf = Buffer();
261    buf += sizeof(UInt_t);   // skip reserved length space
262    tobuf(buf, what);
263
264    if (fBufComp) {
265       buf = fBufComp;
266       buf += sizeof(UInt_t);   // skip reserved length space
267       tobuf(buf, what | kMESS_ZIP);
268    }
269 }
270
271 //______________________________________________________________________________
272 void AliHLTMessage::SetCompressionLevel(Int_t level)
273 {
274    // Set the message compression level. Can be between 0 and 9 with 0
275    // being no compression and 9 maximum compression. In general the default
276    // level of 1 is the best compromise between achieved compression and
277    // cpu time. Compression will only happen when the message is > 256 bytes.
278
279    if (level < 0) level = 0;
280    if (level > 9) level = 9;
281
282    if (level != fCompress && fBufComp) {
283       delete [] fBufComp;
284       fBufComp    = 0;
285       fBufCompCur = 0;
286       fCompPos    = 0;
287    }
288    fCompress = level;
289 }
290
291 //______________________________________________________________________________
292 Int_t AliHLTMessage::Compress()
293 {
294    // Compress the message. The message will only be compressed if the
295    // compression level > 0 and the if the message is > 256 bytes.
296    // Returns -1 in case of error (when compression fails or
297    // when the message increases in size in some pathological cases),
298    // otherwise returns 0.
299
300    if (fCompress == 0) {
301       // no compression specified
302       if (fBufComp) {
303          delete [] fBufComp;
304          fBufComp    = 0;
305          fBufCompCur = 0;
306          fCompPos    = 0;
307       }
308       return 0;
309    }
310
311    if (fBufComp && fCompPos == fBufCur) {
312       // the message was already compressed
313       return 0;
314    }
315
316    // remove any existing compressed buffer before compressing modified message
317    if (fBufComp) {
318       delete [] fBufComp;
319       fBufComp    = 0;
320       fBufCompCur = 0;
321       fCompPos    = 0;
322    }
323
324    if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
325       // this message is too small to be compressed
326       return 0;
327    }
328
329    Int_t hdrlen   = 2*sizeof(UInt_t);
330    Int_t messlen  = Length() - hdrlen;
331    Int_t nbuffers = messlen / kMAXBUF;
332    Int_t chdrlen  = 3*sizeof(UInt_t);   // compressed buffer header length
333    Int_t buflen   = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
334    fBufComp       = new char[buflen];
335    char *messbuf  = Buffer() + hdrlen;
336    char *bufcur   = fBufComp + chdrlen;
337    Int_t noutot   = 0;
338    Int_t nzip     = 0;
339    Int_t nout, bufmax;
340    for (Int_t i = 0; i <= nbuffers; i++) {
341       if (i == nbuffers)
342          bufmax = messlen - nzip;
343       else
344          bufmax = kMAXBUF;
345       R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
346       if (nout == 0 || nout >= messlen) {
347          //this happens when the buffer cannot be compressed
348          delete [] fBufComp;
349          fBufComp    = 0;
350          fBufCompCur = 0;
351          fCompPos    = 0;
352          return -1;
353       }
354       bufcur  += nout;
355       noutot  += nout;
356       messbuf += kMAXBUF;
357       nzip    += kMAXBUF;
358    }
359    fBufCompCur = bufcur;
360    fCompPos    = fBufCur;
361
362    bufcur = fBufComp;
363    tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
364    Int_t what = fWhat | kMESS_ZIP;
365    tobuf(bufcur, what);
366    tobuf(bufcur, Length());    // original uncompressed buffer length
367
368    return 0;
369 }
370
371 //______________________________________________________________________________
372 Int_t AliHLTMessage::Uncompress()
373 {
374    // Uncompress the message. The message will only be uncompressed when
375    // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
376
377    if (!fBufComp || !(fWhat & kMESS_ZIP))
378       return -1;
379
380    Int_t buflen;
381    Int_t hdrlen = 2*sizeof(UInt_t);
382    char *bufcur1 = fBufComp + hdrlen;
383    frombuf(bufcur1, &buflen);
384    UChar_t *bufcur = (UChar_t*)bufcur1;
385    fBuffer  = new char[buflen];
386    fBufUncompressed = fBuffer;
387    fBufSize = buflen;
388    fBufCur  = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
389    fBufMax  = fBuffer + fBufSize;
390    char *messbuf = fBuffer + hdrlen;
391
392    Int_t nin, nout, nbuf;
393    Int_t noutot = 0;
394    while (1) {
395       nin  = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
396       nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
397       R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
398       if (!nout) break;
399       noutot += nout;
400       if (noutot >= buflen - hdrlen) break;
401       bufcur  += nin;
402       messbuf += nout;
403    }
404
405    fWhat &= ~kMESS_ZIP;
406    fCompress = 1;
407
408    return 0;
409 }
410
411 //______________________________________________________________________________
412 void AliHLTMessage::WriteObject(const TObject *obj)
413 {
414    // Write object to message buffer.
415    // When support for schema evolution is enabled the list of TStreamerInfo
416    // used to stream this object is kept in fInfos. This information is used
417    // by TSocket::Send that sends this list through the socket. This list is in
418    // turn used by TSocket::Recv to store the TStreamerInfo objects in the
419    // relevant TClass in case the TClass does not know yet about a particular
420    // class version. This feature is implemented to support clients and servers
421    // with either different ROOT versions or different user classes versions.
422
423    if (fgEvolution || fEvolution) {
424       if (fInfos)
425          fInfos->Clear();
426       else
427          fInfos = new TList();
428    }
429
430    fBitsPIDs.ResetAllBits();
431    WriteObjectAny(obj, TObject::Class());
432 }
433
434 //______________________________________________________________________________
435 UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
436 {
437    // Check if the ProcessID pid is already in the message.
438    // If not, then:
439    //   - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
440    //   - mark bit uid+1 where uid id the uid of the ProcessID
441
442    if (fBitsPIDs.TestBitNumber(0)) return 0;
443    if (!pid)
444       pid = TProcessID::GetPID();
445    if (!pid) return 0;
446    fBitsPIDs.SetBitNumber(0);
447    UInt_t uid = pid->GetUniqueID();
448    fBitsPIDs.SetBitNumber(uid+1);
449    return 1;
450 }
451
452 AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
453 {
454   /// Helper function to stream an object into an AliHLTMessage
455   /// The returned instance must be cleaned by the caller
456   ///
457   /// Get the data and data size from the message:
458   ///  first check
459   ///    pMsg->CompLength();
460   ///    pMsg->CompBuffer();
461   ///  if that is NULL
462   ///    pMsg->Length();
463   ///    pMsg->Buffer();
464   ///
465   /// Note: accessing scheme will be change din the future to just have the two
466   ///       latter ones.
467   if (!pSrc) return NULL;
468
469   AliHLTLogging log;
470   AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
471   if (!pMsg) {
472     log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
473     return NULL;
474   }
475
476   pMsg->SetCompressionLevel(compression);
477   pMsg->WriteObject(pSrc);
478   if (pMsg->Length()>0) {
479     // Matthias Sep 2008
480     // NOTE: AliHLTMessage does implement it's own SetLength method
481     // which is not architecture independent. The original SetLength
482     // stores the size always in network byte order.
483     // I'm trying to remember the rational for that, might be that
484     // it was just some lack of knowledge. Want to change this, but
485     // has to be done carefully to be backward compatible.
486     pMsg->SetLength(); // sets the length to the first (reserved) word
487
488     // does nothing if the level is 0
489     pMsg->Compress();
490
491     if (pMsg->CompBuffer()) {
492       pMsg->SetLength(); // set once more to have the byte order
493       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
494     } else {
495       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
496     }
497   }
498   return pMsg;
499 }
500
501 TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
502 {
503    /// Helper function to extract an object from a buffer.
504    /// The returned object must be cleaned by the caller
505   AliHLTLogging log;
506   if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
507     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
508     return NULL;
509   }
510
511   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
512   if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
513       firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
514     AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
515     TClass* objclass=msg.GetClass();
516     TObject* pObject=msg.ReadObject(objclass);
517     if (pObject && objclass) {
518       if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
519       return pObject;
520     } else {
521       if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
522     }
523   } else {
524     if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
525   }
526   return NULL;
527 }
528
529 TObject* AliHLTMessage::Extract(const char* filename, unsigned verbosity)
530 {
531    /// Helper function to extract an object from a file containing the streamed object.
532    /// The returned object must be cleaned by the caller
533   if (!filename) return NULL;
534   
535   AliHLTLogging log;
536   TString input=filename;
537   input+="?filetype=raw";
538   TFile* pFile=new TFile(input);
539   if (!pFile) return NULL;
540   TObject* pObject=NULL;
541   if (!pFile->IsZombie()) {
542     pFile->Seek(0);
543     TArrayC buffer;
544     buffer.Set(pFile->GetSize());
545     if (pFile->ReadBuffer(buffer.GetArray(), buffer.GetSize())==0) {
546       pObject=Extract(buffer.GetArray(), buffer.GetSize(), verbosity);
547     } else {
548       log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed reading %d byte(s) from file %s", pFile->GetSize(), filename);
549     }
550   }
551
552   delete pFile;
553   return pObject;
554 }