//////////////////////////////////////////////////////////////////////////
#include "AliHLTMessage.h"
+#include "TVirtualStreamerInfo.h"
#include "Bytes.h"
#include "TFile.h"
+#include "TProcessID.h"
+#include "TClass.h"
extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
const Int_t kMAXBUF = 0xffffff;
+Bool_t AliHLTMessage::fgEvolution = kFALSE;
+
ClassImp(AliHLTMessage)
//______________________________________________________________________________
fBufComp(0),
fBufCompCur(0),
fCompPos(0)
+ , fBufUncompressed(0)
+ , fBitsPIDs(0)
+ , fInfos(NULL)
+ , fEvolution(kFALSE)
{
// Create a AliHLTMessage object for storing objects. The "what" integer
// describes the type of message. Predifined ROOT system message types
*this << what;
+ SetBit(kCannotHandleMemberWiseStreaming);
}
const Int_t AliHLTMessage::fgkMinimumSize=30;
fBufComp(0),
fBufCompCur(0),
fCompPos(0)
+ , fBufUncompressed(0)
+ , fBitsPIDs(0)
+ , fInfos(NULL)
+ , fEvolution(kFALSE)
{
// Create a AliHLTMessage object for reading objects. The objects will be
// read from buf. Use the What() method to get the message type.
fBufCompCur = fBuffer + bufsize;
fBuffer = 0;
Uncompress();
+ // Matthias Sep 2008
+ // NOTE: this is not done in TMessage and will lead to the deletion
+ // of the buffer. This is not allowed in case of HLT where the
+ // buffer is handled by the framework. In general, I think this
+ // is a very bad idea to do it like that in TMessage
+ fBufComp = NULL;
+ fBufCompCur = 0;
}
if (fWhat == kMESS_OBJECT) {
AliHLTMessage::~AliHLTMessage()
{
// Clean up compression buffer.
- delete [] fBufComp;
+ Reset();
+}
+
+//______________________________________________________________________________
+void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
+{
+ // Static function enabling or disabling the automatic schema evolution.
+ // By default schema evolution support is off.
+
+ fgEvolution = enable;
+}
+
+//______________________________________________________________________________
+Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
+{
+ // Static function returning status of global schema evolution.
+
+ return fgEvolution;
+}
+
+//______________________________________________________________________________
+void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
+{
+ // Force writing the TStreamerInfo to the message.
+
+ if (fgEvolution || fEvolution) {
+ if (!fInfos) fInfos = new TList();
+ if (fInfos->FindObject(info->GetName())==NULL) {
+ fInfos->Add(info);
+ }
+ }
}
//______________________________________________________________________________
if (IsReading()) {
SetWriteMode();
SetBufferOffset(fBufSize);
+ SetBit(kCannotHandleMemberWiseStreaming);
if (fBufComp) {
fCompPos = fBufCur;
}
}
+//______________________________________________________________________________
+void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
+{
+ // Increment level.
+
+ TBufferFile::IncrementLevel(info);
+
+ if (!info) return;
+ if (fgEvolution || fEvolution) {
+ if (!fInfos) fInfos = new TList();
+
+ // add the streamer info, but only once
+ // this assumes that there is only one version
+ if (fInfos->FindObject(info->GetName())==NULL) {
+ fInfos->Add(info);
+ }
+ }
+}
+
//______________________________________________________________________________
void AliHLTMessage::Reset()
{
fBufCompCur = 0;
fCompPos = 0;
}
+ if (fBufUncompressed) {
+ delete [] fBufUncompressed;
+ fBufUncompressed=NULL;
+ }
}
//______________________________________________________________________________
if (fBufComp) {
buf = fBufComp;
- *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
+ *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
}
}
}
Int_t buflen;
Int_t hdrlen = 2*sizeof(UInt_t);
- UChar_t *bufcur = (UChar_t*)fBufComp + hdrlen;
- frombuf((char *&)bufcur, &buflen);
+ char *bufcur1 = fBufComp + hdrlen;
+ frombuf(bufcur1, &buflen);
+ UChar_t *bufcur = (UChar_t*)bufcur1;
fBuffer = new char[buflen];
+ fBufUncompressed = fBuffer;
fBufSize = buflen;
fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
fBufMax = fBuffer + fBufSize;
return 0;
}
+//______________________________________________________________________________
+void AliHLTMessage::WriteObject(const TObject *obj)
+{
+ // Write object to message buffer.
+ // When support for schema evolution is enabled the list of TStreamerInfo
+ // used to stream this object is kept in fInfos. This information is used
+ // by TSocket::Send that sends this list through the socket. This list is in
+ // turn used by TSocket::Recv to store the TStreamerInfo objects in the
+ // relevant TClass in case the TClass does not know yet about a particular
+ // class version. This feature is implemented to support clients and servers
+ // with either different ROOT versions or different user classes versions.
+
+ if (fgEvolution || fEvolution) {
+ if (fInfos)
+ fInfos->Clear();
+ else
+ fInfos = new TList();
+ }
+
+ fBitsPIDs.ResetAllBits();
+ WriteObjectAny(obj, TObject::Class());
+}
+
+//______________________________________________________________________________
+UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
+{
+ // Check if the ProcessID pid is already in the message.
+ // If not, then:
+ // - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
+ // - mark bit uid+1 where uid id the uid of the ProcessID
+
+ if (fBitsPIDs.TestBitNumber(0)) return 0;
+ if (!pid)
+ pid = TProcessID::GetPID();
+ if (!pid) return 0;
+ fBitsPIDs.SetBitNumber(0);
+ UInt_t uid = pid->GetUniqueID();
+ fBitsPIDs.SetBitNumber(uid+1);
+ return 1;
+}
+
+AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity)
+{
+ /// Helper function to stream an object into an AliHLTMessage
+ /// The returned instance must be cleaned by the caller
+ ///
+ /// Get the data and data size from the message:
+ /// first check
+ /// pMsg->CompLength();
+ /// pMsg->CompBuffer();
+ /// if that is NULL
+ /// pMsg->Length();
+ /// pMsg->Buffer();
+ ///
+ /// Note: accessing scheme will be change din the future to just have the two
+ /// latter ones.
+ if (!pSrc) return NULL;
+
+ AliHLTLogging log;
+ AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
+ if (!pMsg) {
+ log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
+ return NULL;
+ }
+
+ pMsg->SetCompressionLevel(compression);
+ pMsg->WriteObject(pSrc);
+ if (pMsg->Length()>0) {
+ // Matthias Sep 2008
+ // NOTE: AliHLTMessage does implement it's own SetLength method
+ // which is not architecture independent. The original SetLength
+ // stores the size always in network byte order.
+ // I'm trying to remember the rational for that, might be that
+ // it was just some lack of knowledge. Want to change this, but
+ // has to be done carefully to be backward compatible.
+ pMsg->SetLength(); // sets the length to the first (reserved) word
+
+ // does nothing if the level is 0
+ pMsg->Compress();
+
+ if (pMsg->CompBuffer()) {
+ pMsg->SetLength(); // set once more to have the byte order
+ if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
+ } else {
+ if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
+ }
+ }
+ return pMsg;
+}
+
+TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
+{
+ /// Helper function to extract an object from a buffer.
+ /// The returned object must be cleaned by the caller
+ AliHLTLogging log;
+ if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
+ if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
+ return NULL;
+ }
+
+ AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
+ if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
+ firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
+ AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
+ TClass* objclass=msg.GetClass();
+ TObject* pObject=msg.ReadObject(objclass);
+ if (pObject && objclass) {
+ if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
+ return pObject;
+ } else {
+ if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
+ }
+ } else {
+ if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
+ }
+ return NULL;
+}
+
+TObject* AliHLTMessage::Extract(const char* filename, unsigned verbosity)
+{
+ /// Helper function to extract an object from a file containing the streamed object.
+ /// The returned object must be cleaned by the caller
+ if (!filename) return NULL;
+
+ AliHLTLogging log;
+ TString input=filename;
+ input+="?filetype=raw";
+ TFile* pFile=new TFile(input);
+ if (!pFile) return NULL;
+ TObject* pObject=NULL;
+ if (!pFile->IsZombie()) {
+ pFile->Seek(0);
+ TArrayC buffer;
+ buffer.Set(pFile->GetSize());
+ if (pFile->ReadBuffer(buffer.GetArray(), buffer.GetSize())==0) {
+ pObject=Extract(buffer.GetArray(), buffer.GetSize(), verbosity);
+ } else {
+ log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed reading %d byte(s) from file %s", pFile->GetSize(), filename);
+ }
+ }
+
+ delete pFile;
+ return pObject;
+}