]> git.uio.no Git - u/mrichter/AliRoot.git/blame - HLT/BASE/AliHLTMessage.cxx
Fixing warning.
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTMessage.cxx
CommitLineData
a655eae3 1// $Id$
2
3/** @file AliHLTMessage.cxx
4 @author Matthias Richter (customization of Root TMessage )
5 @date
6 @brief Serialization of Root objects in the ALICE HLT. */
7
8// This is the original Root TMessage implementation with a few minor
9// modifications, original revision:
10// root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11// Author: Fons Rademakers 19/12/96
12
13/*************************************************************************
14 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
15 * All rights reserved. *
16 * *
17 * For the licensing terms see $ROOTSYS/LICENSE. *
18 * For the list of contributors see $ROOTSYS/README/CREDITS. *
19 *************************************************************************/
20
21//////////////////////////////////////////////////////////////////////////
22// //
23// TMessage //
24// //
25// Message buffer class used for serializing objects and sending them //
26// over a network. This class inherits from TBuffer the basic I/O //
27// serializer. //
28// //
29//////////////////////////////////////////////////////////////////////////
30
31#include "AliHLTMessage.h"
257aedf5 32#include "TVirtualStreamerInfo.h"
a655eae3 33#include "Bytes.h"
34#include "TFile.h"
257aedf5 35#include "TProcessID.h"
e5701dcf 36#include "TClass.h"
a655eae3 37
38extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
39extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
40const Int_t kMAXBUF = 0xffffff;
41
257aedf5 42Bool_t AliHLTMessage::fgEvolution = kFALSE;
43
a655eae3 44ClassImp(AliHLTMessage)
45
46//______________________________________________________________________________
dfaa582a 47AliHLTMessage::AliHLTMessage(UInt_t what)
48 :
49# ifdef ROOT_TBufferFile
50 TBufferFile(kWrite),
51# else
52 TBuffer(kWrite),
53# endif
c61a7285 54 AliHLTLogging(),
55 fWhat(what),
56 fClass(0),
57 fCompress(0),
58 fBufComp(0),
59 fBufCompCur(0),
60 fCompPos(0)
0634add1 61 , fBufUncompressed(0)
257aedf5 62 , fBitsPIDs(0)
63 , fInfos(NULL)
64 , fEvolution(kFALSE)
a655eae3 65{
66 // Create a AliHLTMessage object for storing objects. The "what" integer
67 // describes the type of message. Predifined ROOT system message types
68 // can be found in MessageTypes.h. Make sure your own message types are
69 // unique from the ROOT defined message types (i.e. 0 - 10000 are
70 // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
71 // will wait for an acknowledgement from the remote side. This makes
72 // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
73 // the message will be compressed in TSocket using the zip algorithm
74 // (only if message is > 256 bytes).
75
76 // space at the beginning of the message reserved for the message length
77 UInt_t reserved = 0;
78 *this << reserved;
79
a655eae3 80 *this << what;
81
257aedf5 82 SetBit(kCannotHandleMemberWiseStreaming);
a655eae3 83}
84
5cdaceed 85const Int_t AliHLTMessage::fgkMinimumSize=30;
86UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
87
a655eae3 88//______________________________________________________________________________
dfaa582a 89AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
90 :
ba4b0dd7 91# if defined(ROOT_TBufferFile)
5cdaceed 92 TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
dfaa582a 93# else
5cdaceed 94 TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
dfaa582a 95# endif
c61a7285 96 AliHLTLogging(),
97 fWhat(0),
98 fClass(0),
99 fCompress(0),
100 fBufComp(0),
101 fBufCompCur(0),
102 fCompPos(0)
0634add1 103 , fBufUncompressed(0)
257aedf5 104 , fBitsPIDs(0)
105 , fInfos(NULL)
106 , fEvolution(kFALSE)
a655eae3 107{
108 // Create a AliHLTMessage object for reading objects. The objects will be
109 // read from buf. Use the What() method to get the message type.
110
111 // skip space at the beginning of the message reserved for the message length
112 fBufCur += sizeof(UInt_t);
113
114 *this >> fWhat;
115
a655eae3 116 if (fWhat & kMESS_ZIP) {
117 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
118 fBufComp = fBuffer;
119 fBufCompCur = fBuffer + bufsize;
120 fBuffer = 0;
121 Uncompress();
d896b5c8 122 // Matthias Sep 2008
123 // NOTE: this is not done in TMessage and will lead to the deletion
124 // of the buffer. This is not allowed in case of HLT where the
125 // buffer is handled by the framework. In general, I think this
126 // is a very bad idea to do it like that in TMessage
127 fBufComp = NULL;
128 fBufCompCur = 0;
a655eae3 129 }
130
131 if (fWhat == kMESS_OBJECT) {
132 InitMap();
133 fClass = ReadClass(); // get first the class stored in message
134 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
135 ResetMap();
136 } else {
137 fClass = 0;
138 }
139}
140
141//______________________________________________________________________________
142AliHLTMessage::~AliHLTMessage()
143{
144 // Clean up compression buffer.
0634add1 145 Reset();
a655eae3 146}
147
257aedf5 148//______________________________________________________________________________
149void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
150{
151 // Static function enabling or disabling the automatic schema evolution.
152 // By default schema evolution support is off.
153
154 fgEvolution = enable;
155}
156
157//______________________________________________________________________________
158Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
159{
160 // Static function returning status of global schema evolution.
161
162 return fgEvolution;
163}
164
165//______________________________________________________________________________
166void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
167{
168 // Force writing the TStreamerInfo to the message.
169
170 if (fgEvolution || fEvolution) {
171 if (!fInfos) fInfos = new TList();
e0617052 172 if (fInfos->FindObject(info->GetName())==NULL) {
173 fInfos->Add(info);
174 }
257aedf5 175 }
176}
177
a655eae3 178//______________________________________________________________________________
179void AliHLTMessage::Forward()
180{
181 // Change a buffer that was received into one that can be send, i.e.
182 // forward a just received message.
183
184 if (IsReading()) {
185 SetWriteMode();
186 SetBufferOffset(fBufSize);
257aedf5 187 SetBit(kCannotHandleMemberWiseStreaming);
a655eae3 188
189 if (fBufComp) {
190 fCompPos = fBufCur;
191 }
192 }
193}
194
257aedf5 195//______________________________________________________________________________
196void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
197{
198 // Increment level.
199
200 TBufferFile::IncrementLevel(info);
201
e0617052 202 if (!info) return;
257aedf5 203 if (fgEvolution || fEvolution) {
204 if (!fInfos) fInfos = new TList();
e0617052 205
206 // add the streamer info, but only once
207 // this assumes that there is only one version
208 if (fInfos->FindObject(info->GetName())==NULL) {
209 fInfos->Add(info);
210 }
257aedf5 211 }
212}
213
a655eae3 214//______________________________________________________________________________
215void AliHLTMessage::Reset()
216{
217 // Reset the message buffer so we can use (i.e. fill) it again.
218
219 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
220 ResetMap();
221
222 if (fBufComp) {
223 delete [] fBufComp;
224 fBufComp = 0;
225 fBufCompCur = 0;
226 fCompPos = 0;
227 }
0634add1 228 if (fBufUncompressed) {
229 delete [] fBufUncompressed;
230 fBufUncompressed=NULL;
231 }
a655eae3 232}
233
234//______________________________________________________________________________
235void AliHLTMessage::SetLength() const
236{
237 // Set the message length at the beginning of the message buffer.
238
239 if (IsWriting()) {
240 char *buf = Buffer();
241 *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
242
243 if (fBufComp) {
244 buf = fBufComp;
d896b5c8 245 *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
a655eae3 246 }
247 }
248}
249
250//______________________________________________________________________________
251void AliHLTMessage::SetWhat(UInt_t what)
252{
253 // Using this method one can change the message type a-posteriory.
254 // In case you OR "what" with kMESS_ACK, the message will wait for
255 // an acknowledgement from the remote side. This makes the sending
256 // process synchronous.
257
258 fWhat = what;
259
260 char *buf = Buffer();
261 buf += sizeof(UInt_t); // skip reserved length space
262 tobuf(buf, what);
263
264 if (fBufComp) {
265 buf = fBufComp;
266 buf += sizeof(UInt_t); // skip reserved length space
267 tobuf(buf, what | kMESS_ZIP);
268 }
269}
270
271//______________________________________________________________________________
272void AliHLTMessage::SetCompressionLevel(Int_t level)
273{
274 // Set the message compression level. Can be between 0 and 9 with 0
275 // being no compression and 9 maximum compression. In general the default
276 // level of 1 is the best compromise between achieved compression and
277 // cpu time. Compression will only happen when the message is > 256 bytes.
278
279 if (level < 0) level = 0;
280 if (level > 9) level = 9;
281
282 if (level != fCompress && fBufComp) {
283 delete [] fBufComp;
284 fBufComp = 0;
285 fBufCompCur = 0;
286 fCompPos = 0;
287 }
288 fCompress = level;
289}
290
291//______________________________________________________________________________
292Int_t AliHLTMessage::Compress()
293{
294 // Compress the message. The message will only be compressed if the
295 // compression level > 0 and the if the message is > 256 bytes.
296 // Returns -1 in case of error (when compression fails or
297 // when the message increases in size in some pathological cases),
298 // otherwise returns 0.
299
300 if (fCompress == 0) {
301 // no compression specified
302 if (fBufComp) {
303 delete [] fBufComp;
304 fBufComp = 0;
305 fBufCompCur = 0;
306 fCompPos = 0;
307 }
308 return 0;
309 }
310
311 if (fBufComp && fCompPos == fBufCur) {
312 // the message was already compressed
313 return 0;
314 }
315
316 // remove any existing compressed buffer before compressing modified message
317 if (fBufComp) {
318 delete [] fBufComp;
319 fBufComp = 0;
320 fBufCompCur = 0;
321 fCompPos = 0;
322 }
323
324 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
325 // this message is too small to be compressed
326 return 0;
327 }
328
329 Int_t hdrlen = 2*sizeof(UInt_t);
330 Int_t messlen = Length() - hdrlen;
331 Int_t nbuffers = messlen / kMAXBUF;
332 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
333 Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
334 fBufComp = new char[buflen];
335 char *messbuf = Buffer() + hdrlen;
336 char *bufcur = fBufComp + chdrlen;
337 Int_t noutot = 0;
338 Int_t nzip = 0;
339 Int_t nout, bufmax;
340 for (Int_t i = 0; i <= nbuffers; i++) {
341 if (i == nbuffers)
342 bufmax = messlen - nzip;
343 else
344 bufmax = kMAXBUF;
345 R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
346 if (nout == 0 || nout >= messlen) {
347 //this happens when the buffer cannot be compressed
348 delete [] fBufComp;
349 fBufComp = 0;
350 fBufCompCur = 0;
351 fCompPos = 0;
352 return -1;
353 }
354 bufcur += nout;
355 noutot += nout;
356 messbuf += kMAXBUF;
357 nzip += kMAXBUF;
358 }
359 fBufCompCur = bufcur;
360 fCompPos = fBufCur;
361
362 bufcur = fBufComp;
363 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
364 Int_t what = fWhat | kMESS_ZIP;
365 tobuf(bufcur, what);
366 tobuf(bufcur, Length()); // original uncompressed buffer length
367
368 return 0;
369}
370
371//______________________________________________________________________________
372Int_t AliHLTMessage::Uncompress()
373{
374 // Uncompress the message. The message will only be uncompressed when
375 // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
376
377 if (!fBufComp || !(fWhat & kMESS_ZIP))
378 return -1;
379
380 Int_t buflen;
381 Int_t hdrlen = 2*sizeof(UInt_t);
3e952f58 382 char *bufcur1 = fBufComp + hdrlen;
383 frombuf(bufcur1, &buflen);
384 UChar_t *bufcur = (UChar_t*)bufcur1;
a655eae3 385 fBuffer = new char[buflen];
0634add1 386 fBufUncompressed = fBuffer;
a655eae3 387 fBufSize = buflen;
388 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
389 fBufMax = fBuffer + fBufSize;
390 char *messbuf = fBuffer + hdrlen;
391
392 Int_t nin, nout, nbuf;
393 Int_t noutot = 0;
394 while (1) {
395 nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
396 nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
397 R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
398 if (!nout) break;
399 noutot += nout;
400 if (noutot >= buflen - hdrlen) break;
401 bufcur += nin;
402 messbuf += nout;
403 }
404
405 fWhat &= ~kMESS_ZIP;
406 fCompress = 1;
407
408 return 0;
409}
410
257aedf5 411//______________________________________________________________________________
412void AliHLTMessage::WriteObject(const TObject *obj)
413{
414 // Write object to message buffer.
415 // When support for schema evolution is enabled the list of TStreamerInfo
416 // used to stream this object is kept in fInfos. This information is used
417 // by TSocket::Send that sends this list through the socket. This list is in
418 // turn used by TSocket::Recv to store the TStreamerInfo objects in the
419 // relevant TClass in case the TClass does not know yet about a particular
420 // class version. This feature is implemented to support clients and servers
421 // with either different ROOT versions or different user classes versions.
422
423 if (fgEvolution || fEvolution) {
424 if (fInfos)
425 fInfos->Clear();
426 else
427 fInfos = new TList();
428 }
429
430 fBitsPIDs.ResetAllBits();
431 WriteObjectAny(obj, TObject::Class());
432}
433
434//______________________________________________________________________________
435UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
436{
437 // Check if the ProcessID pid is already in the message.
438 // If not, then:
439 // - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
440 // - mark bit uid+1 where uid id the uid of the ProcessID
441
442 if (fBitsPIDs.TestBitNumber(0)) return 0;
443 if (!pid)
444 pid = TProcessID::GetPID();
445 if (!pid) return 0;
446 fBitsPIDs.SetBitNumber(0);
447 UInt_t uid = pid->GetUniqueID();
448 fBitsPIDs.SetBitNumber(uid+1);
449 return 1;
450}
451
e5701dcf 452AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
453{
454 /// Helper function to stream an object into an AliHLTMessage
455 /// The returned instance must be cleaned by the caller
456 ///
457 /// Get the data and data size from the message:
458 /// first check
459 /// pMsg->CompLength();
460 /// pMsg->CompBuffer();
461 /// if that is NULL
462 /// pMsg->Length();
463 /// pMsg->Buffer();
464 ///
465 /// Note: accessing scheme will be change din the future to just have the two
466 /// latter ones.
467 if (!pSrc) return NULL;
468
469 AliHLTLogging log;
470 AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
471 if (!pMsg) {
472 log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
473 return NULL;
474 }
475
476 pMsg->SetCompressionLevel(compression);
477 pMsg->WriteObject(pSrc);
478 if (pMsg->Length()>0) {
479 // Matthias Sep 2008
480 // NOTE: AliHLTMessage does implement it's own SetLength method
481 // which is not architecture independent. The original SetLength
482 // stores the size always in network byte order.
483 // I'm trying to remember the rational for that, might be that
484 // it was just some lack of knowledge. Want to change this, but
485 // has to be done carefully to be backward compatible.
486 pMsg->SetLength(); // sets the length to the first (reserved) word
487
488 // does nothing if the level is 0
489 pMsg->Compress();
490
491 if (pMsg->CompBuffer()) {
492 pMsg->SetLength(); // set once more to have the byte order
493 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
494 } else {
495 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
496 }
497 }
498 return pMsg;
499}
500
501TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
502{
503 /// Helper function to extract an object from a buffer.
504 /// The returned object must be cleaned by the caller
505 AliHLTLogging log;
506 if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
507 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
508 return NULL;
509 }
510
511 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
512 if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
513 firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
514 AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
515 TClass* objclass=msg.GetClass();
516 TObject* pObject=msg.ReadObject(objclass);
517 if (pObject && objclass) {
518 if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
519 return pObject;
520 } else {
521 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
522 }
523 } else {
524 if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
525 }
526 return NULL;
527}
f5e1aed1 528
529TObject* AliHLTMessage::Extract(const char* filename, unsigned verbosity)
530{
531 /// Helper function to extract an object from a file containing the streamed object.
532 /// The returned object must be cleaned by the caller
533 if (!filename) return NULL;
534
535 AliHLTLogging log;
536 TString input=filename;
537 input+="?filetype=raw";
538 TFile* pFile=new TFile(input);
539 if (!pFile) return NULL;
540 TObject* pObject=NULL;
541 if (!pFile->IsZombie()) {
542 pFile->Seek(0);
543 TArrayC buffer;
544 buffer.Set(pFile->GetSize());
545 if (pFile->ReadBuffer(buffer.GetArray(), buffer.GetSize())==0) {
546 pObject=Extract(buffer.GetArray(), buffer.GetSize(), verbosity);
547 } else {
548 log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed reading %d byte(s) from file %s", pFile->GetSize(), filename);
549 }
550 }
551
552 delete pFile;
553 return pObject;
554}