3 /** @file AliHLTMessage.cxx
4 @author Matthias Richter (customization of Root TMessage )
6 @brief Serialization of Root objects in the ALICE HLT. */
8 // This is the original Root TMessage implementation with a few minor
9 // modifications, original revision:
10 // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11 // Author: Fons Rademakers 19/12/96
13 /*************************************************************************
14 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
15 * All rights reserved. *
17 * For the licensing terms see $ROOTSYS/LICENSE. *
18 * For the list of contributors see $ROOTSYS/README/CREDITS. *
19 *************************************************************************/
21 //////////////////////////////////////////////////////////////////////////
25 // Message buffer class used for serializing objects and sending them //
26 // over a network. This class inherits from TBuffer the basic I/O //
29 //////////////////////////////////////////////////////////////////////////
31 #include "AliHLTMessage.h"
32 #include "TVirtualStreamerInfo.h"
35 #include "TProcessID.h"
// Compression primitives with C linkage. They are only declared in this file
// and are called from AliHLTMessage::Compress() and AliHLTMessage::Uncompress().
38 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
39 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
// Chunk size (0xffffff bytes) from which Compress() derives the number of
// buffers a message payload is split into.
40 const Int_t kMAXBUF = 0xffffff;
// Global schema evolution switch, initially off; changed through
// EnableSchemaEvolutionForAll().
42 Bool_t AliHLTMessage::fgEvolution = kFALSE;
44 ClassImp(AliHLTMessage)
46 //______________________________________________________________________________
47 AliHLTMessage::AliHLTMessage(UInt_t what)
49 # ifdef ROOT_TBufferFile
66 // Create an AliHLTMessage object for storing objects. The "what" integer
67 // describes the type of message. Predefined ROOT system message types
68 // can be found in MessageTypes.h. Make sure your own message types are
69 // unique from the ROOT defined message types (i.e. 0 - 10000 are
70 // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
71 // will wait for an acknowledgement from the remote side. This makes
72 // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
73 // the message will be compressed in TSocket using the zip algorithm
74 // (only if message is > 256 bytes).
// @param what  message type, optionally OR'ed with kMESS_ACK and/or kMESS_ZIP
76 // space at the beginning of the message reserved for the message length
// flag this buffer as not able to handle member-wise streaming
82 SetBit(kCannotHandleMemberWiseStreaming);
// Size threshold used by the reading constructor: a buffer is only read
// directly when its size exceeds this value.
85 const Int_t AliHLTMessage::fgkMinimumSize=30;
// Zero-filled two-word buffer the reading constructor falls back to when the
// supplied buffer does not exceed fgkMinimumSize.
86 UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
88 //______________________________________________________________________________
89 AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
91 # if defined(ROOT_TBufferFile)
// Buffers whose size does not exceed fgkMinimumSize are not read directly: the
// base class is then set up on the static fgkDefaultBuffer with that buffer's size.
92 TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
94 TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
103 , fBufUncompressed(0)
108 // Create an AliHLTMessage object for reading objects. The objects will be
109 // read from buf. Use the What() method to get the message type.
// @param buf      start of the serialized message
// @param bufsize  size of buf in bytes
111 // skip space at the beginning of the message reserved for the message length
112 fBufCur += sizeof(UInt_t);
116 if (fWhat & kMESS_ZIP) {
117 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
// end of the compressed data: bufsize bytes behind the start of the buffer
119 fBufCompCur = fBuffer + bufsize;
123 // NOTE: this is not done in TMessage and will lead to the deletion
124 // of the buffer. This is not allowed in case of HLT where the
125 // buffer is handled by the framework. In general, I think this
126 // is a very bad idea to do it like that in TMessage
131 if (fWhat == kMESS_OBJECT) {
133 fClass = ReadClass(); // get first the class stored in message
// reading the class moved the position; set the offset back to just behind
// the length word and fWhat
134 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
141 //______________________________________________________________________________
142 AliHLTMessage::~AliHLTMessage()
144 // Destructor: clean up the compression buffer.
148 //______________________________________________________________________________
149 void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
151 // Static function enabling or disabling the automatic schema evolution.
152 // By default schema evolution support is off.
// @param enable  new value of the class-wide flag fgEvolution
154 fgEvolution = enable;
157 //______________________________________________________________________________
158 Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
160 // Static function returning the status of the global schema evolution.
165 //______________________________________________________________________________
166 void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
168 // Force writing the TStreamerInfo to the message.
// The second (unnamed) argument is not used by this implementation.
// Only active when schema evolution is enabled globally or for this message.
170 if (fgEvolution || fEvolution) {
// create the list of streamer infos on first use
171 if (!fInfos) fInfos = new TList();
176 //______________________________________________________________________________
177 void AliHLTMessage::Forward()
179 // Change a buffer that was received into one that can be sent, i.e.
180 // forward a just received message.
// set the buffer offset to the full buffer size fBufSize
184 SetBufferOffset(fBufSize);
// same flag as set by the writing constructor
185 SetBit(kCannotHandleMemberWiseStreaming);
193 //______________________________________________________________________________
194 void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
// Forwards to TBufferFile::IncrementLevel() and, when schema evolution is
// enabled globally or for this message, makes sure the fInfos list exists.
198 TBufferFile::IncrementLevel(info);
200 if (fgEvolution || fEvolution) {
// create the list of streamer infos on first use
201 if (!fInfos) fInfos = new TList();
206 //______________________________________________________________________________
207 void AliHLTMessage::Reset()
209 // Reset the message buffer so we can use (i.e. fill) it again.
// position right behind the reserved length word and the message type
211 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
// release the buffer allocated by Uncompress(), if there is one
220 if (fBufUncompressed) {
221 delete [] fBufUncompressed;
222 fBufUncompressed=NULL;
226 //______________________________________________________________________________
227 void AliHLTMessage::SetLength() const
229 // Set the message length at the beginning of the message buffer.
// The value is stored with a plain UInt_t assignment (no byte order
// conversion) and does not include the length word itself.
232 char *buf = Buffer();
233 *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
// length of the compressed data, again without the length word
237 *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
242 //______________________________________________________________________________
243 void AliHLTMessage::SetWhat(UInt_t what)
245 // Using this method one can change the message type a posteriori.
246 // In case you OR "what" with kMESS_ACK, the message will wait for
247 // an acknowledgement from the remote side. This makes the sending
248 // process synchronous.
// @param what  new message type
252 char *buf = Buffer();
253 buf += sizeof(UInt_t); // skip reserved length space
258 buf += sizeof(UInt_t); // skip reserved length space
// here the type is written with the kMESS_ZIP flag added
259 tobuf(buf, what | kMESS_ZIP);
263 //______________________________________________________________________________
264 void AliHLTMessage::SetCompressionLevel(Int_t level)
266 // Set the message compression level. Can be between 0 and 9 with 0
267 // being no compression and 9 maximum compression. In general the default
268 // level of 1 is the best compromise between achieved compression and
269 // cpu time. Compression will only happen when the message is > 256 bytes.
// @param level  requested level; values outside [0, 9] are clamped
271 if (level < 0) level = 0;
272 if (level > 9) level = 9;
// the level changes while a compressed buffer already exists
274 if (level != fCompress && fBufComp) {
283 //______________________________________________________________________________
284 Int_t AliHLTMessage::Compress()
286 // Compress the message. The message will only be compressed if the
287 // compression level > 0 and if the message is > 256 bytes.
288 // Returns -1 in case of error (when compression fails or
289 // when the message increases in size in some pathological cases),
290 // otherwise returns 0.
292 if (fCompress == 0) {
293 // no compression specified
303 if (fBufComp && fCompPos == fBufCur) {
304 // the message was already compressed
308 // remove any existing compressed buffer before compressing modified message
316 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
317 // this message is too small to be compressed
// uncompressed header: two UInt_t words (presumably the reserved length word
// and the message type)
321 Int_t hdrlen = 2*sizeof(UInt_t);
// payload size without the header
322 Int_t messlen = Length() - hdrlen;
// number of complete kMAXBUF sized chunks in the payload
323 Int_t nbuffers = messlen / kMAXBUF;
324 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
// at least 512 bytes: compressed header + payload + 9 extra bytes per complete chunk
325 Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
326 fBufComp = new char[buflen];
// input starts behind the uncompressed header, output behind the compressed header
327 char *messbuf = Buffer() + hdrlen;
328 char *bufcur = fBufComp + chdrlen;
// nbuffers+1 passes: the complete chunks plus the remainder
332 for (Int_t i = 0; i <= nbuffers; i++) {
// payload bytes remaining (nzip presumably counts the bytes already handled)
334 bufmax = messlen - nzip;
// bufmax is passed both as the input size (nin) and the output size (lout)
337 R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
338 if (nout == 0 || nout >= messlen) {
339 //this happens when the buffer cannot be compressed
351 fBufCompCur = bufcur;
// compressed length, not counting the length word itself
355 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
// message type with the kMESS_ZIP flag added
356 Int_t what = fWhat | kMESS_ZIP;
358 tobuf(bufcur, Length()); // original uncompressed buffer length
363 //______________________________________________________________________________
364 Int_t AliHLTMessage::Uncompress()
366 // Uncompress the message. The message will only be uncompressed when
367 // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
// requires a compressed buffer and the kMESS_ZIP flag in the message type
369 if (!fBufComp || !(fWhat & kMESS_ZIP))
// skip the first two UInt_t words of the compressed buffer
373 Int_t hdrlen = 2*sizeof(UInt_t);
374 UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen;
// the next word is the uncompressed buffer length (Compress() stores Length() there)
375 frombuf((char *&)bufcur, &buflen);
// NOTE(review): buflen is taken from the message and sizes the allocation
// directly; no range check is visible at this point - confirm.
376 fBuffer = new char[buflen];
// keep track of the allocation; Reset() deletes fBufUncompressed
377 fBufUncompressed = fBuffer;
// read position right behind the length word and the message type
379 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
380 fBufMax = fBuffer + fBufSize;
// uncompressed payload is written behind the header
381 char *messbuf = fBuffer + hdrlen;
383 Int_t nin, nout, nbuf;
// bytes 3..5 of the chunk: 24 bit value, least significant byte first,
// plus 9 (presumably the size of the chunk header)
386 nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
// bytes 6..8 of the chunk: 24 bit value passed to R__unzip as output size (lout)
387 nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
388 R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
// stop when the payload size (buflen without the header) has been reached
391 if (noutot >= buflen - hdrlen) break;
402 //______________________________________________________________________________
403 void AliHLTMessage::WriteObject(const TObject *obj)
405 // Write object to message buffer.
406 // When support for schema evolution is enabled the list of TStreamerInfo
407 // used to stream this object is kept in fInfos. This information is used
408 // by TSocket::Send that sends this list through the socket. This list is in
409 // turn used by TSocket::Recv to store the TStreamerInfo objects in the
410 // relevant TClass in case the TClass does not know yet about a particular
411 // class version. This feature is implemented to support clients and servers
412 // with either different ROOT versions or different user classes versions.
// @param obj  object to be written
414 if (fgEvolution || fEvolution) {
418 fInfos = new TList();
// clear the bits WriteProcessID() uses to remember what was already written
421 fBitsPIDs.ResetAllBits();
// the object is streamed through WriteObjectAny() with class TObject
422 WriteObjectAny(obj, TObject::Class());
425 //______________________________________________________________________________
426 UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
428 // Check if the ProcessID pid is already in the message.
430 // - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
431 // - mark bit uid+1 where uid is the uid of the ProcessID
// bit 0 already set: return 0 without changing anything
433 if (fBitsPIDs.TestBitNumber(0)) return 0;
435 pid = TProcessID::GetPID();
437 fBitsPIDs.SetBitNumber(0);
438 UInt_t uid = pid->GetUniqueID();
439 fBitsPIDs.SetBitNumber(uid+1);
443 AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
445 /// Helper function to stream an object into an AliHLTMessage
446 /// The returned instance must be cleaned by the caller
/// @param pSrc         object to be streamed; NULL makes the function return NULL
/// @param compression  level handed to SetCompressionLevel()
/// @param verbosity    values > 0 enable the info log messages
448 /// Get the data and data size from the message:
450 /// pMsg->CompLength();
451 /// pMsg->CompBuffer();
456 /// Note: accessing scheme will be changed in the future to just have the two
458 if (!pSrc) return NULL;
461 AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
463 log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
467 pMsg->SetCompressionLevel(compression);
468 pMsg->WriteObject(pSrc);
469 if (pMsg->Length()>0) {
471 // NOTE: AliHLTMessage does implement its own SetLength method
472 // which is not architecture independent. The original SetLength
473 // stores the size always in network byte order.
474 // I'm trying to remember the rationale for that, might be that
475 // it was just some lack of knowledge. Want to change this, but
476 // has to be done carefully to be backward compatible.
477 pMsg->SetLength(); // sets the length to the first (reserved) word
479 // does nothing if the level is 0
// a compressed buffer exists: report the compressed size
482 if (pMsg->CompBuffer()) {
483 pMsg->SetLength(); // set once more to have the byte order
484 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
// otherwise report the uncompressed size
486 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
492 TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
494 /// Helper function to extract an object from a buffer.
495 /// The returned object must be cleaned by the caller
/// @param pBuffer     start of the buffer holding the streamed object
/// @param bufferSize  size of the buffer in bytes
/// @param verbosity   values > 0 enable the log messages
// the buffer must at least hold the leading size word
497 if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
// NOTE(review): bufferSize is unsigned but is printed with %d here and below
498 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
// a streamed object is only accepted if the first word equals the size of
// the remaining buffer and reaches the minimum size
502 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
503 if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
504 firstWord>=34 /*that's the minimum size of a streamed TObject*/) {
// the cast drops the const qualifier of pBuffer
505 AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
506 TClass* objclass=msg.GetClass();
507 TObject* pObject=msg.ReadObject(objclass);
// success requires both the object and its class
508 if (pObject && objclass) {
509 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
512 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
// NOTE(review): the last argument involves sizeof and therefore has type
// size_t, while the format string uses %d - confirm and consider an explicit cast
515 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));