Preparing the RootSchemaEvolutionComponent to run at 2kHz
authorrichterm <richterm@f7af4fe6-9843-0410-8265-dc069ae4e863>
Wed, 18 Aug 2010 21:27:44 +0000 (21:27 +0000)
committerrichterm <richterm@f7af4fe6-9843-0410-8265-dc069ae4e863>
Wed, 18 Aug 2010 21:27:44 +0000 (21:27 +0000)
The rate constraint is configurable by the component argument '-rate=1000' specifying
the rate in Hz. It is fulfilled by dynamic scale down of processing of already known
objects according to average processing time. New objects are always processed, thus
ensuring that every object makes it to the streamer info table.

HLT/BASE/util/AliHLTRootSchemaEvolutionComponent.cxx
HLT/BASE/util/AliHLTRootSchemaEvolutionComponent.h

index 982128e..94afbd8 100644 (file)
@@ -28,6 +28,9 @@
 #include "TStreamerInfo.h"
 #include "TList.h"
 #include "TFile.h"
+#include "TStopwatch.h"
+#include "TTimeStamp.h"
+#include "TDatime.h"
 
 #include "AliCDBStorage.h"
 #include "AliCDBManager.h"
 #include "AliCDBMetaData.h"
 #include "AliCDBEntry.h"
 
+#include <numeric>
+using std::accumulate;
+
 /** ROOT macro for the implementation of ROOT specific class methods */
 ClassImp(AliHLTRootSchemaEvolutionComponent)
 
 AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
   : AliHLTProcessor()
+  , fList()
   , fFlags(0)
   , fpStreamerInfos(NULL)
+  , fpEventTimer(NULL)
+  , fpCycleTimer(NULL)
+  , fMaxEventTime(500)
   , fFXSPrescaler(0)
   , fFileName()
 {
@@ -54,6 +64,7 @@ AliHLTRootSchemaEvolutionComponent::AliHLTRootSchemaEvolutionComponent()
 
 }
 const char* AliHLTRootSchemaEvolutionComponent::fgkConfigurationObject=NULL;
+const AliHLTUInt32_t AliHLTRootSchemaEvolutionComponent::fgkTimeScale=1000000;
 
 AliHLTRootSchemaEvolutionComponent::~AliHLTRootSchemaEvolutionComponent()
 {
@@ -102,7 +113,17 @@ int AliHLTRootSchemaEvolutionComponent::DoInit(int argc, const char** argv)
   if (iResult>=0) {
     fpStreamerInfos=new TObjArray();
     if (!fpStreamerInfos) iResult=-ENOMEM;
+
+    fpEventTimer=new TStopwatch;
+    if (fpEventTimer) {
+      fpEventTimer->Reset();
+    }
+    fpCycleTimer=new TStopwatch;
+    if (fpCycleTimer) {
+      fpCycleTimer->Reset();
+    }
   }
+
   return 0;
 }
 
@@ -120,6 +141,14 @@ int AliHLTRootSchemaEvolutionComponent::DoDeinit()
   }
   fpStreamerInfos=NULL;
 
+  if (fpEventTimer) {
+    delete fpEventTimer;
+    fpEventTimer=NULL;
+  }
+  if (fpCycleTimer) {
+    delete fpCycleTimer;
+    fpCycleTimer=NULL;
+  }
   return 0;
 }
 
@@ -134,13 +163,69 @@ int AliHLTRootSchemaEvolutionComponent::DoEvent( const AliHLTComponentEventData&
     return 0;
   }
 
+  AliHLTUInt32_t listtime=accumulate(fList.begin(), fList.end(), int(0), AliHLTDataBlockItem::TimeSum());
+  AliHLTUInt32_t averageEventTime=0;
+  AliHLTUInt32_t averageCycleTime=0;
+
+  AliHLTUInt32_t proctime=0;
+  if (fpEventTimer) {
+    averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
+    proctime=fpEventTimer->RealTime()*fgkTimeScale;
+    fpEventTimer->Start(kFALSE);
+  }
+  if (fpCycleTimer) {
+    fpCycleTimer->Stop();
+    averageCycleTime=(fpCycleTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
+  }
+
+  // scale down the event processing according to the required rate
+  // and average processing time.
   AliHLTMessage msg(kMESS_OBJECT);
   msg.EnableSchemaEvolution();
-  for (const TObject* pObj=GetFirstInputObject();
-       pObj && iResult>=0;
-       pObj=GetNextInputObject()) {
-    msg.WriteObject(pObj);
-    iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
+  for (const AliHLTComponentBlockData* pBlock=GetFirstInputBlock();
+       pBlock && iResult>=0;
+       pBlock=GetNextInputBlock()) {
+    bool processBlock=true;
+    AliHLTDataBlockItem* item=FindItem(pBlock->fDataType, pBlock->fSpecification);
+    if (item) {
+      // TODO: do a selection of blocks on basis of the time spent in its processing
+      // for now only the global processing time is checked
+      // process if the average event time is smaller then the cycle time, i.e.
+      // the time is spent outside the component
+      // apply a factor 4 margin
+      processBlock=4*averageEventTime<fMaxEventTime || 2*averageEventTime<averageCycleTime;
+    } else {
+      // always process new incoming blocks
+      processBlock=true;
+      fList.push_back(AliHLTDataBlockItem(pBlock->fDataType, pBlock->fSpecification));
+      item=&fList[fList.size()-1];
+    }
+    if (processBlock) {
+      TObject* pObj=item->Extract(pBlock);
+      if (pObj) {
+       if ((iResult=item->Stream(pObj, msg))>=0)
+         iResult=UpdateStreamerInfos(msg.GetStreamerInfos(), fpStreamerInfos);
+       delete pObj;
+       pObj=NULL;
+      }
+    }
+  }
+
+  if (fpEventTimer) {
+    fpEventTimer->Stop();
+    proctime=fpEventTimer->RealTime()*fgkTimeScale-proctime;
+    averageEventTime=(fpEventTimer->RealTime()*fgkTimeScale)/(GetEventCount()+1);
+
+    // info output once every 2 seconds
+    static UInt_t lastTime=0;
+    TDatime time;
+    if (time.Get()-lastTime>2) {
+      lastTime=time.Get();
+      HLTInfo("event time %d, average time %d, list time %d, cycle time %d", proctime, averageEventTime, listtime, averageCycleTime);
+    }
+  }
+  if (fpCycleTimer) {
+    fpCycleTimer->Start(kFALSE);
   }
 
   if (iResult>=0) {
@@ -160,6 +245,21 @@ int AliHLTRootSchemaEvolutionComponent::DoEvent( const AliHLTComponentEventData&
     fFileName.Clear();
   }
 
+  if (eventType==gkAliEventTypeEndOfRun) {
+    for (unsigned i=0; i<fList.size(); i++) {
+      if (CheckFilter(kHLTLogDebug)) fList[i].Print("short");
+      else if (fList[i].IsObject()) {
+       HLTInfo("AliHLTDataBlockItem %s %08x\n"
+               "   average extraction time: %d usec\n"
+               "   average streaming time: %d usec"
+               , AliHLTComponent::DataType2Text(fList[i]).c_str()
+               , fList[i].GetSpecification()
+               , fList[i].GetExtractionTime()
+               , fList[i].GetStreamingTime());
+      }
+    }
+  }
+
   return iResult;
 }
 
@@ -251,6 +351,18 @@ int AliHLTRootSchemaEvolutionComponent::ScanConfigurationArgument(int argc, cons
     }
     return 1;
   }
+
+  if (argument.Contains("-rate=")) {
+    argument.ReplaceAll("-rate=", "");
+    AliHLTUInt32_t rate=argument.Atoi();
+    if (rate>0 && rate<fgkTimeScale) {
+      fMaxEventTime=fgkTimeScale/rate;
+    } else {
+      HLTError("argument -file= expects number [Hz]");
+      return -EINVAL;
+    }
+    return 1;
+  }
   
   return iResult;
 }
@@ -292,3 +404,92 @@ int AliHLTRootSchemaEvolutionComponent::WriteToFile(const char* filename, const
 
   return 0;
 }
+
+AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem*
+AliHLTRootSchemaEvolutionComponent::FindItem(AliHLTComponentDataType dt,
+                                            AliHLTUInt32_t spec)
+{
+  /// find item in the list
+  // vector<AliHLTDataBlockItem>::iterator element=std::find(fList.begin(), fList.end(), AliHLTDataBlockItem(dt,spec));
+  // if (element!=fList.end()) return &(*element);
+  for (unsigned i=0; i<fList.size(); i++) {
+    if (fList[i]==dt && fList[i]==spec) return &fList[i];
+  }
+  return NULL;
+}
+
+AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::AliHLTDataBlockItem(AliHLTComponentDataType dt,
+                                                                            AliHLTUInt32_t spec)
+  : fDt(dt)
+  , fSpecification(spec)
+  , fIsObject(false)
+  , fNofExtractions(0)
+  , fExtractionTimeUsec(0)
+  , fLastExtraction(0)
+  , fNofStreamings(0)
+  , fStreamingTimeUsec(0)
+  , fLastStreaming(0)
+{
+  // helper class to keep track of input data blocks
+  // in the AliHLTRootSchemaEvolutionComponent
+  //
+}
+
+AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::~AliHLTDataBlockItem()
+{
+  // destructor
+}
+
+TObject* AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Extract(const AliHLTComponentBlockData* bd)
+{
+  /// extract data block to root object, and update performance parameters
+  /// object needs to be deleted externally
+  if (!bd || !bd->fPtr || bd->fSize<8) return NULL;
+
+  AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)bd->fPtr);
+  if (!(fIsObject=(firstWord==bd->fSize-sizeof(AliHLTUInt32_t)))) return NULL;
+
+  TStopwatch sw;
+  sw.Start();
+  AliHLTMessage msg(bd->fPtr, bd->fSize);
+  TClass* objclass=msg.GetClass();
+  if (!(fIsObject=(objclass!=NULL))) return NULL;
+  TObject* pObj=msg.ReadObject(objclass);
+  if (!(fIsObject=(pObj!=NULL))) return NULL;
+  sw.Stop();
+  AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
+  fNofExtractions++;
+  fExtractionTimeUsec+=usec;
+  TTimeStamp ts;
+  fLastExtraction=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
+  return pObj;
+}
+
+int AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Stream(TObject* obj, AliHLTMessage& msg)
+{
+  /// stream object and update performance parameters
+  if (!obj) return -EINVAL;
+  TStopwatch sw;
+  sw.Start();
+  msg.WriteObject(obj);
+
+  AliHLTUInt32_t usec=sw.RealTime()*fgkTimeScale;
+  fNofStreamings++;
+  fStreamingTimeUsec+=usec;
+  TTimeStamp ts;
+  fLastStreaming=(ts.GetSec()%1000)*fgkTimeScale + ts.GetNanoSec()/1000;
+  return 0;
+}
+
+void AliHLTRootSchemaEvolutionComponent::AliHLTDataBlockItem::Print(const char* option) const
+{
+  /// print status
+  if (fIsObject || !(strcmp(option, "short")==0))
+    cout << "AliHLTDataBlockItem: " << AliHLTComponent::DataType2Text(fDt).c_str() << " " << hex << fSpecification << dec << endl;
+  if (fIsObject) {
+    if (fNofExtractions>0) cout << "   average extraction time: " << fExtractionTimeUsec/fNofExtractions << " usec" << endl;
+    else cout << "   never extracted" << endl;
+    if (fNofStreamings>0) cout << "   average streaming time: " << fStreamingTimeUsec/fNofStreamings << " usec" << endl;
+    else cout << "   never streamed" << endl;
+  }
+}
index 8127f59..ed7d70d 100644 (file)
 
 #include "AliHLTProcessor.h"
 #include "TString.h"
+#include <vector>
 
 class TObjArray;
+class TObject;
+class TStopwatch;
+class AliHLTMessage;
 
 /**
  * @class AliHLTRootSchemaEvolutionComponent
+ * Collects streamer info for all input objects and produces the corresponding
+ * calibration object for reconstruction of HLT. The component runs with a
+ * configurable rate constraint and skips the processing of known data blocks
+ * for the sake of performance. New data blocks are always processed and added
+ * to the list.
  *
  * <h2>General properties:</h2>
  *
@@ -40,8 +49,10 @@ class TObjArray;
  * \li -hltout<=[all,first,eor,off]> <br>
  *      push streamer info to output, the streamer info is stored in the
  *      events in all, the first, and/or the EOR.
- * \li -file=<filename> <br>
+ * \li -file=filename <br>
  *      write to file at EOR
+ * \li -rate=hz <br>
+ *      required processing rate in Hz, default 2000Hz
  *
  * <h2>Configuration:</h2>
  * <!-- NOTE: ignore the \li. <i> and </i>: it's just doxygen formatting -->
@@ -99,6 +110,71 @@ class AliHLTRootSchemaEvolutionComponent : public AliHLTProcessor
   /// and adds if it is a new info. 
   int UpdateStreamerInfos(const TList* list, TObjArray* infos) const;
 
+  class AliHLTDataBlockItem
+  {
+  public:
+    AliHLTDataBlockItem(AliHLTComponentDataType dt=kAliHLTVoidDataType,
+                       AliHLTUInt32_t spec=kAliHLTVoidDataSpec);
+    ~AliHLTDataBlockItem();
+
+    /// extract data block to root object, and update performance parameters
+    /// object needs to be deleted externally
+    TObject* Extract(const AliHLTComponentBlockData* bd);
+
+    /// stream object and update performance parameters
+    int Stream(TObject* obj, AliHLTMessage& msg);
+
+    bool IsObject() const {return fIsObject;}
+    bool operator==(AliHLTDataBlockItem& i) const {return fDt==i.fDt && fSpecification==i.fSpecification;}
+    bool operator==(AliHLTComponentDataType dt) const {return fDt==dt;}
+    bool operator==(AliHLTUInt32_t spec) const {return fSpecification==spec;}
+    int operator+(AliHLTDataBlockItem& b) const;
+    operator const AliHLTComponentDataType&() const {return fDt;}
+    AliHLTUInt32_t GetSpecification() const {return fSpecification;}
+    
+    /// average extraction time in usec
+    AliHLTUInt32_t GetExtractionTime() const {return fNofExtractions>0?fExtractionTimeUsec/fNofExtractions:0;}
+    /// average streaming time in usec
+    AliHLTUInt32_t GetStreamingTime() const {return fNofStreamings>0?fStreamingTimeUsec/fNofStreamings:0;}
+    /// average total time in usec
+    AliHLTUInt32_t GetTotalTime() const {return GetExtractionTime() + GetStreamingTime();}
+
+    /// print status
+    void Print(const char* option) const;
+
+    class TimeSum : public binary_function<int,AliHLTDataBlockItem,int> {
+    public:
+      int operator() (int a, AliHLTDataBlockItem b) {
+       return a+b.GetTotalTime();
+      }
+    };
+
+  private:
+    /// data type of the block
+    AliHLTComponentDataType fDt; //! transient
+    /// specification of the block
+    AliHLTUInt32_t fSpecification; //! transient
+    /// flag for TObject
+    bool fIsObject; //! transient
+
+    /// number of extractions
+    AliHLTUInt32_t fNofExtractions; //! transient
+    /// object extraction time in usec
+    AliHLTUInt32_t fExtractionTimeUsec; //! transient
+    /// timestamp of last extraction in usec
+    AliHLTUInt32_t fLastExtraction; //! transient
+    /// number of streamings
+    AliHLTUInt32_t fNofStreamings; //! transient
+    /// object streaming time in usec
+    AliHLTUInt32_t fStreamingTimeUsec; //! transient
+    /// timestamp of last streaming in usec
+    AliHLTUInt32_t fLastStreaming; // !transient
+  };
+
+  /// find item in the list
+  AliHLTDataBlockItem* FindItem(AliHLTComponentDataType dt,
+                               AliHLTUInt32_t spec);
+
  protected:
   /// inherited from AliHLTComponent: custom initialization
   int DoInit( int argc, const char** argv );
@@ -129,16 +205,22 @@ private:
   /** assignment operator prohibited */
   AliHLTRootSchemaEvolutionComponent& operator=(const AliHLTRootSchemaEvolutionComponent&);
 
+  vector<AliHLTDataBlockItem> fList; //! list of block properties
+
   AliHLTUInt32_t fFlags; //! property flags
 
   TObjArray* fpStreamerInfos; //! array of streamer infos
+  TStopwatch* fpEventTimer; //! stopwatch for event processing
+  TStopwatch* fpCycleTimer; //! stopwatch for event cycle
+  AliHLTUInt32_t fMaxEventTime; //! required maximum processing time in usec
 
   AliHLTUInt32_t fFXSPrescaler; //! prescalar for the publishing to FXS
 
   TString fFileName; //! file name for dump at EOR
 
   static const char* fgkConfigurationObject; //! configuration object
+  static const AliHLTUInt32_t fgkTimeScale; //! timescale base
 
-  ClassDef(AliHLTRootSchemaEvolutionComponent, 2) // ROOT schema evolution component
+  ClassDef(AliHLTRootSchemaEvolutionComponent, 0) // ROOT schema evolution component
 };
 #endif