minor code cleanup and coding rules
[u/mrichter/AliRoot.git] / HLT / BASE / util / AliHLTRootSchemaEvolutionComponent.cxx
1 // $Id$
2
3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE                    * 
5 //* ALICE Experiment at CERN, All rights reserved.                         *
6 //*                                                                        *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no>        *
8 //*                                                                        *
9 //* Permission to use, copy, modify and distribute this software and its   *
10 //* documentation strictly for non-commercial purposes is hereby granted   *
11 //* without fee, provided that the above copyright notice appears in all   *
12 //* copies and that both the copyright notice and this permission notice   *
13 //* appear in the supporting documentation. The authors make no claims     *
14 //* about the suitability of this software for any purpose. It is          *
15 //* provided "as is" without express or implied warranty.                  *
16 //**************************************************************************
17
18 /// @file   AliHLTRootSchemaEvolutionComponent.cxx
19 /// @author Matthias Richter
20 /// @date   2009-10-18
21 /// @brief  Handler component for ROOT schema evolution of streamed objects
22 ///
23
24 #include "AliHLTRootSchemaEvolutionComponent.h"
25 #include "AliHLTMessage.h"
26 #include "AliHLTReadoutList.h"
27 #include "TObjArray.h"
28 #include "TStreamerInfo.h"
29 #include "TList.h"
30 #include "TFile.h"
31 #include "TStopwatch.h"
32 #include "TTimeStamp.h"
33 #include "TDatime.h"
34
35 #include "AliCDBStorage.h"
36 #include "AliCDBManager.h"
37 #include "AliCDBPath.h"
38 #include "AliCDBId.h"
39 #include "AliCDBMetaData.h"
40 #include "AliCDBEntry.h"
41
42 #include <numeric>
43 using std::accumulate;
44
45 namespace
46 {
47   // Helper class for std::accumulate algorithm.
48   class AliTimeSum {
49   public:
50     typedef int first_argument_type;
51     typedef AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem second_argument_type;
52     typedef bool result_type;
53     int operator() (int a, AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem b) {
54       return a+b.GetTotalTime();
55     }
56   };
57 } // end of namespace
58
59 /** ROOT macro for the implementation of ROOT specific class methods */
60 ClassImp(AliHLTRootSchemaEvolutionComponent)
61
62 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
63   : AliHLTCalibrationProcessor()
64   , fList()
65   , fPropertyFlags(kFXS)
66   , fpStreamerInfos(NULL)
67   , fpEventTimer(NULL)
68   , fpCycleTimer(NULL)
69   , fMaxEventTime(500)
70   , fFXSPrescaler(0)
71   , fFileName()
72 {
73   // see header file for class documentation
74   // or
75   // refer to README to build package
76   // or
77   // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
78
79 }
80
81 // FIXME: read below when defining an OCDB object here
82 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
83 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
84
85 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
86 {
87   // see header file for class documentation
88   if (fpStreamerInfos) {
89     fpStreamerInfos->Clear();
90     delete fpStreamerInfos;
91   }
92   fpStreamerInfos=NULL;
93 }
94
95 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
96 {
97   // see header file for class documentation
98   list.push_back(kAliHLTAnyDataType);
99 }
100
101 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
102 {
103   // see header file for class documentation
104   return kAliHLTDataTypeStreamerInfo;
105 }
106
107 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
108 {
109   // see header file for class documentation
110
111   // this is nothing more than an assumption, in fact it's very difficult to predict how
112   // much output the component produces
113   constBase=100*1024;
114   inputMultiplier=3;
115 }
116
117 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
118 {
119   // see header file for class documentation
120
121   int iResult=0;
122
123   // default configuration from CDB
124   // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
125   // the default parameters from OCDB before the custom argument scan
126   // not valid at the moment because fgkConfigurationObject==NULL
127   if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
128
129   if (iResult>=0) {
130     fpStreamerInfos=new TObjArray();
131     if (!fpStreamerInfos) iResult=-ENOMEM;
132
133     fpEventTimer=new TStopwatch;
134     if (fpEventTimer) {
135       fpEventTimer->Reset();
136     }
137     fpCycleTimer=new TStopwatch;
138     if (fpCycleTimer) {
139       fpCycleTimer->Reset();
140     }
141   }
142
143   return 0;
144 }
145
146 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
147 {
148   // see header file for class documentation
149   if (fFileName.IsNull()==0) {
150     WriteToFile(fFileName, fpStreamerInfos);
151     fFileName.Clear();
152   }
153
154   if (fpStreamerInfos) {
155     fpStreamerInfos->Clear();
156     delete fpStreamerInfos;
157   }
158   fpStreamerInfos=NULL;
159
160   if (fpEventTimer) {
161     delete fpEventTimer;
162     fpEventTimer=NULL;
163   }
164   if (fpCycleTimer) {
165     delete fpCycleTimer;
166     fpCycleTimer=NULL;
167   }
168   return 0;
169 }
170
171 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
172                                                             AliHLTComponentTriggerData& /*trigData*/ )
173 {
174   // see header file for class documentation
175   int iResult=0;
176   AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
177   if (!IsDataEvent(&eventType) && 
178       eventType==gkAliEventTypeStartOfRun) {
179     return 0;
180   }
181
182   AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliTimeSum());
183   AliHLTUInt32_t averageEventTime=0;
184   AliHLTUInt32_t averageCycleTime=0;
185
186   AliHLTUInt32_t proctime=0;
187   if (fpEventTimer) {
188     averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
189     proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale);
190     fpEventTimer->Start(kFALSE);
191   }
192   if (fpCycleTimer) {
193     fpCycleTimer->Stop();
194     averageCycleTime=AliHLTUInt32_t(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
195   }
196
197   // scale down the event processing according to the required rate
198   // and average processing time.
199   for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
200        pBlock && iResult>=0;
201        pBlock=GetNextInputBlock()) {
202     bool processBlock=true;
203     AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
204     if (item) {
205       // TODO: do a selection of blocks on basis of the time spent in its processing
206       // for now only the global processing time is checked
207       // process if the average event time is smaller then the cycle time, i.e.
208       // the time is spent outside the component
209       // apply a factor 4 margin
210       processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
211     } else {
212       // always process new incoming blocks
213       processBlock=true;
214       fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
215       item=&fList[fList.size()-1];
216     }
217     if (processBlock) {
218       TObject* pObj=item->Extract(pBlock);
219       if (pObj) {
220         AliHLTMessage msg(kMESS_OBJECT);
221         msg.EnableSchemaEvolution();
222         if ((iResult=item->Stream(pObj, msg))>=0) {
223           iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
224         } else {
225           HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
226         }
227         delete pObj;
228         pObj=NULL;
229       }
230     }
231   }
232
233   if (fpEventTimer) {
234     fpEventTimer->Stop();
235     proctime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)-proctime;
236     averageEventTime=AliHLTUInt32_t(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
237
238     // info output once every 2 seconds
239     static UInt_t lastTime=0;
240     TDatime time;
241     if (time.Get()-lastTime>2) {
242       lastTime=time.Get();
243       HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
244     }
245   }
246   if (fpCycleTimer) {
247     fpCycleTimer->Start(kFALSE);
248   }
249
250   if (iResult>=0) {
251     if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
252         (TestBits(kHLTOUTatAllEvents))) {
253       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
254     }
255   }
256
257   if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
258     // push to FXS
259     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
260     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
261   }
262
263   return iResult;
264 }
265
266 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
267                                                        AliHLTComponentTriggerData& /*trigData*/)
268 {
269   // see header file for class documentation
270   if (TestBits(kFXS)) {
271     // push to FXS
272     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
273     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
274   }
275
276   if (fFileName.IsNull()==0) {
277     WriteToFile(fFileName, fpStreamerInfos);
278     fFileName.Clear();
279   }
280
281     if (TestBits(kHLTOUTatEOR)) {
282       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
283     }
284
285     for (unsigned i=0; i<fList.size(); i++) {
286       if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
287       else if (fList[i].IsObject()) {
288         HLTInfo("AliHLTDataBlockItem %s %08x\n"
289                 "   average extraction time: %d usec\n"
290                 "   average streaming time: %d usec"
291                 , AliHLTComponent::DataType2Text(fList[i]).c_str()
292                 , fList[i].GetSpecification()
293                 , fList[i].GetExtractionTime()
294                 , fList[i].GetStreamingTime());
295       }
296     }
297
298   return 0;
299 }
300
301 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
302 {
303   // see header file for class documentation
304   int iResult=0;
305   if (!list || !infos) {
306     return -EINVAL;
307   }
308
309   TObject* element=NULL;
310   TIter next((TList*)list);
311   while ((element = next())) {
312     TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
313     if (!pInfo) continue;
314     TString name=pInfo->GetName();
315     int i=0;
316     if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
317     for (; i<infos->GetEntriesFast(); i++) {
318       if (name.CompareTo(infos->At(i)->GetName())==0 &&
319           pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
320         // have it already
321         break;
322       }
323     }
324
325     // Add streamer info if not yet there
326     if (i>=infos->GetEntriesFast()) {
327       infos->Add(pInfo);
328     }
329   }
330
331   return iResult;
332 }
333
334 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
335 {
336   // see header file for class documentation
337   int iResult=0;
338   if (argc<=0) return 0;
339   int i=0;
340   TString argument=argv[i];
341
342   // -hltout=[all,first,eor,off]
343   if (argument.Contains("-hltout")) {
344     argument.ReplaceAll("-hltout", "");
345     argument.ReplaceAll("=", "");
346     if (argument.IsNull() || argument.CompareTo("all")==0) {
347       SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
348     } else if (argument.CompareTo("first")==0) {
349       SetBits(kHLTOUTatFirstEvent);
350     } else if (argument.CompareTo("eor")==0) {
351       SetBits(kHLTOUTatEOR);
352     } else if (argument.CompareTo("off")==0) {
353       ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
354     } else {
355       HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
356       return -EINVAL;
357     }
358     return 1;
359   }
360
361   // -fxs=[n,off]
362   if (argument.Contains("-fxs")) {
363     argument.ReplaceAll("-fxs", "");
364     argument.ReplaceAll("=", "");
365     SetBits(kFXS);
366     if (argument.IsNull()) {
367     } else if (argument.CompareTo("off")==0) {
368       ClearBits(kFXS);
369     } else if (argument.IsDigit()) {
370       fFXSPrescaler=argument.Atoi();
371     } else {
372       HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
373       return -EINVAL;
374     }
375     return 1;
376   }
377
378   // -file=<filename>
379   if (argument.Contains("-file=")) {
380     argument.ReplaceAll("-file=", "");
381     if (!argument.IsNull()) {
382       fFileName=argument;
383     } else {
384       HLTError("argument -file= expects file name");
385       return -EINVAL;
386     }
387     return 1;
388   }
389
390   if (argument.Contains("-rate=")) {
391     argument.ReplaceAll("-rate=", "");
392     AliHLTUInt32_t rate=argument.Atoi();
393     if (rate>0 && rate<fgkTimeScale) {
394       fMaxEventTime=fgkTimeScale/rate;
395     } else {
396       HLTError("argument -file= expects number [Hz]");
397       return -EINVAL;
398     }
399     return 1;
400   }
401
402   return iResult;
403 }
404
405 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
406 {
407   // write aray of streamer infos to file
408   if (!filename || !infos) return -EINVAL;
409
410   TFile out(filename, "RECREATE");
411   if (out.IsZombie()) {
412     HLTError("failed to open file %s", filename);
413     return -EBADF;
414   }
415
416   const char* entrypath="HLT/Calib/StreamerInfo";
417   int version = -1;
418   AliCDBStorage* store = NULL;
419   // TODO: to be activated later, first some methods need to be made
420   // public in AliCDBManager. Or some new methods to be added 
421   //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
422   //  store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
423   if (!store) 
424     store = AliCDBManager::Instance()->GetDefaultStorage();
425   AliCDBEntry* existingEntry=NULL;
426   if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
427       (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
428     version=existingEntry->GetId().GetVersion();
429   }
430   version++;
431
432   TObjArray* clone=NULL;
433
434   if (existingEntry && existingEntry->GetObject()) {
435     TObject* cloneObj=existingEntry->GetObject()->Clone();
436     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
437     if (MergeStreamerInfo(clone, infos)==0) {
438       // no change, store with identical version
439       version=existingEntry->GetId().GetVersion();
440     }
441   } else {
442     TObject* cloneObj=infos->Clone();
443     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
444   }
445   if (!clone) {
446     HLTError("failed to clone streamer info object array");
447     return -ENOMEM;
448   }
449
450   AliCDBPath cdbPath(entrypath);
451   AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
452   AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
453   cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
454   cdbMetaData->SetComment("Streamer info for HLTOUT payload");
455   AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
456
457   out.cd();
458   entry->Write();
459   // this is a small memory leak
460   // seg fault in ROOT object handling if the two objects are deleted
461   // investigate later
462   //delete entry;
463   //delete cdbMetaData;
464   out.Close();
465
466   return 0;
467 }
468
469 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
470 {
471   /// merge streamer info entries from source array to target array
472   /// return 1 if target array has been changed
473
474   // add all existing infos if not existing in the current one, or having
475   // different class version
476   int iResult=0;
477   if (!tgt || !src) return -EINVAL;
478
479   {
480     // check if all infos from the existing entry are in the new entry and with
481     // identical class version
482     TIter next(src);
483     TObject* nextobj=NULL;
484     while ((nextobj=next())) {
485       TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
486       if (!srcInfo) continue;
487       TString srcInfoName=srcInfo->GetName();
488
489       int i=0;
490       for (; i<tgt->GetEntriesFast(); i++) {
491         if (tgt->At(i)==NULL) continue;
492         if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
493         // TODO: 2010-08-23 some more detailed investigation is needed.
494         // Structures used for data exchange, e.g. AliHLTComponentDataType
495         // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
496         // streamer info. Strictly speaking not, because those structures are not supposed
497         // to be changed at all, so they should be the same in all versions in the future.
498         // There has been a problem with detecting whether the streamer info is already in
499         // the target array if the srcInfo has class version -1. As it just concerns
500         // structures not going to be changed we can safely skip checking the class version,
501         // as long as the entry is already in the target streamer infos it does not need
502         // to be copied again.
503         if (srcInfo->GetClassVersion()<0) break;
504         TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
505         if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
506       }
507       if (i<tgt->GetEntriesFast()) continue;
508
509       iResult=1;
510       tgt->Add(srcInfo);
511     }
512   }
513
514   return iResult;
515 }
516
517 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
518 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
519                                              AliHLTUInt32_t spec)
520 {
521   /// find item in the list
522   // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
523   // if (element!=fList.end()) return &(*element);
524   for (unsigned i=0; i<fList.size(); i++) {
525     if (fList[i]==dt && fList[i]==spec) return &fList[i];
526   }
527   return NULL;
528 }
529
530 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
531                                                                              AliHLTUInt32_t spec)
532   : fDt(dt)
533   , fSpecification(spec)
534   , fIsObject(false)
535   , fNofExtractions(0)
536   , fExtractionTimeUsec(0)
537   , fLastExtraction(0)
538   , fNofStreamings(0)
539   , fStreamingTimeUsec(0)
540   , fLastStreaming(0)
541 {
542   // helper class to keep track of input data blocks
543   // in the AliHLTRootSchemaEvolutionComponent
544   //
545 }
546
547 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
548 {
549   // destructor
550 }
551
552 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
553 {
554   /// extract data block to root object, and update performance parameters
555   /// object needs to be deleted externally
556   if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
557
558   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
559   if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
560
561   TStopwatch sw;
562   sw.Start();
563   AliHLTMessage msg(bd->fPtr, bd->fSize);
564   TClass* objclass=msg.GetClass();
565   if (!(fIsObject=(objclass!=NULL))) return NULL;
566   TObject* pObj=msg.ReadObject(objclass);
567   if (!(fIsObject=(pObj!=NULL))) return NULL;
568   sw.Stop();
569   AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
570   fNofExtractions++;
571   fExtractionTimeUsec+=usec;
572   TTimeStamp ts;
573   fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
574   return pObj;
575 }
576
577 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(const TObject* obj, AliHLTMessage& msg)
578 {
579   /// stream object and update performance parameters
580   if (!obj) return -EINVAL;
581   TStopwatch sw;
582   sw.Start();
583   msg.WriteObject(obj);
584
585   AliHLTUInt32_t usec=AliHLTUInt32_t(sw.RealTime()*fgkTimeScale);
586   fNofStreamings++;
587   fStreamingTimeUsec+=usec;
588   TTimeStamp ts;
589   fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
590   return 0;
591 }
592
593 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
594 {
595   /// print status
596   if (fIsObject || !(strcmp(option, "short")==0))
597     cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
598   if (fIsObject) {
599     if (fNofExtractions>0) cout << "   average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
600     else cout << "   never extracted" << endl;
601     if (fNofStreamings>0) cout << "   average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
602     else cout << "   never streamed" << endl;
603   }
604 }