]> git.uio.no Git - u/mrichter/AliRoot.git/blob - HLT/BASE/util/AliHLTRootSchemaEvolutionComponent.cxx
attaching file writer to the StreamerInfo component
[u/mrichter/AliRoot.git] / HLT / BASE / util / AliHLTRootSchemaEvolutionComponent.cxx
1 // $Id$
2
3 //**************************************************************************
4 //* This file is property of and copyright by the ALICE HLT Project        * 
5 //* ALICE Experiment at CERN, All rights reserved.                         *
6 //*                                                                        *
7 //* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no>        *
8 //*                  for The ALICE HLT Project.                            *
9 //*                                                                        *
10 //* Permission to use, copy, modify and distribute this software and its   *
11 //* documentation strictly for non-commercial purposes is hereby granted   *
12 //* without fee, provided that the above copyright notice appears in all   *
13 //* copies and that both the copyright notice and this permission notice   *
14 //* appear in the supporting documentation. The authors make no claims     *
15 //* about the suitability of this software for any purpose. It is          *
16 //* provided "as is" without express or implied warranty.                  *
17 //**************************************************************************
18
19 /** @file   AliHLTRootSchemaEvolutionComponent.cxx
20     @author Matthias Richter
21     @date   2009-10-18
22     @brief  Handler component for ROOT schema evolution of streamed objects
23 */
24
25 #include "AliHLTRootSchemaEvolutionComponent.h"
26 #include "AliHLTMessage.h"
27 #include "AliHLTReadoutList.h"
28 #include "TObjArray.h"
29 #include "TStreamerInfo.h"
30 #include "TList.h"
31 #include "TFile.h"
32 #include "TStopwatch.h"
33 #include "TTimeStamp.h"
34 #include "TDatime.h"
35
36 #include "AliCDBStorage.h"
37 #include "AliCDBManager.h"
38 #include "AliCDBPath.h"
39 #include "AliCDBId.h"
40 #include "AliCDBMetaData.h"
41 #include "AliCDBEntry.h"
42
43 #include <numeric>
44 using std::accumulate;
45
46 /** ROOT macro for the implementation of ROOT specific class methods */
47 ClassImp(AliHLTRootSchemaEvolutionComponent)
48
49 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
50   : AliHLTCalibrationProcessor()
51   , fList()
52   , fFlags(kFXS)
53   , fpStreamerInfos(NULL)
54   , fpEventTimer(NULL)
55   , fpCycleTimer(NULL)
56   , fMaxEventTime(500)
57   , fFXSPrescaler(0)
58   , fFileName()
59 {
60   // see header file for class documentation
61   // or
62   // refer to README to build package
63   // or
64   // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
65
66 }
67
68 // FIXME: read below when defining an OCDB object here
69 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
70 const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
71
72 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
73 {
74   // see header file for class documentation
75   if (fpStreamerInfos) {
76     fpStreamerInfos->Clear();
77     delete fpStreamerInfos;
78   }
79   fpStreamerInfos=NULL;
80 }
81
82 void AliHLTRootSchemaEvolutionComponent::GetInputDataTypes(AliHLTComponentDataTypeList& list)
83 {
84   // see header file for class documentation
85   list.push_back(kAliHLTAnyDataType);
86 }
87
88 AliHLTComponentDataType AliHLTRootSchemaEvolutionComponent::GetOutputDataType()
89 {
90   // see header file for class documentation
91   return kAliHLTDataTypeStreamerInfo;
92 }
93
94 void AliHLTRootSchemaEvolutionComponent::GetOutputDataSize(unsigned long& constBase, double& inputMultiplier)
95 {
96   // see header file for class documentation
97
98   // this is nothing more than an assumption, in fact it's very difficult to predict how
99   // much output the component produces
100   constBase=100*1024;
101   inputMultiplier=3;
102 }
103
104 int AliHLTRootSchemaEvolutionComponent::InitCalibration()
105 {
106   // see header file for class documentation
107
108   int iResult=0;
109
110   // default configuration from CDB
111   // FIXME: has to be called from AliHLTCalibrationProcessor::DoInit in order to set
112   // the default parameters from OCDB before the custom argument scan
113   // not valid at the moment because fgkConfigurationObject==NULL
114   if (iResult>=0 && fgkConfigurationObject!=NULL) iResult=ConfigureFromCDBTObjString(fgkConfigurationObject);
115
116   if (iResult>=0) {
117     fpStreamerInfos=new TObjArray();
118     if (!fpStreamerInfos) iResult=-ENOMEM;
119
120     fpEventTimer=new TStopwatch;
121     if (fpEventTimer) {
122       fpEventTimer->Reset();
123     }
124     fpCycleTimer=new TStopwatch;
125     if (fpCycleTimer) {
126       fpCycleTimer->Reset();
127     }
128   }
129
130   return 0;
131 }
132
133 int AliHLTRootSchemaEvolutionComponent::DeinitCalibration()
134 {
135   // see header file for class documentation
136   if (fFileName.IsNull()==0) {
137     WriteToFile(fFileName, fpStreamerInfos);
138     fFileName.Clear();
139   }
140
141   if (fpStreamerInfos) {
142     fpStreamerInfos->Clear();
143     delete fpStreamerInfos;
144   }
145   fpStreamerInfos=NULL;
146
147   if (fpEventTimer) {
148     delete fpEventTimer;
149     fpEventTimer=NULL;
150   }
151   if (fpCycleTimer) {
152     delete fpCycleTimer;
153     fpCycleTimer=NULL;
154   }
155   return 0;
156 }
157
158 int AliHLTRootSchemaEvolutionComponent::ProcessCalibration( const AliHLTComponentEventData& /*evtData*/,
159                                                             AliHLTComponentTriggerData& /*trigData*/ )
160 {
161   // see header file for class documentation
162   int iResult=0;
163   AliHLTUInt32_t eventType=gkAliEventTypeUnknown;
164   if (!IsDataEvent(&eventType) && 
165       eventType==gkAliEventTypeStartOfRun) {
166     return 0;
167   }
168
169   AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliHLTDataBlockItem::TimeSum());
170   AliHLTUInt32_t averageEventTime=0;
171   AliHLTUInt32_t averageCycleTime=0;
172
173   AliHLTUInt32_t proctime=0;
174   if (fpEventTimer) {
175     averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
176     proctime=fpEventTimer->RealTime()*fgkTimeScale;
177     fpEventTimer->Start(kFALSE);
178   }
179   if (fpCycleTimer) {
180     fpCycleTimer->Stop();
181     averageCycleTime=(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
182   }
183
184   // scale down the event processing according to the required rate
185   // and average processing time.
186   for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
187        pBlock && iResult>=0;
188        pBlock=GetNextInputBlock()) {
189     bool processBlock=true;
190     AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
191     if (item) {
192       // TODO: do a selection of blocks on basis of the time spent in its processing
193       // for now only the global processing time is checked
194       // process if the average event time is smaller then the cycle time, i.e.
195       // the time is spent outside the component
196       // apply a factor 4 margin
197       processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
198     } else {
199       // always process new incoming blocks
200       processBlock=true;
201       fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
202       item=&fList[fList.size()-1];
203     }
204     if (processBlock) {
205       TObject* pObj=item->Extract(pBlock);
206       if (pObj) {
207         AliHLTMessage msg(kMESS_OBJECT);
208         msg.EnableSchemaEvolution();
209         if ((iResult=item->Stream(pObj, msg))>=0) {
210           iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
211         } else {
212           HLTError("failed to stream object %s of type %s", pObj->GetName(), pObj->ClassName());
213         }
214         delete pObj;
215         pObj=NULL;
216       }
217     }
218   }
219
220   if (fpEventTimer) {
221     fpEventTimer->Stop();
222     proctime=fpEventTimer->RealTime()*fgkTimeScale-proctime;
223     averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
224
225     // info output once every 2 seconds
226     static UInt_t lastTime=0;
227     TDatime time;
228     if (time.Get()-lastTime>2) {
229       lastTime=time.Get();
230       HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
231     }
232   }
233   if (fpCycleTimer) {
234     fpCycleTimer->Start(kFALSE);
235   }
236
237   if (iResult>=0) {
238     if ((TestBits(kHLTOUTatFirstEvent) && GetEventCount()==0) ||
239         (TestBits(kHLTOUTatAllEvents))) {
240       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
241     }
242   }
243
244   if (TestBits(kFXS) && fFXSPrescaler>0 && (GetEventCount()%fFXSPrescaler)==0) {
245     // push to FXS
246     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
247     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
248   }
249
250   return iResult;
251 }
252
253 int AliHLTRootSchemaEvolutionComponent::ShipDataToFXS( const AliHLTComponentEventData& /*evtData*/,
254                                                        AliHLTComponentTriggerData& /*trigData*/)
255 {
256   // see header file for class documentation
257   if (TestBits(kFXS)) {
258     // push to FXS
259     AliHLTReadoutList rdList(AliHLTReadoutList::kHLT);
260     PushToFXS((TObject*)fpStreamerInfos, "HLT", "StreamerInfo", &rdList );
261   }
262
263   if (fFileName.IsNull()==0) {
264     WriteToFile(fFileName, fpStreamerInfos);
265     fFileName.Clear();
266   }
267
268     if (TestBits(kHLTOUTatEOR)) {
269       PushBack(fpStreamerInfos, kAliHLTDataTypeStreamerInfo);
270     }
271
272     for (unsigned i=0; i<fList.size(); i++) {
273       if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
274       else if (fList[i].IsObject()) {
275         HLTInfo("AliHLTDataBlockItem %s %08x\n"
276                 "   average extraction time: %d usec\n"
277                 "   average streaming time: %d usec"
278                 , AliHLTComponent::DataType2Text(fList[i]).c_str()
279                 , fList[i].GetSpecification()
280                 , fList[i].GetExtractionTime()
281                 , fList[i].GetStreamingTime());
282       }
283     }
284
285   return 0;
286 }
287
288 int AliHLTRootSchemaEvolutionComponent::UpdateStreamerInfos(const TList* list, TObjArray* infos) const
289 {
290   // see header file for class documentation
291   int iResult=0;
292   if (!list || !infos) {
293     return -EINVAL;
294   }
295
296   TObject* element=NULL;
297   TIter next((TList*)list);
298   while ((element = next())) {
299     TStreamerInfo* pInfo=dynamic_cast<TStreamerInfo*>(element);
300     if (!pInfo) continue;
301     TString name=pInfo->GetName();
302     int i=0;
303     if (pInfo->GetClassVersion()==0) continue; // skip classes which are not for storage
304     for (; i<infos->GetEntriesFast(); i++) {
305       if (name.CompareTo(infos->At(i)->GetName())==0 &&
306           pInfo->GetClassVersion() == ((TStreamerInfo*)infos->At(i))->GetClassVersion()) {
307         // have it already
308         break;
309       }
310     }
311
312     // Add streamer info if not yet there
313     if (i>=infos->GetEntriesFast()) {
314       infos->Add(pInfo);
315     }
316   }
317
318   return iResult;
319 }
320
321 int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, const char** argv)
322 {
323   // see header file for class documentation
324   int iResult=0;
325   if (argc<=0) return 0;
326   int i=0;
327   TString argument=argv[i];
328
329   // -hltout=[all,first,eor,off]
330   if (argument.Contains("-hltout")) {
331     argument.ReplaceAll("-hltout", "");
332     argument.ReplaceAll("=", "");
333     if (argument.IsNull() || argument.CompareTo("all")==0) {
334       SetBits(kHLTOUTatAllEvents|kHLTOUTatEOR);
335     } else if (argument.CompareTo("first")==0) {
336       SetBits(kHLTOUTatFirstEvent);
337     } else if (argument.CompareTo("eor")==0) {
338       SetBits(kHLTOUTatEOR);
339     } else if (argument.CompareTo("off")==0) {
340       ClearBits(kHLTOUTatAllEvents | kHLTOUTatFirstEvent | kHLTOUTatEOR);
341     } else {
342       HLTError("invalid parameter for argument -hltout= : %s", argument.Data());
343       return -EINVAL;
344     }
345     return 1;
346   }
347
348   // -fxs=[n,off]
349   if (argument.Contains("-fxs")) {
350     argument.ReplaceAll("-fxs", "");
351     argument.ReplaceAll("=", "");
352     SetBits(kFXS);
353     if (argument.IsNull()) {
354     } else if (argument.CompareTo("off")==0) {
355       ClearBits(kFXS);
356     } else if (argument.IsDigit()) {
357       fFXSPrescaler=argument.Atoi();
358     } else {
359       HLTError("invalid parameter for argument -fxs= : %s", argument.Data());
360       return -EINVAL;
361     }
362     return 1;
363   }
364
365   // -file=<filename>
366   if (argument.Contains("-file=")) {
367     argument.ReplaceAll("-file=", "");
368     if (!argument.IsNull()) {
369       fFileName=argument;
370     } else {
371       HLTError("argument -file= expects file name");
372       return -EINVAL;
373     }
374     return 1;
375   }
376
377   if (argument.Contains("-rate=")) {
378     argument.ReplaceAll("-rate=", "");
379     AliHLTUInt32_t rate=argument.Atoi();
380     if (rate>0 && rate<fgkTimeScale) {
381       fMaxEventTime=fgkTimeScale/rate;
382     } else {
383       HLTError("argument -file= expects number [Hz]");
384       return -EINVAL;
385     }
386     return 1;
387   }
388
389   return iResult;
390 }
391
392 int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const TObjArray* infos) const
393 {
394   // write aray of streamer infos to file
395   if (!filename || !infos) return -EINVAL;
396
397   TFile out(filename, "RECREATE");
398   if (out.IsZombie()) {
399     HLTError("failed to open file %s", filename);
400     return -EBADF;
401   }
402
403   const char* entrypath="HLT/Calib/StreamerInfo";
404   int version = -1;
405   AliCDBStorage* store = NULL;
406   // TODO: to be activated later, first some methods need to be made
407   // public in AliCDBManager. Or some new methods to be added 
408   //if (AliCDBManager::Instance()->SelectSpecificStorage(entrypath))
409   //  store = AliCDBManager::Instance()->GetSpecificStorage(entrypath);
410   if (!store) 
411     store = AliCDBManager::Instance()->GetDefaultStorage();
412   AliCDBEntry* existingEntry=NULL;
413   if (store && store->GetLatestVersion(entrypath, GetRunNo())>=0 &&
414       (existingEntry=AliCDBManager::Instance()->Get(entrypath))!=NULL) {
415     version=existingEntry->GetId().GetVersion();
416   }
417   version++;
418
419   TObjArray* clone=NULL;
420
421   if (existingEntry && existingEntry->GetObject()) {
422     TObject* cloneObj=existingEntry->GetObject()->Clone();
423     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
424     if (MergeStreamerInfo(clone, infos)==0) {
425       // no change, store with identical version
426       version=existingEntry->GetId().GetVersion();
427     }
428   } else {
429     TObject* cloneObj=infos->Clone();
430     if (cloneObj) clone=dynamic_cast<TObjArray*>(cloneObj);
431   }
432   if (!clone) {
433     HLTError("failed to clone streamer info object array");
434     return -ENOMEM;
435   }
436
437   AliCDBPath cdbPath(entrypath);
438   AliCDBId cdbId(cdbPath, AliCDBManager::Instance()->GetRun(), AliCDBRunRange::Infinity(), version, 0);
439   AliCDBMetaData* cdbMetaData=new AliCDBMetaData;
440   cdbMetaData->SetResponsible("ALICE HLT Matthias.Richter@cern.ch");
441   cdbMetaData->SetComment("Streamer info for HLTOUT payload");
442   AliCDBEntry* entry=new AliCDBEntry(clone, cdbId, cdbMetaData, kTRUE);
443
444   out.cd();
445   entry->Write();
446   // this is a small memory leak
447   // seg fault in ROOT object handling if the two objects are deleted
448   // investigate later
449   //delete entry;
450   //delete cdbMetaData;
451   out.Close();
452
453   return 0;
454 }
455
456 int AliHLTRootSchemaEvolutionComponent::MergeStreamerInfo(TObjArray* tgt, const TObjArray* src)
457 {
458   /// merge streamer info entries from source array to target array
459   /// return 1 if target array has been changed
460
461   // add all existing infos if not existing in the current one, or having
462   // different class version
463   int iResult=0;
464   if (!tgt || !src) return -EINVAL;
465
466   {
467     // check if all infos from the existing entry are in the new entry and with
468     // identical class version
469     TIter next(src);
470     TObject* nextobj=NULL;
471     while ((nextobj=next())) {
472       TStreamerInfo* srcInfo=dynamic_cast<TStreamerInfo*>(nextobj);
473       if (!srcInfo) continue;
474       TString srcInfoName=srcInfo->GetName();
475
476       int i=0;
477       for (; i<tgt->GetEntriesFast(); i++) {
478         if (tgt->At(i)==NULL) continue;
479         if (srcInfoName.CompareTo(tgt->At(i)->GetName())!=0) continue;
480         // TODO: 2010-08-23 some more detailed investigation is needed.
481         // Structures used for data exchange, e.g. AliHLTComponentDataType
482         // or AliHLTEventDDLV1 do not have a class version, but need to be stored in the
483         // streamer info. Strictly speaking not, because those structures are not supposed
484         // to be changed at all, so they should be the same in all versions in the future.
485         // There has been a problem with detecting whether the streamer info is already in
486         // the target array if the srcInfo has class version -1. As it just concerns
487         // structures not going to be changed we can safely skip checking the class version,
488         // as long as the entry is already in the target streamer infos it does not need
489         // to be copied again.
490         if (srcInfo->GetClassVersion()<0) break;
491         TStreamerInfo* tgtInfo=dynamic_cast<TStreamerInfo*>(tgt->At(i));
492         if (tgtInfo && tgtInfo->GetClassVersion()==srcInfo->GetClassVersion()) break;
493       }
494       if (i<tgt->GetEntriesFast()) continue;
495
496       iResult=1;
497       tgt->Add(srcInfo);
498     }
499   }
500
501   return iResult;
502 }
503
504 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
505 AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
506                                              AliHLTUInt32_t spec)
507 {
508   /// find item in the list
509   // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
510   // if (element!=fList.end()) return &(*element);
511   for (unsigned i=0; i<fList.size(); i++) {
512     if (fList[i]==dt && fList[i]==spec) return &fList[i];
513   }
514   return NULL;
515 }
516
517 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
518                                                                              AliHLTUInt32_t spec)
519   : fDt(dt)
520   , fSpecification(spec)
521   , fIsObject(false)
522   , fNofExtractions(0)
523   , fExtractionTimeUsec(0)
524   , fLastExtraction(0)
525   , fNofStreamings(0)
526   , fStreamingTimeUsec(0)
527   , fLastStreaming(0)
528 {
529   // helper class to keep track of input data blocks
530   // in the AliHLTRootSchemaEvolutionComponent
531   //
532 }
533
534 AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
535 {
536   // destructor
537 }
538
539 TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
540 {
541   /// extract data block to root object, and update performance parameters
542   /// object needs to be deleted externally
543   if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
544
545   AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
546   if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
547
548   TStopwatch sw;
549   sw.Start();
550   AliHLTMessage msg(bd->fPtr, bd->fSize);
551   TClass* objclass=msg.GetClass();
552   if (!(fIsObject=(objclass!=NULL))) return NULL;
553   TObject* pObj=msg.ReadObject(objclass);
554   if (!(fIsObject=(pObj!=NULL))) return NULL;
555   sw.Stop();
556   AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
557   fNofExtractions++;
558   fExtractionTimeUsec+=usec;
559   TTimeStamp ts;
560   fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
561   return pObj;
562 }
563
564 int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(TObject* obj, AliHLTMessage& msg)
565 {
566   /// stream object and update performance parameters
567   if (!obj) return -EINVAL;
568   TStopwatch sw;
569   sw.Start();
570   msg.WriteObject(obj);
571
572   AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
573   fNofStreamings++;
574   fStreamingTimeUsec+=usec;
575   TTimeStamp ts;
576   fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
577   return 0;
578 }
579
580 void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
581 {
582   /// print status
583   if (fIsObject || !(strcmp(option, "short")==0))
584     cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
585   if (fIsObject) {
586     if (fNofExtractions>0) cout << "   average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
587     else cout << "   never extracted" << endl;
588     if (fNofStreamings>0) cout << "   average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
589     else cout << "   never streamed" << endl;
590   }
591 }