reverting r45444 to disentangle modules and make porting possible
[u/mrichter/AliRoot.git] / HLT / BASE / util / AliHLTRootSchemaEvolutionComponent.cxx
1 // $Id$
2
3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE HLT Project        * 
5 //* ALICE Experiment at CERN, All rights reserved.                         *
6 //*                                                                        *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no>        *
8 //*                  for The ALICE HLT Project.                            *
9 //*                                                                        *
10 //* Permission to use, copy, modify and distribute this software and its   *
11 //* documentation strictly for non-commercial purposes is hereby granted   *
12 //* without fee, provided that the above copyright notice appears in all   *
13 //* copies and that both the copyright notice and this permission notice   *
14 //* appear in the supporting documentation. The authors make no claims     *
15 //* about the suitability of this software for any purpose. It is          *
16 //* provided "as is" without express or implied warranty.                  *
17 //**************************************************************************
18
19 /** @file   AliHLTRootSchemaEvolutionComponent.cxx
20     @author Matthias Richter
21     @date   2009-10-18
22     @brief  Handler component for ROOT schema evolution of streamed objects
23 */
24
25 #include "AliHLTRootSchemaEvolutionComponent.h"
26 #include "AliHLTMessage.h"
27 #include "AliHLTReadoutList.h"
28 #include "TObjArray.h"
29 #include "TStreamerInfo.h"
30 #include "TList.h"
31 #include "TFile.h"
32 #include "TStopwatch.h"
33 #include "TTimeStamp.h"
34 #include "TDatime.h"
35
36 #include "AliCDBStorage.h"
37 #include "AliCDBManager.h"
38 #include "AliCDBPath.h"
39 #include "AliCDBId.h"
40 #include "AliCDBMetaData.h"
41 #include "AliCDBEntry.h"
42
43 #include <numeric>
44 using std::accumulate;
45
46 namespace
47 {
48   // Helper class for std::accumulate algorithm.
49   class AliTimeSum {
50   public:
51     typedef int first_argument_type;
52     typedef AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem second_argument_type;
53     typedef bool result_type;
54     int operator() (int a, AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem b) {
55       return a+b.GetTotalTime();
56     }
57   };
58 } // end of namespace
59
60 /** ROOT macro for the implementation of ROOT specific class methods */
61 ClassImp(AliHLTRootSchemaEvolutionComponent)
62
63 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
64   : AliHLTCalibrationProcessor()
65   , fList()
66   , fFlags(kFXS)
67   , fpStreamerInfos(NULL)
68   , fpEventTimer(NULL)
69   , fpCycleTimer(NULL)
70   , fMaxEventTime(500)
71   , fFXSPrescaler(0)
72   , fFileName()
73 {
74   // see header file for class documentation
75   // or
76   // refer to README to build package
77   // or
78   // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
79
80 }
81
82 // FIXME: read below when defining an OCDB object here
83 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
84 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
85
86 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
87 {
88   // see header file for class documentation
89   if (fpStreamerInfos) {
90     fpStreamerInfos->Clear();
91     delete fpStreamerInfos;
92   }
93   fpStreamerInfos=NULL;
94 }
95
96 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
97 {
98   // see header file for class documentation
99   list.push_back(kAliHLTAnyDataType);
100 }
101
102 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
103 {
104   // see header file for class documentation
105   return kAliHLTDataTypeStreamerInfo;
106 }
107
108 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
109 {
110   // see header file for class documentation
111
112   // this is nothing more than an assumption, in fact it's very difficult to predict how
113   // much output the component produces
114   constBase=100*1024;
115   inputMultiplier=3;
116 }
117
118 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
119 {
120   // see header file for class documentation
121
122   int iResult=0;
123
124   // default configuration from CDB
125   // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
126   // the default parameters from OCDB before the custom argument scan
127   // not valid at the moment because fgkConfigurationObject==NULL
128   if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
129
130   if (iResult>=0) {
131     fpStreamerInfos=new TObjArray();
132     if (!fpStreamerInfos) iResult=-ENOMEM;
133
134     fpEventTimer=new TStopwatch;
135     if (fpEventTimer) {
136       fpEventTimer->Reset();
137     }
138     fpCycleTimer=new TStopwatch;
139     if (fpCycleTimer) {
140       fpCycleTimer->Reset();
141     }
142   }
143
144   return 0;
145 }
146
147 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
148 {
149   // see header file for class documentation
150   if (fFileName.IsNull()==0) {
151     WriteToFile(fFileName, fpStreamerInfos);
152     fFileName.Clear();
153   }
154
155   if (fpStreamerInfos) {
156     fpStreamerInfos->Clear();
157     delete fpStreamerInfos;
158   }
159   fpStreamerInfos=NULL;
160
161   if (fpEventTimer) {
162     delete fpEventTimer;
163     fpEventTimer=NULL;
164   }
165   if (fpCycleTimer) {
166     delete fpCycleTimer;
167     fpCycleTimer=NULL;
168   }
169   return 0;
170 }
171
172 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
173                                                             AliHLTComponentTriggerData& /*trigData*/ )
174 {
175   // see header file for class documentation
176   int iResult=0;
177   AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
178   if (!IsDataEvent(&eventType) && 
179       eventType==gkAliEventTypeStartOfRun) {
180     return 0;
181   }
182
183   AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliTimeSum());
184   AliHLTUInt32_t averageEventTime=0;
185   AliHLTUInt32_t averageCycleTime=0;
186
187   AliHLTUInt32_t proctime=0;
188   if (fpEventTimer) {
189     averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
190     proctime=fpEventTimer->RealTime()*fgkTimeScale;
191     fpEventTimer->Start(kFALSE);
192   }
193   if (fpCycleTimer) {
194     fpCycleTimer->Stop();
195     averageCycleTime=(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
196   }
197
198   // scale down the event processing according to the required rate
199   // and average processing time.
200   for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
201        pBlock && iResult>=0;
202        pBlock=GetNextInputBlock()) {
203     bool processBlock=true;
204     AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
205     if (item) {
206       // TODO: do a selection of blocks on basis of the time spent in its processing
207       // for now only the global processing time is checked
208       // process if the average event time is smaller then the cycle time, i.e.
209       // the time is spent outside the component
210       // apply a factor 4 margin
211       processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
212     } else {
213       // always process new incoming blocks
214       processBlock=true;
215       fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
216       item=&fList[fList.size()-1];
217     }
218     if (processBlock) {
219       TObject* pObj=item->Extract(pBlock);
220       if (pObj) {
221         AliHLTMessage msg(kMESS_OBJECT);
222         msg.EnableSchemaEvolution();
223         if ((iResult=item->Stream(pObj, msg))>=0) {
224           iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
225         } else {
226           HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
227         }
228         delete pObj;
229         pObj=NULL;
230       }
231     }
232   }
233
234   if (fpEventTimer) {
235     fpEventTimer->Stop();
236     proctime=fpEventTimer->RealTime()*fgkTimeScale-proctime;
237     averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
238
239     // info output once every 2 seconds
240     static UInt_t lastTime=0;
241     TDatime time;
242     if (time.Get()-lastTime>2) {
243       lastTime=time.Get();
244       HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
245     }
246   }
247   if (fpCycleTimer) {
248     fpCycleTimer->Start(kFALSE);
249   }
250
251   if (iResult>=0) {
252     if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
253         (TestBits(kHLTOUTatAllEvents))) {
254       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
255     }
256   }
257
258   if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
259     // push to FXS
260     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
261     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
262   }
263
264   return iResult;
265 }
266
267 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
268                                                        AliHLTComponentTriggerData& /*trigData*/)
269 {
270   // see header file for class documentation
271   if (TestBits(kFXS)) {
272     // push to FXS
273     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
274     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
275   }
276
277   if (fFileName.IsNull()==0) {
278     WriteToFile(fFileName, fpStreamerInfos);
279     fFileName.Clear();
280   }
281
282     if (TestBits(kHLTOUTatEOR)) {
283       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
284     }
285
286     for (unsigned i=0; i<fList.size(); i++) {
287       if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
288       else if (fList[i].IsObject()) {
289         HLTInfo("AliHLTDataBlockItem %s %08x\n"
290                 "   average extraction time: %d usec\n"
291                 "   average streaming time: %d usec"
292                 , AliHLTComponent::DataType2Text(fList[i]).c_str()
293                 , fList[i].GetSpecification()
294                 , fList[i].GetExtractionTime()
295                 , fList[i].GetStreamingTime());
296       }
297     }
298
299   return 0;
300 }
301
302 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
303 {
304   // see header file for class documentation
305   int iResult=0;
306   if (!list || !infos) {
307     return -EINVAL;
308   }
309
310   TObject* element=NULL;
311   TIter next((TList*)list);
312   while ((element = next())) {
313     TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
314     if (!pInfo) continue;
315     TString name=pInfo->GetName();
316     int i=0;
317     if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
318     for (; i<infos->GetEntriesFast(); i++) {
319       if (name.CompareTo(infos->At(i)->GetName())==0 &&
320           pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
321         // have it already
322         break;
323       }
324     }
325
326     // Add streamer info if not yet there
327     if (i>=infos->GetEntriesFast()) {
328       infos->Add(pInfo);
329     }
330   }
331
332   return iResult;
333 }
334
335 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
336 {
337   // see header file for class documentation
338   int iResult=0;
339   if (argc<=0) return 0;
340   int i=0;
341   TString argument=argv[i];
342
343   // -hltout=[all,first,eor,off]
344   if (argument.Contains("-hltout")) {
345     argument.ReplaceAll("-hltout", "");
346     argument.ReplaceAll("=", "");
347     if (argument.IsNull() || argument.CompareTo("all")==0) {
348       SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
349     } else if (argument.CompareTo("first")==0) {
350       SetBits(kHLTOUTatFirstEvent);
351     } else if (argument.CompareTo("eor")==0) {
352       SetBits(kHLTOUTatEOR);
353     } else if (argument.CompareTo("off")==0) {
354       ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
355     } else {
356       HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
357       return -EINVAL;
358     }
359     return 1;
360   }
361
362   // -fxs=[n,off]
363   if (argument.Contains("-fxs")) {
364     argument.ReplaceAll("-fxs", "");
365     argument.ReplaceAll("=", "");
366     SetBits(kFXS);
367     if (argument.IsNull()) {
368     } else if (argument.CompareTo("off")==0) {
369       ClearBits(kFXS);
370     } else if (argument.IsDigit()) {
371       fFXSPrescaler=argument.Atoi();
372     } else {
373       HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
374       return -EINVAL;
375     }
376     return 1;
377   }
378
379   // -file=<filename>
380   if (argument.Contains("-file=")) {
381     argument.ReplaceAll("-file=", "");
382     if (!argument.IsNull()) {
383       fFileName=argument;
384     } else {
385       HLTError("argument -file= expects file name");
386       return -EINVAL;
387     }
388     return 1;
389   }
390
391   if (argument.Contains("-rate=")) {
392     argument.ReplaceAll("-rate=", "");
393     AliHLTUInt32_t rate=argument.Atoi();
394     if (rate>0 && rate<fgkTimeScale) {
395       fMaxEventTime=fgkTimeScale/rate;
396     } else {
397       HLTError("argument -file= expects number [Hz]");
398       return -EINVAL;
399     }
400     return 1;
401   }
402
403   return iResult;
404 }
405
406 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
407 {
408   // write aray of streamer infos to file
409   if (!filename || !infos) return -EINVAL;
410
411   TFile out(filename, "RECREATE");
412   if (out.IsZombie()) {
413     HLTError("failed to open file %s", filename);
414     return -EBADF;
415   }
416
417   const char* entrypath="HLT/Calib/StreamerInfo";
418   int version = -1;
419   AliCDBStorage* store = NULL;
420   // TODO: to be activated later, first some methods need to be made
421   // public in AliCDBManager. Or some new methods to be added 
422   //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
423   //  store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
424   if (!store) 
425     store = AliCDBManager::Instance()->GetDefaultStorage();
426   AliCDBEntry* existingEntry=NULL;
427   if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
428       (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
429     version=existingEntry->GetId().GetVersion();
430   }
431   version++;
432
433   TObjArray* clone=NULL;
434
435   if (existingEntry && existingEntry->GetObject()) {
436     TObject* cloneObj=existingEntry->GetObject()->Clone();
437     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
438     if (MergeStreamerInfo(clone, infos)==0) {
439       // no change, store with identical version
440       version=existingEntry->GetId().GetVersion();
441     }
442   } else {
443     TObject* cloneObj=infos->Clone();
444     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
445   }
446   if (!clone) {
447     HLTError("failed to clone streamer info object array");
448     return -ENOMEM;
449   }
450
451   AliCDBPath cdbPath(entrypath);
452   AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
453   AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
454   cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
455   cdbMetaData->SetComment("Streamer info for HLTOUT payload");
456   AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
457
458   out.cd();
459   entry->Write();
460   // this is a small memory leak
461   // seg fault in ROOT object handling if the two objects are deleted
462   // investigate later
463   //delete entry;
464   //delete cdbMetaData;
465   out.Close();
466
467   return 0;
468 }
469
470 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
471 {
472   /// merge streamer info entries from source array to target array
473   /// return 1 if target array has been changed
474
475   // add all existing infos if not existing in the current one, or having
476   // different class version
477   int iResult=0;
478   if (!tgt || !src) return -EINVAL;
479
480   {
481     // check if all infos from the existing entry are in the new entry and with
482     // identical class version
483     TIter next(src);
484     TObject* nextobj=NULL;
485     while ((nextobj=next())) {
486       TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
487       if (!srcInfo) continue;
488       TString srcInfoName=srcInfo->GetName();
489
490       int i=0;
491       for (; i<tgt->GetEntriesFast(); i++) {
492         if (tgt->At(i)==NULL) continue;
493         if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
494         // TODO: 2010-08-23 some more detailed investigation is needed.
495         // Structures used for data exchange, e.g. AliHLTComponentDataType
496         // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
497         // streamer info. Strictly speaking not, because those structures are not supposed
498         // to be changed at all, so they should be the same in all versions in the future.
499         // There has been a problem with detecting whether the streamer info is already in
500         // the target array if the srcInfo has class version -1. As it just concerns
501         // structures not going to be changed we can safely skip checking the class version,
502         // as long as the entry is already in the target streamer infos it does not need
503         // to be copied again.
504         if (srcInfo->GetClassVersion()<0) break;
505         TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
506         if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
507       }
508       if (i<tgt->GetEntriesFast()) continue;
509
510       iResult=1;
511       tgt->Add(srcInfo);
512     }
513   }
514
515   return iResult;
516 }
517
518 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
519 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
520                                              AliHLTUInt32_t spec)
521 {
522   /// find item in the list
523   // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
524   // if (element!=fList.end()) return &(*element);
525   for (unsigned i=0; i<fList.size(); i++) {
526     if (fList[i]==dt && fList[i]==spec) return &fList[i];
527   }
528   return NULL;
529 }
530
531 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
532                                                                              AliHLTUInt32_t spec)
533   : fDt(dt)
534   , fSpecification(spec)
535   , fIsObject(false)
536   , fNofExtractions(0)
537   , fExtractionTimeUsec(0)
538   , fLastExtraction(0)
539   , fNofStreamings(0)
540   , fStreamingTimeUsec(0)
541   , fLastStreaming(0)
542 {
543   // helper class to keep track of input data blocks
544   // in the AliHLTRootSchemaEvolutionComponent
545   //
546 }
547
548 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
549 {
550   // destructor
551 }
552
553 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
554 {
555   /// extract data block to root object, and update performance parameters
556   /// object needs to be deleted externally
557   if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
558
559   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
560   if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
561
562   TStopwatch sw;
563   sw.Start();
564   AliHLTMessage msg(bd->fPtr, bd->fSize);
565   TClass* objclass=msg.GetClass();
566   if (!(fIsObject=(objclass!=NULL))) return NULL;
567   TObject* pObj=msg.ReadObject(objclass);
568   if (!(fIsObject=(pObj!=NULL))) return NULL;
569   sw.Stop();
570   AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
571   fNofExtractions++;
572   fExtractionTimeUsec+=usec;
573   TTimeStamp ts;
574   fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
575   return pObj;
576 }
577
578 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(TObject* obj, AliHLTMessage& msg)
579 {
580   /// stream object and update performance parameters
581   if (!obj) return -EINVAL;
582   TStopwatch sw;
583   sw.Start();
584   msg.WriteObject(obj);
585
586   AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
587   fNofStreamings++;
588   fStreamingTimeUsec+=usec;
589   TTimeStamp ts;
590   fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
591   return 0;
592 }
593
594 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
595 {
596   /// print status
597   if (fIsObject || !(strcmp(option, "short")==0))
598     cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
599   if (fIsObject) {
600     if (fNofExtractions>0) cout << "   average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
601     else cout << "   never extracted" << endl;
602     if (fNofStreamings>0) cout << "   average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
603     else cout << "   never streamed" << endl;
604   }
605 }