using common function for merging of streamer info
[u/mrichter/AliRoot.git] / HLT / BASE / util / AliHLTRootSchemaEvolutionComponent.cxx
1 // $Id$
2
3 //**************************************************************************
4 //* This file is property of and copyright by the                          * 
5 //* ALICE Experiment at CERN, All rights reserved.                         *
6 //*                                                                        *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no>        *
8 //*                                                                        *
9 //* Permission to use, copy, modify and distribute this software and its   *
10 //* documentation strictly for non-commercial purposes is hereby granted   *
11 //* without fee, provided that the above copyright notice appears in all   *
12 //* copies and that both the copyright notice and this permission notice   *
13 //* appear in the supporting documentation. The authors make no claims     *
14 //* about the suitability of this software for any purpose. It is          *
15 //* provided "as is" without express or implied warranty.                  *
16 //**************************************************************************
17
18 /// @file   AliHLTRootSchemaEvolutionComponent.cxx
19 /// @author Matthias Richter
20 /// @date   2009-10-18
21 /// @brief  Handler component for ROOT schema evolution of streamed objects
22 ///
23
24 #include "AliHLTRootSchemaEvolutionComponent.h"
25 #include "AliHLTMessage.h"
26 #include "AliHLTReadoutList.h"
27 #include "AliHLTMisc.h"
28 #include "TObjArray.h"
29 #include "TStreamerInfo.h"
30 #include "TList.h"
31 #include "TFile.h"
32 #include "TStopwatch.h"
33 #include "TTimeStamp.h"
34 #include "TDatime.h"
35
36 #include "AliCDBStorage.h"
37 #include "AliCDBManager.h"
38 #include "AliCDBPath.h"
39 #include "AliCDBId.h"
40 #include "AliCDBMetaData.h"
41 #include "AliCDBEntry.h"
42
43 #include <numeric>
44 using namespace std;
45
46 namespace
47 {
48   // Helper class for std::accumulate algorithm.
49   class AliTimeSum {
50   public:
51     typedef int first_argument_type;
52     typedef AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem second_argument_type;
53     typedef bool result_type;
54     int operator() (int a, AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem b) {
55       return a+b.GetTotalTime();
56     }
57   };
58 } // end of namespace
59
60 /** ROOT macro for the implementation of ROOT specific class methods */
61 ClassImp(AliHLTRootSchemaEvolutionComponent)
62
63 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
64   : AliHLTCalibrationProcessor()
65   , fList()
66   , fPropertyFlags(kFXS)
67   , fpStreamerInfos(NULL)
68   , fpEventTimer(NULL)
69   , fpCycleTimer(NULL)
70   , fMaxEventTime(500)
71   , fFXSPrescaler(0)
72   , fFileName()
73 {
74   // Collects streamer info for all input objects and produces the corresponding
75   // calibration object for reconstruction of HLT. The component runs with a
76   // configurable rate constraint and skips the processing of known data blocks
77   // for the sake of performance. New data blocks are always processed and added
78   // to the list.
79   //
80   // Component ID: \b ROOTSchemaEvolutionComponent                        <br>
81   // Library: \b libAliHLTUtil.so                                               <br>
82   // Input Data Types: ::kAliHLTAnyDataType                             <br>
83   // Output Data Types: none                                            <br>
84 }
85
86 // FIXME: read below when defining an OCDB object here
87 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
88 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
89
90 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
91 {
92   // destructor
93   if (fpStreamerInfos) {
94     fpStreamerInfos->Clear();
95     delete fpStreamerInfos;
96   }
97   fpStreamerInfos=NULL;
98 }
99
100 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
101 {
102   // overloaded from AliHLTComponent
103   list.push_back(kAliHLTAnyDataType);
104 }
105
106 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
107 {
108   // overloaded from AliHLTComponent
109   return kAliHLTDataTypeStreamerInfo;
110 }
111
112 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
113 {
114   // overloaded from AliHLTComponent
115
116   // this is nothing more than an assumption, in fact it's very difficult to predict how
117   // much output the component produces
118   constBase=100*1024;
119   inputMultiplier=3;
120 }
121
122 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
123 {
124   // overloaded from AliHLTCalibrationProcessor: initialization
125
126   int iResult=0;
127
128   // default configuration from CDB
129   // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
130   // the default parameters from OCDB before the custom argument scan
131   // not valid at the moment because fgkConfigurationObject==NULL
132   if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
133
134   if (iResult>=0) {
135     fpStreamerInfos=new TObjArray();
136     if (!fpStreamerInfos) iResult=-ENOMEM;
137
138     fpEventTimer=new TStopwatch;
139     if (fpEventTimer) {
140       fpEventTimer->Reset();
141     }
142     fpCycleTimer=new TStopwatch;
143     if (fpCycleTimer) {
144       fpCycleTimer->Reset();
145     }
146   }
147
148   return 0;
149 }
150
151 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
152 {
153   // overloaded from AliHLTCalibrationProcessor: termination and cleanup
154   if (fFileName.IsNull()==0) {
155     WriteToFile(fFileName, fpStreamerInfos);
156     fFileName.Clear();
157   }
158
159   if (fpStreamerInfos) {
160     fpStreamerInfos->Clear();
161     delete fpStreamerInfos;
162   }
163   fpStreamerInfos=NULL;
164
165   if (fpEventTimer) {
166     delete fpEventTimer;
167     fpEventTimer=NULL;
168   }
169   if (fpCycleTimer) {
170     delete fpCycleTimer;
171     fpCycleTimer=NULL;
172   }
173   return 0;
174 }
175
176 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
177                                                             AliHLTComponentTriggerData& /*trigData*/ )
178 {
179   // overloaded from AliHLTCalibrationProcessor: event processing
180   int iResult=0;
181   AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
182   if (!IsDataEvent(&eventType) && 
183       eventType==gkAliEventTypeStartOfRun) {
184     return 0;
185   }
186
187   AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliTimeSum());
188   AliHLTUInt32_t averageEventTime=0;
189   AliHLTUInt32_t averageCycleTime=0;
190
191   AliHLTUInt32_t proctime=0;
192   if (fpEventTimer) {
193     averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
194     proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale);
195     fpEventTimer->Start(kFALSE);
196   }
197   if (fpCycleTimer) {
198     fpCycleTimer->Stop();
199     averageCycleTime=AliHLTUInt32_t(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
200   }
201
202   // scale down the event processing according to the required rate
203   // and average processing time.
204   for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
205        pBlock && iResult>=0;
206        pBlock=GetNextInputBlock()) {
207     bool processBlock=true;
208     AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
209     if (item) {
210       // TODO: do a selection of blocks on basis of the time spent in its processing
211       // for now only the global processing time is checked
212       // process if the average event time is smaller then the cycle time, i.e.
213       // the time is spent outside the component
214       // apply a factor 4 margin
215       processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
216     } else {
217       // always process new incoming blocks
218       processBlock=true;
219       fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
220       item=&fList[fList.size()-1];
221     }
222     if (processBlock) {
223       TObject* pObj=item->Extract(pBlock);
224       if (pObj) {
225         AliHLTMessage msg(kMESS_OBJECT);
226         msg.EnableSchemaEvolution();
227         if ((iResult=item->Stream(pObj, msg))>=0) {
228           iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
229         } else {
230           HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
231         }
232         delete pObj;
233         pObj=NULL;
234       }
235     }
236   }
237
238   if (fpEventTimer) {
239     fpEventTimer->Stop();
240     proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)-proctime;
241     averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
242
243     // info output once every 2 seconds
244     static UInt_t lastTime=0;
245     TDatime time;
246     if (time.Get()-lastTime>2) {
247       lastTime=time.Get();
248       HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
249     }
250   }
251   if (fpCycleTimer) {
252     fpCycleTimer->Start(kFALSE);
253   }
254
255   if (iResult>=0) {
256     if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
257         (TestBits(kHLTOUTatAllEvents))) {
258       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
259     }
260   }
261
262   if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
263     // push to FXS
264     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
265     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
266   }
267
268   return iResult;
269 }
270
271 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
272                                                        AliHLTComponentTriggerData& /*trigData*/)
273 {
274   // overloaded from AliHLTCalibrationProcessor: ship data
275   if (TestBits(kFXS)) {
276     // push to FXS
277     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
278     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
279   }
280
281   if (fFileName.IsNull()==0) {
282     WriteToFile(fFileName, fpStreamerInfos);
283     fFileName.Clear();
284   }
285
286     if (TestBits(kHLTOUTatEOR)) {
287       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
288     }
289
290     for (unsigned i=0; i<fList.size(); i++) {
291       if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
292       else if (fList[i].IsObject()) {
293         HLTInfo("AliHLTDataBlockItem %s %08x\n"
294                 "   average extraction time: %d usec\n"
295                 "   average streaming time: %d usec"
296                 , AliHLTComponent::DataType2Text(fList[i]).c_str()
297                 , fList[i].GetSpecification()
298                 , fList[i].GetExtractionTime()
299                 , fList[i].GetStreamingTime());
300       }
301     }
302
303   return 0;
304 }
305
306 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
307 {
308   // update streamer infos
309   int iResult=0;
310   if (!list || !infos) {
311     return -EINVAL;
312   }
313
314   TObject* element=NULL;
315   TIter next((TList*)list);
316   while ((element = next())) {
317     TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
318     if (!pInfo) continue;
319     TString name=pInfo->GetName();
320     int i=0;
321     if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
322     for (; i<infos->GetEntriesFast(); i++) {
323       if (name.CompareTo(infos->At(i)->GetName())==0 &&
324           pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
325         // have it already
326         break;
327       }
328     }
329
330     // Add streamer info if not yet there
331     if (i>=infos->GetEntriesFast()) {
332       infos->Add(pInfo);
333     }
334   }
335
336   return iResult;
337 }
338
339 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
340 {
341   // overloaded from AliHLTComponent
342   int iResult=0;
343   if (argc<=0) return 0;
344   int i=0;
345   TString argument=argv[i];
346
347   // -hltout=[all,first,eor,off]
348   if (argument.Contains("-hltout")) {
349     argument.ReplaceAll("-hltout", "");
350     argument.ReplaceAll("=", "");
351     if (argument.IsNull() || argument.CompareTo("all")==0) {
352       SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
353     } else if (argument.CompareTo("first")==0) {
354       SetBits(kHLTOUTatFirstEvent);
355     } else if (argument.CompareTo("eor")==0) {
356       SetBits(kHLTOUTatEOR);
357     } else if (argument.CompareTo("off")==0) {
358       ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
359     } else {
360       HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
361       return -EINVAL;
362     }
363     return 1;
364   }
365
366   // -fxs=[n,off]
367   if (argument.Contains("-fxs")) {
368     argument.ReplaceAll("-fxs", "");
369     argument.ReplaceAll("=", "");
370     SetBits(kFXS);
371     if (argument.IsNull()) {
372     } else if (argument.CompareTo("off")==0) {
373       ClearBits(kFXS);
374     } else if (argument.IsDigit()) {
375       fFXSPrescaler=argument.Atoi();
376     } else {
377       HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
378       return -EINVAL;
379     }
380     return 1;
381   }
382
383   // -file=<filename>
384   if (argument.Contains("-file=")) {
385     argument.ReplaceAll("-file=", "");
386     if (!argument.IsNull()) {
387       fFileName=argument;
388     } else {
389       HLTError("argument -file= expects file name");
390       return -EINVAL;
391     }
392     return 1;
393   }
394
395   if (argument.Contains("-rate=")) {
396     argument.ReplaceAll("-rate=", "");
397     AliHLTUInt32_t rate=argument.Atoi();
398     if (rate>0 && rate<fgkTimeScale) {
399       fMaxEventTime=fgkTimeScale/rate;
400     } else {
401       HLTError("argument -file= expects number [Hz]");
402       return -EINVAL;
403     }
404     return 1;
405   }
406
407   return iResult;
408 }
409
410 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
411 {
412   // write aray of streamer infos to file
413   if (!filename || !infos) return -EINVAL;
414
415   TFile out(filename, "RECREATE");
416   if (out.IsZombie()) {
417     HLTError("failed to open file %s", filename);
418     return -EBADF;
419   }
420
421   const char* entrypath="HLT/Calib/StreamerInfo";
422   int version = -1;
423   AliCDBStorage* store = NULL;
424   // TODO: to be activated later, first some methods need to be made
425   // public in AliCDBManager. Or some new methods to be added 
426   //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
427   //  store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
428   if (!store) 
429     store = AliCDBManager::Instance()->GetDefaultStorage();
430   AliCDBEntry* existingEntry=NULL;
431   if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
432       (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
433     version=existingEntry->GetId().GetVersion();
434   }
435   version++;
436
437   TObjArray* clone=NULL;
438
439   if (existingEntry && existingEntry->GetObject()) {
440     TObject* cloneObj=existingEntry->GetObject()->Clone();
441     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
442     if (AliHLTMisc::Instance().MergeStreamerInfo(clone, infos)==0) {
443       // no change, store with identical version
444       version=existingEntry->GetId().GetVersion();
445     }
446   } else {
447     TObject* cloneObj=infos->Clone();
448     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
449   }
450   if (!clone) {
451     HLTError("failed to clone streamer info object array");
452     return -ENOMEM;
453   }
454
455   AliCDBPath cdbPath(entrypath);
456   AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
457   AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
458   cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
459   cdbMetaData->SetComment("Streamer info for HLTOUT payload");
460   AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
461
462   out.cd();
463   entry->Write();
464   // this is a small memory leak
465   // seg fault in ROOT object handling if the two objects are deleted
466   // investigate later
467   //delete entry;
468   //delete cdbMetaData;
469   out.Close();
470
471   return 0;
472 }
473
474 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
475 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
476                                              AliHLTUInt32_t spec)
477 {
478   /// find item in the list
479   // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
480   // if (element!=fList.end()) return &(*element);
481   for (unsigned i=0; i<fList.size(); i++) {
482     if (fList[i]==dt && fList[i]==spec) return &fList[i];
483   }
484   return NULL;
485 }
486
487 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
488                                                                              AliHLTUInt32_t spec)
489   : fDt(dt)
490   , fSpecification(spec)
491   , fIsObject(false)
492   , fNofExtractions(0)
493   , fExtractionTimeUsec(0)
494   , fLastExtraction(0)
495   , fNofStreamings(0)
496   , fStreamingTimeUsec(0)
497   , fLastStreaming(0)
498 {
499   // helper class to keep track of input data blocks
500   // in the AliHLTRootSchemaEvolutionComponent
501   //
502 }
503
504 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
505 {
506   // destructor
507 }
508
509 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
510 {
511   /// extract data block to root object, and update performance parameters
512   /// object needs to be deleted externally
513   if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
514
515   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
516   if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
517
518   TStopwatch sw;
519   sw.Start();
520   AliHLTMessage msg(bd->fPtr, bd->fSize);
521   TClass* objclass=msg.GetClass();
522   if (!(fIsObject=(objclass!=NULL))) return NULL;
523   TObject* pObj=msg.ReadObject(objclass);
524   if (!(fIsObject=(pObj!=NULL))) return NULL;
525   sw.Stop();
526   AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
527   fNofExtractions++;
528   fExtractionTimeUsec+=usec;
529   TTimeStamp ts;
530   fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
531   return pObj;
532 }
533
534 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(const TObject* obj, AliHLTMessage& msg)
535 {
536   /// stream object and update performance parameters
537   if (!obj) return -EINVAL;
538   TStopwatch sw;
539   sw.Start();
540   msg.WriteObject(obj);
541
542   AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
543   fNofStreamings++;
544   fStreamingTimeUsec+=usec;
545   TTimeStamp ts;
546   fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
547   return 0;
548 }
549
550 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
551 {
552   /// print status
553   if (fIsObject || !(strcmp(option, "short")==0))
554     cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
555   if (fIsObject) {
556     if (fNofExtractions>0) cout << "   average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
557     else cout << "   never extracted" << endl;
558     if (fNofStreamings>0) cout << "   average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
559     else cout << "   never streamed" << endl;
560   }
561 }