3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE *
5 //* ALICE Experiment at CERN, All rights reserved. *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
9 //* Permission to use, copy, modify and distribute this software and its *
10 //* documentation strictly for non-commercial purposes is hereby granted *
11 //* without fee, provided that the above copyright notice appears in all *
12 //* copies and that both the copyright notice and this permission notice *
13 //* appear in the supporting documentation. The authors make no claims *
14 //* about the suitability of this software for any purpose. It is *
15 //* provided "as is" without express or implied warranty. *
16 //**************************************************************************
18 /// @file AliHLTRootSchemaEvolutionComponent.cxx
19 /// @author Matthias Richter
21 /// @brief Handler component for ROOT schema evolution of streamed objects
24 #include "AliHLTRootSchemaEvolutionComponent.h"
25 #include "AliHLTMessage.h"
26 #include "AliHLTReadoutList.h"
27 #include "TObjArray.h"
28 #include "TStreamerInfo.h"
31 #include "TStopwatch.h"
32 #include "TTimeStamp.h"
35 #include "AliCDBStorage.h"
36 #include "AliCDBManager.h"
37 #include "AliCDBPath.h"
39 #include "AliCDBMetaData.h"
40 #include "AliCDBEntry.h"
43 using std::accumulate;
47 // Helper class for std::accumulate algorithm.
// Binary functor: adds the GetTotalTime() value of one AliHLTDataBlockItem to
// an int running total. It is passed to accumulate() over fList in
// ProcessCalibration() to obtain the summed per-item time.
50 typedef int first_argument_type;
51 typedef AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem second_argument_type;
// NOTE(review): result_type is declared bool although operator() returns int
// - confirm which one is intended.
52 typedef bool result_type;
// The item is taken by value, i.e. it is copied for every element visited.
53 int operator() (int a, AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem b) {
54 return a+b.GetTotalTime();
59 /** ROOT macro for the implementation of ROOT specific class methods */
60 ClassImp(AliHLTRootSchemaEvolutionComponent)
// Default constructor.
// Among the visible initializers the property flags start with kFXS set and
// the streamer info array pointer starts as NULL; the array itself is
// allocated later in InitCalibration().
62 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
63 : AliHLTCalibrationProcessor()
65 , fPropertyFlags(kFXS)
66 , fpStreamerInfos(NULL)
73 // see header file for class documentation
75 // refer to README to build package
77 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
81 // FIXME: read below when defining an OCDB object here
// Path of the default configuration object. As long as it is NULL,
// InitCalibration() skips the call to ConfigureFromCDBTObjString().
82 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
// Factor applied to TStopwatch::RealTime() readings before they are stored as
// integer times (the log output of this file labels those values "usec").
// It is also the numerator when -rate=<Hz> is converted into fMaxEventTime.
83 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
// Destructor: releases the streamer info array if it is still allocated
// (DeinitCalibration() normally frees it and resets the pointer to NULL).
85 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
87 // see header file for class documentation
88 if (fpStreamerInfos) {
// Empty the array first, then free the container itself.
// NOTE(review): whether Clear() also deletes the contained objects depends on
// the ownership setting of the TObjArray - confirm.
89 fpStreamerInfos->Clear();
90 delete fpStreamerInfos;
// Declares the accepted input data types by appending kAliHLTAnyDataType to
// the list supplied by the caller.
95 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
97 // see header file for class documentation
98 list.push_back(kAliHLTAnyDataType);
// Returns the data type of the component output: kAliHLTDataTypeStreamerInfo.
101 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
103 // see header file for class documentation
104 return kAliHLTDataTypeStreamerInfo;
// Output size estimate. constBase and inputMultiplier are reference
// parameters, presumably filled with a constant part and a factor relative to
// the input size - TODO confirm against the assignments in the body.
107 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
109 // see header file for class documentation
111 // this is nothing more than an assumption, in fact it's very difficult to predict how
112 // much output the component produces
// Component initialization.
// Optionally configures the component from the CDB, allocates the array that
// collects the streamer infos and creates the two stopwatches (event time and
// cycle time) that ProcessCalibration() uses to throttle its work.
// Errors are carried in iResult; -ENOMEM is set if the array pointer is NULL.
117 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
119 // see header file for class documentation
123 // default configuration from CDB
124 // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
125 // the default parameters from OCDB before the custom argument scan
126 // not valid at the moment because fgkConfigurationObject==NULL
127 if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
// container for the collected TStreamerInfo objects
130 fpStreamerInfos=new TObjArray();
// NOTE(review): a plain new expression normally throws instead of returning
// NULL - confirm that this check can actually trigger.
131 if (!fpStreamerInfos) iResult=-ENOMEM;
// measures the time spent inside ProcessCalibration()
133 fpEventTimer=new TStopwatch;
135 fpEventTimer->Reset();
// measures the time between the end of one ProcessCalibration() call and the
// beginning of the next one
137 fpCycleTimer=new TStopwatch;
139 fpCycleTimer->Reset();
// Component cleanup.
// If a file name has been configured the collected streamer infos are written
// out first; afterwards the array is cleared, deleted and the pointer is
// reset to NULL.
146 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
148 // see header file for class documentation
// IsNull()==0 means that a non-empty file name is set
149 if (fFileName.IsNull()==0) {
150 WriteToFile(fFileName, fpStreamerInfos);
154 if (fpStreamerInfos) {
155 fpStreamerInfos->Clear();
156 delete fpStreamerInfos;
// prevent a second delete of the same array
158 fpStreamerInfos=NULL;
// Event processing.
// Loops over all input blocks, tries to extract a ROOT object from each block,
// streams the object into an AliHLTMessage with schema evolution enabled and
// merges the streamer infos recorded by the message into fpStreamerInfos.
// Two stopwatches keep track of the time spent inside this function (event
// timer) and outside of it (cycle timer); the averages are used to decide
// whether known blocks are processed at all. At the end the collected array
// is published to HLTOUT and/or FXS depending on the property bits.
// Both parameters are unused.
171 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
172 AliHLTComponentTriggerData& /*trigData*/ )
174 // see header file for class documentation
176 AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
// special case: the event is not a data event and is of type start-of-run
177 if (!IsDataEvent(&eventType) &&
178 eventType==gkAliEventTypeStartOfRun) {
// sum of GetTotalTime() over all known block items, only used for the log message
182 AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliTimeSum());
183 AliHLTUInt32_t averageEventTime=0;
184 AliHLTUInt32_t averageCycleTime=0;
186 AliHLTUInt32_t proctime=0;
// average over all events so far; +1 keeps the divisor non-zero for event count 0
188 averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
// remember the accumulated time at entry, the difference is built further down
189 proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale);
// kFALSE: continue timing without resetting the accumulated time
190 fpEventTimer->Start(kFALSE);
193 fpCycleTimer->Stop();
194 averageCycleTime=AliHLTUInt32_t(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
197 // scale down the event processing according to the required rate
198 // and average processing time.
// the loop terminates early as soon as iResult becomes negative
199 for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
200 pBlock && iResult>=0;
201 pBlock=GetNextInputBlock()) {
202 bool processBlock=true;
// look up the bookkeeping item for this data type and specification
203 AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
205 // TODO: do a selection of blocks on basis of the time spent in its processing
206 // for now only the global processing time is checked
207 // process if the average event time is smaller than the cycle time, i.e.
208 // the time is spent outside the component
209 // apply a factor 4 margin
// i.e. process if 4x the average event time stays below fMaxEventTime, or if
// 2x the average event time stays below the average cycle time
210 processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
212 // always process new incoming blocks
214 fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
// pointer to the element just appended
215 item=&fList[fList.size()-1];
// Extract() returns NULL if the block does not contain a streamed object
218 TObject* pObj=item->Extract(pBlock);
220 AliHLTMessage msg(kMESS_OBJECT);
221 msg.EnableSchemaEvolution();
// streaming the object makes the message record the streamer infos involved
222 if ((iResult=item->Stream(pObj, msg))>=0) {
223 iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
225 HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
234 fpEventTimer->Stop();
// time spent in this call = accumulated time now minus accumulated time at entry
235 proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)-proctime;
236 averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
238 // info output once every 2 seconds
// function-local static: the value is shared by all calls of this function
239 static UInt_t lastTime=0;
241 if (time.Get()-lastTime>2) {
243 HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
// resume measuring the time spent outside of this function
247 fpCycleTimer->Start(kFALSE);
// publish to HLTOUT: at the first event if kHLTOUTatFirstEvent is set, or at
// every event if kHLTOUTatAllEvents is set
251 if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
252 (TestBits(kHLTOUTatAllEvents))) {
253 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// publish to FXS at every fFXSPrescaler-th event; a prescaler of 0 disables this
257 if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
259 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
260 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// Final shipping of the collected streamer infos.
// Depending on the configuration the array is pushed to FXS (kFXS bit),
// written to file (non-empty fFileName) and pushed to HLTOUT (kHLTOUTatEOR
// bit). Afterwards the timing statistics of all known block items are
// reported. Both parameters are unused.
266 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
267 AliHLTComponentTriggerData& /*trigData*/)
269 // see header file for class documentation
270 if (TestBits(kFXS)) {
272 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
273 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// IsNull()==0 means that a non-empty file name is set
276 if (fFileName.IsNull()==0) {
277 WriteToFile(fFileName, fpStreamerInfos);
281 if (TestBits(kHLTOUTatEOR)) {
282 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// statistics: full printout per item if the debug log filter passes,
// otherwise one info message for every item that carried a ROOT object
285 for (unsigned i=0; i<fList.size(); i++) {
286 if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
287 else if (fList[i].IsObject()) {
288 HLTInfo("AliHLTDataBlockItem %s %08x\n"
289 " average extraction time: %d usec\n"
290 " average streaming time: %d usec"
291 , AliHLTComponent::DataType2Text(fList[i]).c_str()
292 , fList[i].GetSpecification()
293 , fList[i].GetExtractionTime()
294 , fList[i].GetStreamingTime());
// Merge the streamer infos of 'list' into the array 'infos'.
// Entries which are not TStreamerInfo objects and entries with class version 0
// are skipped. An entry counts as already present if 'infos' holds an element
// with the same name and the same class version.
// @param list   streamer infos recorded while streaming an object
// @param infos  target array collecting all streamer infos
301 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
303 // see header file for class documentation
// both arguments are required
305 if (!list || !infos) {
309 TObject* element=NULL;
// the cast drops the const qualifier of 'list' for the iterator constructor
310 TIter next((TList*)list);
311 while ((element = next())) {
312 TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
313 if (!pInfo) continue;
314 TString name=pInfo->GetName();
316 if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
// linear search for an element with identical name and class version
317 for (; i<infos->GetEntriesFast(); i++) {
318 if (name.CompareTo(infos->At(i)->GetName())==0 &&
319 pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
325 // Add streamer info if not yet there
// the index ran past the last element, i.e. the search found no match
326 if (i>=infos->GetEntriesFast()) {
// Scan one configuration argument.
// The argument at the current index of argv is compared to the options known
// to this component:
//   -hltout[=all|first|eor|off]  when to publish the streamer infos to HLTOUT
//   -fxs[=off|<n>]               FXS publishing; a number sets fFXSPrescaler
//   -file=<name>                 file name, must not be empty
//   -rate=<Hz>                   required rate, converted to fMaxEventTime
// @param argc  number of arguments available in argv; returns 0 if argc<=0
// @param argv  argument strings
334 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
336 // see header file for class documentation
338 if (argc<=0) return 0;
340 TString argument=argv[i];
342 // -hltout=[all,first,eor,off]
// the option name and the '=' are stripped, the remainder is the parameter;
// an empty parameter is treated like "all"
343 if (argument.Contains("-hltout")) {
344 argument.ReplaceAll("-hltout", "");
345 argument.ReplaceAll("=", "");
346 if (argument.IsNull() || argument.CompareTo("all")==0) {
347 SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
348 } else if (argument.CompareTo("first")==0) {
349 SetBits(kHLTOUTatFirstEvent);
350 } else if (argument.CompareTo("eor")==0) {
351 SetBits(kHLTOUTatEOR);
352 } else if (argument.CompareTo("off")==0) {
353 ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
355 HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
// -fxs[=off|<n>]
362 if (argument.Contains("-fxs")) {
363 argument.ReplaceAll("-fxs", "");
364 argument.ReplaceAll("=", "");
366 if (argument.IsNull()) {
367 } else if (argument.CompareTo("off")==0) {
// a purely numeric parameter is taken as prescaler for the FXS publishing
369 } else if (argument.IsDigit()) {
370 fFXSPrescaler=argument.Atoi();
372 HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
// -file=<name>
379 if (argument.Contains("-file=")) {
380 argument.ReplaceAll("-file=", "");
381 if (!argument.IsNull()) {
384 HLTError("argument -file= expects file name");
// -rate=<Hz>: accepted range is 0 < rate < fgkTimeScale, the rate is converted
// into the maximum event time in units of 1/fgkTimeScale seconds
390 if (argument.Contains("-rate=")) {
391 argument.ReplaceAll("-rate=", "");
392 AliHLTUInt32_t rate=argument.Atoi();
393 if (rate>0 && rate<fgkTimeScale) {
394 fMaxEventTime=fgkTimeScale/rate;
// the message names the option actually being parsed (it said -file= before)
396 HLTError("argument -rate= expects number [Hz]");
// Write the streamer info array to file, wrapped into an OCDB entry with path
// HLT/Calib/StreamerInfo.
// If the default CDB storage already provides an entry for that path, the
// existing array is cloned and merged with 'infos'; the version of the
// existing entry is kept when the merge does not change anything. Without an
// existing entry a clone of 'infos' is stored.
// @param filename  name of the output file, opened with option RECREATE
// @param infos     array of streamer infos to be stored
// @return -EINVAL if one of the arguments is NULL
405 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
407 // write array of streamer infos to file
408 if (!filename || !infos) return -EINVAL;
// RECREATE overwrites an already existing file
410 TFile out(filename, "RECREATE");
411 if (out.IsZombie()) {
412 HLTError("failed to open file %s", filename);
416 const char* entrypath="HLT/Calib/StreamerInfo";
418 AliCDBStorage* store = NULL;
419 // TODO: to be activated later, first some methods need to be made
420 // public in AliCDBManager. Or some new methods to be added
421 //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
422 // store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
424 store = AliCDBManager::Instance()->GetDefaultStorage();
425 AliCDBEntry* existingEntry=NULL;
// an existing entry is only fetched if the storage reports a version >=0 for
// the current run
426 if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
427 (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
428 version=existingEntry->GetId().GetVersion();
432 TObjArray* clone=NULL;
434 if (existingEntry && existingEntry->GetObject()) {
435 TObject* cloneObj=existingEntry->GetObject()->Clone();
// NOTE(review): if the cloned object is not a TObjArray the cast yields NULL
// and cloneObj is no longer referenced - looks like a leak, confirm.
436 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
// MergeStreamerInfo() returns 0 if the target array was not changed
437 if (MergeStreamerInfo(clone, infos)==0) {
438 // no change, store with identical version
439 version=existingEntry->GetId().GetVersion();
// no existing entry: store a copy of the current array
442 TObject* cloneObj=infos->Clone();
443 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
446 HLTError("failed to clone streamer info object array");
// run range of the new entry: from the current run to infinity
450 AliCDBPath cdbPath(entrypath);
451 AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
452 AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
453 cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
454 cdbMetaData->SetComment("Streamer info for HLTOUT payload");
// NOTE(review): the last constructor argument is presumably the ownership
// flag for the clone and the meta data - confirm against AliCDBEntry.
455 AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
459 // this is a small memory leak
460 // seg fault in ROOT object handling if the two objects are deleted
463 //delete cdbMetaData;
// Merge the streamer infos of the source array into the target array.
// A source entry counts as present in the target if the target holds an
// element of the same name and, for non-negative class versions, the same
// class version. For negative class versions the name match is sufficient.
// @param tgt  target array, extended by the missing entries
// @param src  source array
// @return -EINVAL if one of the arrays is NULL; WriteToFile() treats a return
//         value of 0 as "target unchanged"
469 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
471 /// merge streamer info entries from source array to target array
472 /// return 1 if target array has been changed
474 // add all existing infos if not existing in the current one, or having
475 // different class version
477 if (!tgt || !src) return -EINVAL;
480 // check if all infos from the existing entry are in the new entry and with
481 // identical class version
483 TObject* nextobj=NULL;
484 while ((nextobj=next())) {
// ignore everything which is not a TStreamerInfo
485 TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
486 if (!srcInfo) continue;
487 TString srcInfoName=srcInfo->GetName();
// linear search in the target; leaving the loop via break means "found"
490 for (; i<tgt->GetEntriesFast(); i++) {
// skip empty slots of the target array
491 if (tgt->At(i)==NULL) continue;
492 if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
493 // TODO: 2010-08-23 some more detailed investigation is needed.
494 // Structures used for data exchange, e.g. AliHLTComponentDataType
495 // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
496 // streamer info. Strictly speaking not, because those structures are not supposed
497 // to be changed at all, so they should be the same in all versions in the future.
498 // There has been a problem with detecting whether the streamer info is already in
499 // the target array if the srcInfo has class version -1. As it just concerns
500 // structures not going to be changed we can safely skip checking the class version,
501 // as long as the entry is already in the target streamer infos it does not need
502 // to be copied again.
503 if (srcInfo->GetClassVersion()<0) break;
504 TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
505 if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
// index inside the array: the entry is already in the target, go to the next one
507 if (i<tgt->GetEntriesFast()) continue;
// Linear search in fList for the item matching both the data type and the
// specification. The two comparisons use the equality operators of
// AliHLTDataBlockItem for the respective argument types.
// The returned pointer refers to an element stored in fList.
// NOTE(review): if fList is a std::vector the pointer is invalidated when the
// container reallocates, e.g. on push_back - confirm callers do not keep it.
517 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
518 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
521 /// find item in the list
522 // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
523 // if (element!=fList.end()) return &(*element);
524 for (unsigned i=0; i<fList.size(); i++) {
525 if (fList[i]==dt && fList[i]==spec) return &fList[i];
// Constructor of the bookkeeping item for one kind of input block.
// The item stores the specification passed by the caller; the accumulated
// extraction and streaming times start at zero.
530 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
533 , fSpecification(spec)
536 , fExtractionTimeUsec(0)
539 , fStreamingTimeUsec(0)
542 // helper class to keep track of input data blocks
543 // in the AliHLTRootSchemaEvolutionComponent
// Destructor of the bookkeeping item.
547 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
// Extract the ROOT object from a data block and update the extraction
// statistics of this item.
// The block is only treated as a streamed object if its first 32 bit word
// equals the block size minus the size of that word. fIsObject records the
// outcome of every check and stays false if one of them fails.
// @param bd  block descriptor; needs a valid data pointer and at least 8 bytes
// @return the extracted object, owned by the caller, or NULL
552 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
554 /// extract data block to root object, and update performance parameters
555 /// object needs to be deleted externally
556 if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
// the first word is expected to hold the length of the remaining payload
// NOTE(review): the word is read through a cast pointer, this assumes that
// fPtr is suitably aligned for a 32 bit access - confirm.
558 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
559 if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
// let the message decode the class and read the object from the buffer
563 AliHLTMessage msg(bd->fPtr, bd->fSize);
564 TClass* objclass=msg.GetClass();
565 if (!(fIsObject=(objclass!=NULL))) return NULL;
566 TObject* pObj=msg.ReadObject(objclass);
567 if (!(fIsObject=(pObj!=NULL))) return NULL;
// elapsed stopwatch time scaled by fgkTimeScale, added to the running sum
569 AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
571 fExtractionTimeUsec+=usec;
// time stamp of this extraction: seconds modulo 1000 scaled by fgkTimeScale
// plus the sub-second part converted from nanoseconds
573 fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Stream an object into the message and update the streaming statistics of
// this item.
// @param obj  object to be streamed
// @param msg  message receiving the streamed object
// @return -EINVAL if obj is NULL
577 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(const TObject* obj, AliHLTMessage& msg)
579 /// stream object and update performance parameters
580 if (!obj) return -EINVAL;
583 msg.WriteObject(obj);
// elapsed stopwatch time scaled by fgkTimeScale, added to the running sum
585 AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
587 fStreamingTimeUsec+=usec;
// time stamp of this streaming: seconds modulo 1000 scaled by fgkTimeScale
// plus the sub-second part converted from nanoseconds
589 fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Print the data type, the specification (hexadecimal) and the average
// extraction and streaming times of this item to cout.
// @param option  passed to strcmp and therefore must not be NULL; with option
//                "short" the header line is only printed for items which
//                carried a ROOT object
593 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
596 if (fIsObject || !(strcmp(option, "short")==0))
597 cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
// averages are only computed for a non-zero count, which avoids a division by zero
599 if (fNofExtractions>0) cout << " average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
600 else cout << " never extracted" << endl;
601 if (fNofStreamings>0) cout << " average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
602 else cout << " never streamed" << endl;