3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE HLT Project *
5 //* ALICE Experiment at CERN, All rights reserved. *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
8 //* for The ALICE HLT Project. *
10 //* Permission to use, copy, modify and distribute this software and its *
11 //* documentation strictly for non-commercial purposes is hereby granted *
12 //* without fee, provided that the above copyright notice appears in all *
13 //* copies and that both the copyright notice and this permission notice *
14 //* appear in the supporting documentation. The authors make no claims *
15 //* about the suitability of this software for any purpose. It is *
16 //* provided "as is" without express or implied warranty. *
17 //**************************************************************************
19 /** @file AliHLTRootSchemaEvolutionComponent.cxx
20 @author Matthias Richter
22 @brief Handler component for ROOT schema evolution of streamed objects
25 #include "AliHLTRootSchemaEvolutionComponent.h"
26 #include "AliHLTMessage.h"
27 #include "AliHLTReadoutList.h"
28 #include "TObjArray.h"
29 #include "TStreamerInfo.h"
32 #include "TStopwatch.h"
33 #include "TTimeStamp.h"
36 #include "AliCDBStorage.h"
37 #include "AliCDBManager.h"
38 #include "AliCDBPath.h"
40 #include "AliCDBMetaData.h"
41 #include "AliCDBEntry.h"
44 using std::accumulate;
46 /** ROOT macro for the implementation of ROOT specific class methods */
47 ClassImp(AliHLTRootSchemaEvolutionComponent)
// Default constructor. Chains to the AliHLTCalibrationProcessor base constructor and
// starts with fpStreamerInfos set to NULL; the array itself is created later in
// InitCalibration().
49 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
50 : AliHLTCalibrationProcessor()
53 , fpStreamerInfos(NULL)
60 // see header file for class documentation
62 // refer to README to build package
64 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
68 // FIXME: read below when defining an OCDB object here
// Path of the default OCDB configuration object. It is NULL, so the CDB configuration
// step in InitCalibration() is skipped (that step is guarded by a !=NULL check).
69 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
// Factor applied to TStopwatch::RealTime() results throughout this file; the scaled
// values are stored and logged as microseconds ("usec").
70 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
// Destructor. Clears and deletes the streamer-info array if it is still allocated;
// DeinitCalibration() performs the same cleanup and resets the pointer to NULL, so
// the guard below avoids a double delete after a regular deinit.
72 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
74 // see header file for class documentation
75 if (fpStreamerInfos) {
76 fpStreamerInfos->Clear();
77 delete fpStreamerInfos;
// Declares the accepted input data types by appending them to `list`.
// Only kAliHLTAnyDataType is added, i.e. no restriction on the input type is declared.
82 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
84 // see header file for class documentation
85 list.push_back(kAliHLTAnyDataType);
// Returns the data type of the produced output: kAliHLTDataTypeStreamerInfo, the same
// type used for the PushBack() calls in ProcessCalibration() and ShipDataToFXS().
88 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
90 // see header file for class documentation
91 return kAliHLTDataTypeStreamerInfo;
// Output size estimate, handed back through the reference parameters `constBase` and
// `inputMultiplier`.
// NOTE(review): the statements assigning the two out-parameters are not visible in
// this listing; confirm the actual values against the full source.
94 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
96 // see header file for class documentation
98 // this is nothing more than an assumption, in fact it's very difficult to predict how
99 // much output the component produces
// Component initialization: optionally configures from the CDB, then allocates the
// streamer-info array and the two stopwatches (event timer and cycle timer) that
// ProcessCalibration() uses for its rate control.
// The result is tracked in iResult (negative values are errors, e.g. -ENOMEM).
104 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
106 // see header file for class documentation
110 // default configuration from CDB
111 // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
112 // the default parameters from OCDB before the custom argument scan
113 // not valid at the moment because fgkConfigurationObject==NULL
114 if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
// Array collecting the TStreamerInfo objects found while streaming the input objects.
117 fpStreamerInfos=new TObjArray();
// NOTE(review): a plain (throwing) operator new does not return NULL, so this check
// is presumably only defensive -- confirm whether a non-throwing new is in use.
118 if (!fpStreamerInfos) iResult=-ENOMEM;
// Stopwatch for the time spent inside ProcessCalibration().
120 fpEventTimer=new TStopwatch;
122 fpEventTimer->Reset();
// Stopwatch that is stopped on entry to ProcessCalibration() and restarted near its end.
124 fpCycleTimer=new TStopwatch;
126 fpCycleTimer->Reset();
// Component cleanup: writes the collected streamer infos to file if a file name was
// configured, then releases the streamer-info array and resets the pointer.
133 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
135 // see header file for class documentation
// IsNull()==0 means a file name has been set (via the -file= argument).
// The return value of WriteToFile() is not used on this line; WriteToFile() itself
// rejects a NULL array with -EINVAL.
136 if (fFileName.IsNull()==0) {
137 WriteToFile(fFileName, fpStreamerInfos);
141 if (fpStreamerInfos) {
142 fpStreamerInfos->Clear();
143 delete fpStreamerInfos;
// Reset so that the destructor does not delete the array a second time.
145 fpStreamerInfos=NULL;
// Per-event processing. For each input block an AliHLTDataBlockItem is looked up (or
// created), the block is extracted into a ROOT object, the object is streamed into an
// AliHLTMessage with schema evolution enabled, and the streamer infos of that message
// are merged into fpStreamerInfos. Depending on the configured bits the array is then
// pushed to HLTOUT and/or to the FXS.
// Both parameters are unused (their names are commented out).
// NOTE(review): several statements of this function (declarations, braces, returns)
// are not visible in this listing; the comments below describe the visible lines only.
158 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
159 AliHLTComponentTriggerData& /*trigData*/ )
161 // see header file for class documentation
// Special handling for start-of-run: IsDataEvent() fills eventType, and the branch is
// taken only for a non-data event of type gkAliEventTypeStartOfRun.
163 AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
164 if (!IsDataEvent(&eventType) &&
165 eventType==gkAliEventTypeStartOfRun) {
// Sum over all known block items, folded with the AliHLTDataBlockItem::TimeSum functor;
// only used for the info message further down.
169 AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliHLTDataBlockItem::TimeSum());
170 AliHLTUInt32_t averageEventTime=0;
171 AliHLTUInt32_t averageCycleTime=0;
173 AliHLTUInt32_t proctime=0;
// Accumulated event-timer time divided by (event count + 1); proctime remembers the
// accumulated value at entry so the time spent in this call can be derived at the end.
175 averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
176 proctime=fpEventTimer->RealTime()*fgkTimeScale;
// Start(kFALSE) continues the stopwatch without resetting the accumulated time.
177 fpEventTimer->Start(kFALSE);
180 fpCycleTimer->Stop();
181 averageCycleTime=(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
184 // scale down the event processing according to the required rate
185 // and average processing time.
// Loop over all input blocks; stops early once iResult becomes negative.
186 for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
187 pBlock && iResult>=0;
188 pBlock=GetNextInputBlock()) {
189 bool processBlock=true;
190 AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
192 // TODO: do a selection of blocks on basis of the time spent in its processing
193 // for now only the global processing time is checked
194 // process if the average event time is smaller than the cycle time, i.e.
195 // the time is spent outside the component
196 // apply a factor 4 margin
197 processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
199 // always process new incoming blocks
// A new item is appended for this data type / specification; the pointer is taken
// after the push_back so that it refers to the stored element.
201 fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
202 item=&fList[fList.size()-1];
// Extract() returns NULL if the block is not recognised as a streamed ROOT object.
205 TObject* pObj=item->Extract(pBlock);
// Stream the object into a fresh message with schema evolution enabled and merge the
// streamer infos of that message into fpStreamerInfos.
207 AliHLTMessage msg(kMESS_OBJECT);
208 msg.EnableSchemaEvolution();
209 if ((iResult=item->Stream(pObj, msg))>=0) {
210 iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
212 HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
// Update the timing: proctime becomes the time spent in this call, averageEventTime
// is recomputed from the accumulated event-timer value.
221 fpEventTimer->Stop();
222 proctime=fpEventTimer->RealTime()*fgkTimeScale-proctime;
223 averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
225 // info output once every 2 seconds
// lastTime is a function-local static, so it persists across calls.
226 static UInt_t lastTime=0;
228 if (time.Get()-lastTime>2) {
230 HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
// The cycle timer resumes here and is stopped again at the start of the next call.
234 fpCycleTimer->Start(kFALSE);
// HLTOUT: push the array at the first event (event count 0) and/or at every event,
// depending on the bits set by the -hltout argument.
238 if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
239 (TestBits(kHLTOUTatAllEvents))) {
240 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// FXS: push every fFXSPrescaler-th event when kFXS is set and the prescaler is positive.
244 if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
246 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
247 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// Ships the collected streamer infos: to the FXS if kFXS is set, to a file if a file
// name is configured, and to HLTOUT if kHLTOUTatEOR is set. Afterwards the timing
// statistics of all known block items are printed or logged.
// Both parameters are unused (their names are commented out).
253 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
254 AliHLTComponentTriggerData& /*trigData*/)
256 // see header file for class documentation
257 if (TestBits(kFXS)) {
259 AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
260 PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
// IsNull()==0 means a file name has been set (via the -file= argument).
263 if (fFileName.IsNull()==0) {
264 WriteToFile(fFileName, fpStreamerInfos);
268 if (TestBits(kHLTOUTatEOR)) {
269 PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
// Per-item statistics: Print("short") when debug logging passes the filter, otherwise
// an info message for those items that were recognised as objects.
272 for (unsigned i=0; i<fList.size(); i++) {
273 if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
274 else if (fList[i].IsObject()) {
275 HLTInfo("AliHLTDataBlockItem %s %08x\n"
276 " average extraction time: %d usec\n"
277 " average streaming time: %d usec"
278 , AliHLTComponent::DataType2Text(fList[i]).c_str()
279 , fList[i].GetSpecification()
280 , fList[i].GetExtractionTime()
281 , fList[i].GetStreamingTime());
// Merges the streamer infos of `list` into `infos`: for every TStreamerInfo in `list`
// the target array is searched for an entry with the same name and class version, and
// the info is added if no such entry exists.
//   list  - streamer infos to merge in (source)
//   infos - array that accumulates the streamer infos (target)
// NOTE(review): the handling of NULL arguments, the statement that adds a missing
// info and the return value are on lines not visible in this listing.
288 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
290 // see header file for class documentation
292 if (!list || !infos) {
296 TObject* element=NULL;
// The cast removes the const qualifier in order to construct the TIter.
297 TIter next((TList*)list);
298 while ((element = next())) {
// Elements that are not TStreamerInfo objects are ignored.
299 TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
300 if (!pInfo) continue;
301 TString name=pInfo->GetName();
303 if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
// Linear search of the target array for the same name and class version; `i` stays
// below GetEntriesFast() only if a match was found.
304 for (; i<infos->GetEntriesFast(); i++) {
305 if (name.CompareTo(infos->At(i)->GetName())==0 &&
306 pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
312 // Add streamer info if not yet there
313 if (i>=infos->GetEntriesFast()) {
// Scans one configuration argument. Recognised arguments in the visible code:
//   -hltout[=all|first|eor|off]  select when the streamer infos are pushed to HLTOUT
//   -fxs[=off|<number>]          FXS shipping; a numeric value sets fFXSPrescaler
//   -file=<name>                 file name for writing the streamer infos
//   -rate=<Hz>                   target rate, converted to fMaxEventTime
// Arguments are matched with TString::Contains(), so the key is matched anywhere in
// the argument rather than only at its start.
// NOTE(review): the return statements and some actions are on lines not visible in
// this listing.
321 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
323 // see header file for class documentation
325 if (argc<=0) return 0;
327 TString argument=argv[i];
329 // -hltout=[all,first,eor,off]
330 if (argument.Contains("-hltout")) {
// Strip the key and the '=' so that only the value remains.
331 argument.ReplaceAll("-hltout", "");
332 argument.ReplaceAll("=", "");
// An empty value is treated like "all".
333 if (argument.IsNull() || argument.CompareTo("all")==0) {
334 SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
335 } else if (argument.CompareTo("first")==0) {
336 SetBits(kHLTOUTatFirstEvent);
337 } else if (argument.CompareTo("eor")==0) {
338 SetBits(kHLTOUTatEOR);
339 } else if (argument.CompareTo("off")==0) {
340 ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
342 HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
349 if (argument.Contains("-fxs")) {
350 argument.ReplaceAll("-fxs", "");
351 argument.ReplaceAll("=", "");
// The empty-value branch has no body: a bare "-fxs" needs no further handling here.
353 if (argument.IsNull()) {
354 } else if (argument.CompareTo("off")==0) {
// A purely numeric value is used as the FXS prescaler (see ProcessCalibration()).
356 } else if (argument.IsDigit()) {
357 fFXSPrescaler=argument.Atoi();
359 HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
366 if (argument.Contains("-file=")) {
367 argument.ReplaceAll("-file=", "");
368 if (!argument.IsNull()) {
371 HLTError("argument -file= expects file name");
377 if (argument.Contains("-rate=")) {
378 argument.ReplaceAll("-rate=", "");
// Atoi() returns an int that is stored in an unsigned variable here; only values in
// the open interval (0, fgkTimeScale) are accepted.
379 AliHLTUInt32_t rate=argument.Atoi();
380 if (rate>0 && rate<fgkTimeScale) {
// Time budget per event in units of 1/fgkTimeScale seconds.
381 fMaxEventTime=fgkTimeScale/rate;
// NOTE(review): this is the -rate= branch, but the message names "-file=";
// the text looks like a copy-paste slip -- confirm and correct the message.
383 HLTError("argument -file= expects number [Hz]");
// Writes the streamer infos to a ROOT file wrapped in an AliCDBEntry for the path
// "HLT/Calib/StreamerInfo". If the default CDB storage already holds an entry for
// that path, the new infos are merged into a clone of the existing object.
//   filename - name of the output file, opened in "RECREATE" mode
//   infos    - array of streamer infos to write
// Returns -EINVAL if either argument is NULL.
// NOTE(review): the remaining return values, the declaration of `version` and the
// statement that actually writes the entry are on lines not visible in this listing.
392 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
394 // write array of streamer infos to file
395 if (!filename || !infos) return -EINVAL;
397 TFile out(filename, "RECREATE");
398 if (out.IsZombie()) {
399 HLTError("failed to open file %s", filename);
403 const char* entrypath="HLT/Calib/StreamerInfo";
405 AliCDBStorage* store = NULL;
406 // TODO: to be activated later, first some methods need to be made
407 // public in AliCDBManager. Or some new methods to be added
408 //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
409 // store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
// Look for an existing entry in the default storage and pick up its version number.
411 store = AliCDBManager::Instance()->GetDefaultStorage();
412 AliCDBEntry* existingEntry=NULL;
413 if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
414 (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
415 version=existingEntry->GetId().GetVersion();
// `clone` becomes the object stored in the new entry: either the existing object
// merged with `infos`, or a plain copy of `infos`.
419 TObjArray* clone=NULL;
421 if (existingEntry && existingEntry->GetObject()) {
422 TObject* cloneObj=existingEntry->GetObject()->Clone();
// NOTE(review): if the existing object is not a TObjArray the dynamic_cast yields
// NULL and cloneObj appears to be leaked -- confirm against the full source.
423 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
// MergeStreamerInfo() returns -EINVAL for a NULL target, so a failed clone does not
// take the "no change" branch.
424 if (MergeStreamerInfo(clone, infos)==0) {
425 // no change, store with identical version
426 version=existingEntry->GetId().GetVersion();
429 TObject* cloneObj=infos->Clone();
430 if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
433 HLTError("failed to clone streamer info object array");
// Build the CDB entry: the id ranges from the current run to infinity with the
// version determined above.
437 AliCDBPath cdbPath(entrypath);
438 AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
439 AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
440 cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
441 cdbMetaData->SetComment("Streamer info for HLTOUT payload");
442 AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
446 // this is a small memory leak
447 // seg fault in ROOT object handling if the two objects are deleted
450 //delete cdbMetaData;
// Merges the TStreamerInfo entries of `src` into `tgt`. A source entry counts as
// already present if the target holds an entry of the same name and either the source
// class version is negative or both class versions are equal.
//   tgt - target array, extended by the missing infos
//   src - source array, not modified
// Returns -EINVAL if either array is NULL.
// NOTE(review): the iterator declaration, the statement adding a missing info and the
// final return are on lines not visible in this listing.
456 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
458 /// merge streamer info entries from source array to target array
459 /// return 1 if target array has been changed
461 // add all existing infos if not existing in the current one, or having
462 // different class version
464 if (!tgt || !src) return -EINVAL;
467 // check if all infos from the existing entry are in the new entry and with
468 // identical class version
470 TObject* nextobj=NULL;
471 while ((nextobj=next())) {
// Source elements that are not TStreamerInfo objects are ignored.
472 TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
473 if (!srcInfo) continue;
474 TString srcInfoName=srcInfo->GetName();
// Linear search of the target; leaving the loop via `break` means "already present".
477 for (; i<tgt->GetEntriesFast(); i++) {
478 if (tgt->At(i)==NULL) continue;
479 if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
480 // TODO: 2010-08-23 some more detailed investigation is needed.
481 // Structures used for data exchange, e.g. AliHLTComponentDataType
482 // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
483 // streamer info. Strictly speaking not, because those structures are not supposed
484 // to be changed at all, so they should be the same in all versions in the future.
485 // There has been a problem with detecting whether the streamer info is already in
486 // the target array if the srcInfo has class version -1. As it just concerns
487 // structures not going to be changed we can safely skip checking the class version,
488 // as long as the entry is already in the target streamer infos it does not need
489 // to be copied again.
490 if (srcInfo->GetClassVersion()<0) break;
491 TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
492 if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
// Index still inside the array: a matching entry was found, nothing to add.
494 if (i<tgt->GetEntriesFast()) continue;
// Looks up the block item for a data type / specification pair by a linear search of
// fList. The returned pointer refers to an element stored in fList.
// NOTE(review): the second parameter (`spec`) and the return for the not-found case
// are on lines not visible in this listing.
504 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
505 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
508 /// find item in the list
509 // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
510 // if (element!=fList.end()) return &(*element);
511 for (unsigned i=0; i<fList.size(); i++) {
// The item is compared against the data type and against the specification separately.
512 if (fList[i]==dt && fList[i]==spec) return &fList[i];
// Constructor of the per-block bookkeeping item: stores the specification and starts
// the accumulated extraction and streaming times at zero.
// NOTE(review): the `spec` parameter and the remaining member initializers are on
// lines not visible in this listing.
517 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
520 , fSpecification(spec)
523 , fExtractionTimeUsec(0)
526 , fStreamingTimeUsec(0)
529 // helper class to keep track of input data blocks
530 // in the AliHLTRootSchemaEvolutionComponent
// Destructor of the bookkeeping item (its body is not visible in this listing).
534 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
// Tries to read a ROOT object from the data block and updates the extraction timing.
//   bd - descriptor of the input block (pointer and size of the payload)
// Returns the extracted object, or NULL if the block is missing, too small or not
// recognised as a streamed object. fIsObject records the outcome of each check.
// NOTE(review): the stopwatch `sw`, the timestamp `ts`, the extraction counter update
// and the final return are on lines not visible in this listing.
539 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
541 /// extract data block to root object, and update performance parameters
542 /// object needs to be deleted externally
// Blocks without payload or with fewer than 8 bytes are rejected.
543 if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
// The first 32-bit word must equal the block size minus the size of that word;
// otherwise the block is not treated as a streamed object.
545 AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
546 if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
// Wrap the buffer in a message, resolve the class and read the object.
550 AliHLTMessage msg(bd->fPtr, bd->fSize);
551 TClass* objclass=msg.GetClass();
552 if (!(fIsObject=(objclass!=NULL))) return NULL;
553 TObject* pObj=msg.ReadObject(objclass);
554 if (!(fIsObject=(pObj!=NULL))) return NULL;
// Elapsed real time of this extraction, scaled by fgkTimeScale and accumulated.
556 AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
558 fExtractionTimeUsec+=usec;
// Time stamp of the extraction: seconds modulo 1000 scaled by fgkTimeScale, plus the
// nanosecond part divided by 1000.
560 fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Streams `obj` into `msg` and updates the streaming timing.
//   obj - object to stream
//   msg - message that receives the streamed object
// Returns -EINVAL if obj is NULL.
// NOTE(review): the stopwatch `sw`, the timestamp `ts`, the streaming counter update
// and the final return are on lines not visible in this listing.
564 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(TObject* obj, AliHLTMessage& msg)
566 /// stream object and update performance parameters
567 if (!obj) return -EINVAL;
570 msg.WriteObject(obj);
// Elapsed real time of this streaming step, scaled by fgkTimeScale and accumulated.
572 AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
574 fStreamingTimeUsec+=usec;
// Time stamp of the streaming: seconds modulo 1000 scaled by fgkTimeScale, plus the
// nanosecond part divided by 1000.
576 fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
// Prints the item to cout: a header line with data type and specification (in hex),
// followed by the average extraction and streaming times.
//   option - "short" suppresses the header line for items that are not objects
// NOTE(review): `option` is passed to strcmp unchecked, so it is assumed to be
// non-NULL -- confirm against the callers.
580 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
583 if (fIsObject || !(strcmp(option, "short")==0))
584 cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
// Averages are the accumulated microseconds divided by the number of calls; the
// counters are checked first so that there is no division by zero.
586 if (fNofExtractions>0) cout << " average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
587 else cout << " never extracted" << endl;
588 if (fNofStreamings>0) cout << " average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
589 else cout << " never streamed" << endl;