3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE HLT Project *
5 //* ALICE Experiment at CERN, All rights reserved. *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
8 //* for The ALICE HLT Project. *
10 //* Permission to use, copy, modify and distribute this software and its *
11 //* documentation strictly for non-commercial purposes is hereby granted *
12 //* without fee, provided that the above copyright notice appears in all *
13 //* copies and that both the copyright notice and this permission notice *
14 //* appear in the supporting documentation. The authors make no claims *
15 //* about the suitability of this software for any purpose. It is *
16 //* provided "as is" without express or implied warranty. *
17 //**************************************************************************
19 /** @file AliHLTRootSchemaEvolutionComponent.cxx
20 @author Matthias Richter
22 @brief Handler component for ROOT schema evolution of streamed objects
25 #include "AliHLTRootSchemaEvolutionComponent.h"
26 #include "AliHLTMessage.h"
27 #include "AliHLTReadoutList.h"
28 #include "TObjArray.h"
29 #include "TStreamerInfo.h"
32 #include "TStopwatch.h"
33 #include "TTimeStamp.h"
36 #include "AliCDBStorage.h"
37 #include "AliCDBManager.h"
38 #include "AliCDBPath.h"
40 #include "AliCDBMetaData.h"
41 #include "AliCDBEntry.h"
44 using std::accumulate;
// Binary functor used with std::accumulate in ProcessCalibration: adds the
// value of GetTotalTime() of one data block item to the running integer sum.
48 // Helper class for std::accumulate algorithm.
51 typedef int first_argument_type;
52 typedef AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem second_argument_type;
// NOTE(review): operator() below returns int, so a result type of bool looks
// inconsistent -- confirm whether this typedef should be int.
53 typedef bool result_type;
// The item is passed by value, i.e. it is copied for every call.
54 int operator() (int a, AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem b) {
55 return a+b.GetTotalTime();
60 /** ROOT macro for the implementation of ROOT specific class methods */
61 ClassImp(AliHLTRootSchemaEvolutionComponent)
// Default constructor. Initializes the AliHLTCalibrationProcessor base class
// and sets the streamer info array pointer to NULL; the array itself is
// allocated in InitCalibration().
63 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
64 : AliHLTCalibrationProcessor()
67 , fpStreamerInfos(NULL)
74 // see header file for class documentation
76 // refer to README to build package
78 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
// Definitions of the static class constants.
82 // FIXME: read below when defining an OCDB object here
// No OCDB configuration object is defined: InitCalibration() only configures
// from the CDB when this pointer is not NULL.
83 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
// Scale factor applied to TStopwatch::RealTime() values throughout this file
// (the results are reported as usec in the log output) and used as numerator
// when the -rate= argument [Hz] is converted into fMaxEventTime.
84 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
// Destructor: if the streamer info array still exists, it is cleared and
// deleted here.
86 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
88 // see header file for class documentation
89 if (fpStreamerInfos) {
// Clear() is called before the array object itself is deleted.
90 fpStreamerInfos->Clear();
91 delete fpStreamerInfos;
// Declares the accepted input: kAliHLTAnyDataType is appended to the list
// passed in by the caller.
96 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
98 // see header file for class documentation
99 list.push_back(kAliHLTAnyDataType);
// Returns the data type of the produced output, kAliHLTDataTypeStreamerInfo.
102 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
104 // see header file for class documentation
105 return kAliHLTDataTypeStreamerInfo;
// Output size estimate, handed back through the two reference arguments
// constBase and inputMultiplier.
// NOTE(review): presumably constBase is a fixed size in bytes and
// inputMultiplier a factor on the input size -- confirm against the base class.
108 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
110 // see header file for class documentation
112 // this is nothing more than an assumption, in fact it's very difficult to predict how
113 // much output the component produces
// Component initialization: optionally configures from the CDB, then allocates
// the streamer info array and the two stopwatches used for the timing in
// ProcessCalibration(). Errors are tracked in iResult as negative error codes.
118 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
120 // see header file for class documentation
124 // default configuration from CDB
125 // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
126 // the default parameters from OCDB before the custom argument scan
127 // not valid at the moment because fgkConfigurationObject==NULL
128 if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
// array collecting the streamer infos of all processed objects
131 fpStreamerInfos=new TObjArray();
132 if (!fpStreamerInfos) iResult=-ENOMEM;
// stopwatch which is started and stopped inside ProcessCalibration()
134 fpEventTimer=new TStopwatch;
136 fpEventTimer->Reset();
// stopwatch which is stopped at the beginning of ProcessCalibration() and
// started again at its end
138 fpCycleTimer=new TStopwatch;
140 fpCycleTimer->Reset();
// Component cleanup: writes the collected streamer infos to file if a file
// name has been configured, then clears and deletes the streamer info array.
147 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
149 // see header file for class documentation
// IsNull()==0: a file name has been set via the -file= argument
150 if (fFileName.IsNull()==0) {
// WriteToFile() itself rejects a NULL array with -EINVAL
151 WriteToFile(fFileName, fpStreamerInfos);
155 if (fpStreamerInfos) {
156 fpStreamerInfos->Clear();
157 delete fpStreamerInfos;
// reset the pointer after the array has been deleted
159 fpStreamerInfos=NULL;
// Event processing: loops over the input blocks, extracts the ROOT object of
// each block, streams it into an AliHLTMessage with schema evolution enabled
// and merges the streamer infos of the message into fpStreamerInfos.
// Depending on the configured bits the array is then pushed to HLTOUT and/or
// to the FXS. Both arguments are unused.
172 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
173 AliHLTComponentTriggerData& /*trigData*/ )
175 // see header file for class documentation
177 AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
// special case: the event is not a data event and is of type StartOfRun
178 if (!IsDataEvent(&eventType) &&
179 eventType==gkAliEventTypeStartOfRun) {
// sum of GetTotalTime() over all items currently in fList
183 AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliTimeSum());
184 AliHLTUInt32_t averageEventTime=0;
185 AliHLTUInt32_t averageCycleTime=0;
187 AliHLTUInt32_t proctime=0;
// event timer: average over GetEventCount()+1 events and a snapshot of the
// time accumulated so far, which is subtracted again after the block loop
189 averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
190 proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale);
// NOTE(review): kFALSE presumably continues the stopwatch without reset --
// confirm against the TStopwatch documentation
191 fpEventTimer->Start(kFALSE);
// the cycle timer is stopped while this function is running
194 fpCycleTimer->Stop();
195 averageCycleTime=AliHLTUInt32_t(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
198 // scale down the event processing according to the required rate
199 // and average processing time.
200 for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
201 pBlock && iResult>=0;
202 pBlock=GetNextInputBlock()) {
203 bool processBlock=true;
// look up the bookkeeping item for this data type and specification
204 AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
206 // TODO: do a selection of blocks on basis of the time spent in its processing
207 // for now only the global processing time is checked
208 // process if the average event time is smaller than the cycle time, i.e.
209 // the time is spent outside the component
210 // apply a factor 4 margin
// the block is processed if 4x the average event time is below fMaxEventTime
// or 2x the average event time is below the average cycle time
211 processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
213 // always process new incoming blocks
215 fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
// pointer to the element just appended; it refers into the vector storage
// and is only valid until fList is modified again
216 item=&fList[fList.size()-1];
// Extract() documents that the returned object needs to be deleted externally
219 TObject* pObj=item->Extract(pBlock);
221 AliHLTMessage msg(kMESS_OBJECT);
222 msg.EnableSchemaEvolution();
// stream the object and merge the streamer infos collected by the message
223 if ((iResult=item->Stream(pObj, msg))>=0) {
224 iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
226 HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
235 fpEventTimer->Stop();
// time accumulated by the event timer during this call
236 proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)-proctime;
237 averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
239 // info output once every 2 seconds
// function-local static: a single value shared by all instances of the component
240 static UInt_t lastTime=0;
242 if (time.Get()-lastTime>2) {
244 HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
// the cycle timer continues from here until the next call
248 fpCycleTimer->Start(kFALSE);
// HLTOUT: push at the first event (event count 0) or at every event,
// depending on the configured bits
252 if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
253 (TestBits(kHLTOUTatAllEvents))) {
254 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// FXS: only if enabled with a prescaler > 0, and then only for events whose
// count is a multiple of the prescaler
258 if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
260 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
261 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// Ships the collected streamer infos: to the FXS if kFXS is set, to file if a
// file name is configured, and to HLTOUT if kHLTOUTatEOR is set. Afterwards
// the timing statistics of the items in fList are printed.
// Both arguments are unused.
267 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
268 AliHLTComponentTriggerData& /*trigData*/)
270 // see header file for class documentation
271 if (TestBits(kFXS)) {
273 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
274 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// IsNull()==0: a file name has been set via the -file= argument
277 if (fFileName.IsNull()==0) {
278 WriteToFile(fFileName, fpStreamerInfos);
282 if (TestBits(kHLTOUTatEOR)) {
283 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// statistics: with the debug log filter active every item is printed via
// Print("short"), otherwise an info message is issued for the items flagged
// as ROOT objects
286 for (unsigned i=0; i<fList.size(); i++) {
287 if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
288 else if (fList[i].IsObject()) {
289 HLTInfo("AliHLTDataBlockItem %s %08x\n"
290 " average extraction time: %d usec\n"
291 " average streaming time: %d usec"
292 , AliHLTComponent::DataType2Text(fList[i]).c_str()
293 , fList[i].GetSpecification()
294 , fList[i].GetExtractionTime()
295 , fList[i].GetStreamingTime());
// Merges the streamer infos of 'list' into the array 'infos'. An entry of the
// list is considered only if it is a TStreamerInfo with a class version
// different from 0; it is looked up in 'infos' by name and class version.
// Both arguments are checked for NULL first.
302 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
304 // see header file for class documentation
306 if (!list || !infos) {
310 TObject* element=NULL;
// the C-style cast removes the const qualifier of the list for the iterator
311 TIter next((TList*)list);
312 while ((element = next())) {
// entries which are not of type TStreamerInfo are ignored
313 TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
314 if (!pInfo) continue;
315 TString name=pInfo->GetName();
317 if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
// search 'infos' for an entry with identical name and class version
// NOTE(review): infos->At(i) is dereferenced without NULL check here, while
// MergeStreamerInfo() skips NULL slots -- confirm the array has no empty slots
318 for (; i<infos->GetEntriesFast(); i++) {
319 if (name.CompareTo(infos->At(i)->GetName())==0 &&
320 pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
326 // Add streamer info if not yet there
// the index ran past the last entry, i.e. no match has been found
327 if (i>=infos->GetEntriesFast()) {
// Scans one component argument, taken from argv, and applies it to the
// configuration. Recognized arguments:
//   -hltout=[all,first,eor,off]  select when the streamer infos go to HLTOUT
//   -fxs[=n|off]                 FXS shipping, optionally with prescaler n
//   -file=<name>                 file name for writing the streamer infos
//   -rate=<Hz>                   required rate, converted to fMaxEventTime
// Returns 0 right away if argc<=0. An invalid parameter is reported through
// HLTError.
335 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
337 // see header file for class documentation
339 if (argc<=0) return 0;
341 TString argument=argv[i];
343 // -hltout=[all,first,eor,off]
344 if (argument.Contains("-hltout")) {
// strip the key and the '=' so that only the parameter value remains
345 argument.ReplaceAll("-hltout", "");
346 argument.ReplaceAll("=", "");
// no value at all is treated like "all"
347 if (argument.IsNull() || argument.CompareTo("all")==0) {
348 SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
349 } else if (argument.CompareTo("first")==0) {
350 SetBits(kHLTOUTatFirstEvent);
351 } else if (argument.CompareTo("eor")==0) {
352 SetBits(kHLTOUTatEOR);
353 } else if (argument.CompareTo("off")==0) {
354 ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
356 HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
// -fxs, optionally followed by "off" or a number used as prescaler
363 if (argument.Contains("-fxs")) {
364 argument.ReplaceAll("-fxs", "");
365 argument.ReplaceAll("=", "");
367 if (argument.IsNull()) {
368 } else if (argument.CompareTo("off")==0) {
// a purely numerical value sets the FXS prescaler
370 } else if (argument.IsDigit()) {
371 fFXSPrescaler=argument.Atoi();
373 HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
// -file=<name>, the file name must not be empty
380 if (argument.Contains("-file=")) {
381 argument.ReplaceAll("-file=", "");
382 if (!argument.IsNull()) {
385 HLTError("argument -file= expects file name");
// -rate=<Hz>, accepted if 0 < rate < fgkTimeScale
391 if (argument.Contains("-rate=")) {
392 argument.ReplaceAll("-rate=", "");
393 AliHLTUInt32_t rate=argument.Atoi();
394 if (rate>0 && rate<fgkTimeScale) {
// maximum event time in units of 1/fgkTimeScale seconds
395 fMaxEventTime=fgkTimeScale/rate;
// the message names the -rate= argument handled by this branch
397 HLTError("argument -rate= expects number [Hz]");
// Writes the streamer infos to a ROOT file: the file is opened in RECREATE
// mode and the infos are wrapped into an AliCDBEntry with the path
// HLT/Calib/StreamerInfo. If an entry exists already in the default CDB
// storage, the new infos are merged into a clone of the existing object.
// Returns -EINVAL if filename or infos is NULL.
406 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
408 // write array of streamer infos to file
409 if (!filename || !infos) return -EINVAL;
411 TFile out(filename, "RECREATE");
412 if (out.IsZombie()) {
413 HLTError("failed to open file %s", filename);
417 const char* entrypath="HLT/Calib/StreamerInfo";
419 AliCDBStorage* store = NULL;
420 // TODO: to be activated later, first some methods need to be made
421 // public in AliCDBManager. Or some new methods to be added
422 //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
423 // store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
// look for an existing entry in the default storage and take its version
425 store = AliCDBManager::Instance()->GetDefaultStorage();
426 AliCDBEntry* existingEntry=NULL;
427 if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
428 (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
429 version=existingEntry->GetId().GetVersion();
433 TObjArray* clone=NULL;
// existing object available: clone it and merge the new infos into the clone
435 if (existingEntry && existingEntry->GetObject()) {
436 TObject* cloneObj=existingEntry->GetObject()->Clone();
// NOTE(review): clone stays NULL if the existing object is not a TObjArray;
// MergeStreamerInfo() then returns -EINVAL
437 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
// MergeStreamerInfo() returning 0 means the target array was not changed
438 if (MergeStreamerInfo(clone, infos)==0) {
439 // no change, store with identical version
440 version=existingEntry->GetId().GetVersion();
// clone of the infos passed in by the caller
443 TObject* cloneObj=infos->Clone();
444 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
447 HLTError("failed to clone streamer info object array");
// the entry is created for the current run up to AliCDBRunRange::Infinity()
451 AliCDBPath cdbPath(entrypath);
452 AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
453 AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
454 cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
455 cdbMetaData->SetComment("Streamer info for HLTOUT payload");
456 AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
460 // this is a small memory leak
461 // seg fault in ROOT object handling if the two objects are deleted
464 //delete cdbMetaData;
// Merges the TStreamerInfo entries of 'src' into 'tgt'. For every source info
// the target array is searched for an entry of the same name and, for
// non-negative class versions, the same class version.
// Returns -EINVAL if one of the arrays is NULL.
470 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
472 /// merge streamer info entries from source array to target array
473 /// return 1 if target array has been changed
475 // add all existing infos if not existing in the current one, or having
476 // different class version
478 if (!tgt || !src) return -EINVAL;
481 // check if all infos from the existing entry are in the new entry and with
482 // identical class version
484 TObject* nextobj=NULL;
485 while ((nextobj=next())) {
// source entries which are not of type TStreamerInfo are ignored
486 TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
487 if (!srcInfo) continue;
488 TString srcInfoName=srcInfo->GetName();
// search the target array, the loop is left via break on a match; empty
// slots and entries of different name are skipped
491 for (; i<tgt->GetEntriesFast(); i++) {
492 if (tgt->At(i)==NULL) continue;
493 if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
494 // TODO: 2010-08-23 some more detailed investigation is needed.
495 // Structures used for data exchange, e.g. AliHLTComponentDataType
496 // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
497 // streamer info. Strictly speaking not, because those structures are not supposed
498 // to be changed at all, so they should be the same in all versions in the future.
499 // There has been a problem with detecting whether the streamer info is already in
500 // the target array if the srcInfo has class version -1. As it just concerns
501 // structures not going to be changed we can safely skip checking the class version,
502 // as long as the entry is already in the target streamer infos it does not need
503 // to be copied again.
// negative class version: a match by name is sufficient
504 if (srcInfo->GetClassVersion()<0) break;
// otherwise the class versions have to be identical as well
505 TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
506 if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
// the search stopped before the end of the array: info is already in the target
508 if (i<tgt->GetEntriesFast()) continue;
// Linear search in fList for the item matching both the data type 'dt' and
// the specification 'spec'. Returns a pointer to the element inside the
// vector; the pointer is only valid as long as fList is not modified.
518 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
519 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
522 /// find item in the list
523 // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
524 // if (element!=fList.end()) return &(*element);
525 for (unsigned i=0; i<fList.size(); i++) {
// the item is compared against the data type and against the specification
526 if (fList[i]==dt && fList[i]==spec) return &fList[i];
// Constructor of the bookkeeping item for one kind of input data block.
// Stores the specification and starts with zero accumulated extraction and
// streaming time.
531 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
534 , fSpecification(spec)
537 , fExtractionTimeUsec(0)
540 , fStreamingTimeUsec(0)
543 // helper class to keep track of input data blocks
544 // in the AliHLTRootSchemaEvolutionComponent
// Destructor of the bookkeeping item.
548 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
// Extracts the ROOT object from a data block and updates the extraction
// statistics of this item. Returns NULL if the block is missing, has no
// payload, is smaller than 8 bytes, or does not contain a readable object;
// fIsObject records the outcome of the object checks.
553 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
555 /// extract data block to root object, and update performance parameters
556 /// object needs to be deleted externally
557 if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
// the first 32 bit word of the payload has to be equal to the block size
// minus the size of this word, otherwise the block is not treated as object
559 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
560 if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
// the class is determined from the message before the object is read
564 AliHLTMessage msg(bd->fPtr, bd->fSize);
565 TClass* objclass=msg.GetClass();
566 if (!(fIsObject=(objclass!=NULL))) return NULL;
567 TObject* pObj=msg.ReadObject(objclass);
568 if (!(fIsObject=(pObj!=NULL))) return NULL;
// stopwatch real time scaled with fgkTimeScale, added to the total extraction time
570 AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
572 fExtractionTimeUsec+=usec;
// time stamp of this extraction: seconds modulo 1000 scaled with fgkTimeScale
// plus the nanosecond part divided by 1000
574 fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Streams the object into the message and updates the streaming statistics
// of this item. Returns -EINVAL if obj is NULL.
578 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(TObject* obj, AliHLTMessage& msg)
580 /// stream object and update performance parameters
581 if (!obj) return -EINVAL;
584 msg.WriteObject(obj);
// stopwatch real time scaled with fgkTimeScale, added to the total streaming time
586 AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
588 fStreamingTimeUsec+=usec;
// time stamp of this streaming: seconds modulo 1000 scaled with fgkTimeScale
// plus the nanosecond part divided by 1000
590 fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Prints the item to cout: data type and specification (hex), followed by the
// average extraction and streaming time in usec, or a note that the item has
// never been extracted or streamed.
594 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
// the header line is printed if the item is an object or the option is not "short"
// NOTE(review): option is passed to strcmp without NULL check -- confirm callers
597 if (fIsObject || !(strcmp(option, "short")==0))
598 cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
// averages: accumulated time divided by the number of operations
600 if (fNofExtractions>0) cout << " average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
601 else cout << " never extracted" << endl;
602 if (fNofStreamings>0) cout << " average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
603 else cout << " never streamed" << endl;