implementation of SOR and EOR events (not yet enabled), bugfix in DataBuffer: data...
authorrichterm <richterm@f7af4fe6-9843-0410-8265-dc069ae4e863>
Fri, 23 Nov 2007 11:59:59 +0000 (11:59 +0000)
committerrichterm <richterm@f7af4fe6-9843-0410-8265-dc069ae4e863>
Fri, 23 Nov 2007 11:59:59 +0000 (11:59 +0000)
HLT/BASE/AliHLTDataBuffer.cxx
HLT/BASE/AliHLTDataBuffer.h
HLT/BASE/AliHLTDataSource.cxx
HLT/BASE/AliHLTDataSource.h
HLT/BASE/AliHLTProcessor.h
HLT/BASE/AliHLTSystem.cxx
HLT/BASE/AliHLTSystem.h
HLT/BASE/AliHLTTask.cxx
HLT/BASE/AliHLTTask.h

index 144be4d..5825dd5 100644 (file)
@@ -170,11 +170,11 @@ int AliHLTDataBuffer::FindMatchingDataSegments(const AliHLTComponent* pConsumer,
   return iResult;
 }
 
-int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponentBlockData* arrayBlockDesc, int iArraySize)
+int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponentBlockDataList& blockDescList)
 {
   // see header file for function documentation
   int iResult=0;
-  if (pConsumer && arrayBlockDesc) {
+  if (pConsumer) {
     if (1/*fpBuffer*/) {
       AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fConsumers);
       if (pDesc) {
@@ -183,51 +183,45 @@ int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponen
        // so it does not matter if there are matching data types or not, unless
        // we implement such a check in PubSub
        if ((iResult=FindMatchingDataSegments(pConsumer, tgtList))>=0) {
-         int i =0;
          AliHLTDataSegmentList::iterator segment=tgtList.begin();
-         while (segment!=tgtList.end() && i<iArraySize) {
+         while (segment!=tgtList.end()) {
            // fill the block data descriptor
-           arrayBlockDesc[i].fStructSize=sizeof(AliHLTComponentBlockData);
-           // the shared memory key is not used in AliRoot
-           arrayBlockDesc[i].fShmKey.fStructSize=sizeof(AliHLTComponentShmData);
-           arrayBlockDesc[i].fShmKey.fShmType=gkAliHLTComponentInvalidShmType;
-           arrayBlockDesc[i].fShmKey.fShmID=gkAliHLTComponentInvalidShmID;
+           AliHLTComponentBlockData bd;
+           AliHLTComponent::FillBlockData(bd);
            // This models the behavior of PubSub.
            // For incoming data blocks, fOffset must be ignored by the
            // processing component. It is set for bookkeeping in the framework.
            // fPtr always points to the beginning of the data.
-           arrayBlockDesc[i].fOffset=0;
+           bd.fOffset=0;
            AliHLTUInt8_t* pTgt=*segment;
-           arrayBlockDesc[i].fPtr=reinterpret_cast<void*>(pTgt);
-           arrayBlockDesc[i].fSize=(*segment).fSegmentSize;
-           arrayBlockDesc[i].fDataType=(*segment).fDataType;
-           arrayBlockDesc[i].fSpecification=(*segment).fSpecification;
+           bd.fPtr=reinterpret_cast<void*>(pTgt);
+           bd.fSize=(*segment).fSegmentSize;
+           bd.fDataType=(*segment).fDataType;
+           bd.fSpecification=(*segment).fSpecification;
+           blockDescList.push_back(bd);
            pDesc->SetActiveDataSegment(*segment);
-           HLTDebug("component %p (%s) subscribed to segment #%d offset %d size %d data type %s %#x", 
-                    pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset,
-                    arrayBlockDesc[i].fSize, (AliHLTComponent::DataType2Text(arrayBlockDesc[i].fDataType)).c_str(), 
-                    arrayBlockDesc[i].fSpecification);
-           i++;
+           HLTDebug("component %p (%s) subscribed to segment offset %d size %d data type %s %#x", 
+                    pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), bd.fOffset,
+                    bd.fSize, (AliHLTComponent::DataType2Text(bd.fDataType)).c_str(), 
+                    bd.fSpecification);
            segment++;
          }
-         // check whether there was enough space for the segments
-         if (i!=(int)tgtList.size()) {
-           HLTError("too little space in block descriptor array: required %d, available %d", tgtList.size(), iArraySize);
-           iResult=-ENOSPC;
-         } else {
          // move this consumer to the active list
-         if (i==0) {
+         if (tgtList.size()==0) {
            ChangeConsumerState(pDesc, fConsumers, fReleasedConsumers);
            HLTDebug("no input data for component %p (%s) available", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
          } else if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
            HLTDebug("component %p (%s) subscribed to data buffer %p", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), this);
          } else {
            // TODO: cleanup the consumer descriptor correctly
-           memset(arrayBlockDesc, 0, iArraySize*sizeof(AliHLTComponentBlockData));
+           segment=tgtList.begin();
+           while (segment!=tgtList.end()) {
+             blockDescList.pop_back();
+             segment++;
+           }
            HLTError("can not activate consumer %p for data buffer %p", pConsumer, this);
            iResult=-EACCES;
          }
-         }
        } else {
          HLTError("unresolved data segment(s) for component %p (%s)", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
          iResult=-EBADF;
@@ -327,7 +321,7 @@ int AliHLTDataBuffer::Forward(AliHLTTask* pSrcTask, AliHLTComponentBlockData* pB
   assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
   if (fForwardedSegments.size()!=fForwardedSegmentSources.size()) return -EFAULT;
   fForwardedSegmentSources.push_back(pSrcTask);
-  fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+  fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize, pBlockDesc->fDataType, pBlockDesc->fSpecification));
   return 0;
 }
 
index 280eab9..e65bdda 100644 (file)
@@ -110,6 +110,9 @@ class AliHLTDataBuffer : public TObject, public AliHLTLogging
                AliHLTComponentBlockData* arrayBlockDesc,
                int iArraySize);
 
+  int Subscribe(const AliHLTComponent* pConsumer,
+               AliHLTComponentBlockDataList& blockDescList);
+
   /**
    * Release an instance of the data buffer.
    * Resets the variables of the block descriptor.
index dff1d57..c2073b3 100644 (file)
@@ -39,6 +39,11 @@ AliHLTDataSource::AliHLTDataSource()
   // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
 }
 
+void* AliHLTDataSource::fgpSpecialEvent=NULL;
+int AliHLTDataSource::fgSpecialEventSize=0;
+AliHLTComponentDataType AliHLTDataSource::fgSpecialEventDataType=kAliHLTVoidDataType;
+AliHLTUInt32_t AliHLTDataSource::fgSpecialEventSpecification=kAliHLTVoidDataSpec;
+
 AliHLTDataSource::~AliHLTDataSource()
 { 
   // see header file for class documentation
@@ -52,7 +57,7 @@ void AliHLTDataSource::GetInputDataTypes( vector<AliHLTComponentDataType>& list)
 
 
 int AliHLTDataSource::DoProcessing( const AliHLTComponentEventData& evtData,
-                                   const AliHLTComponentBlockData* blocks, 
+                                   const AliHLTComponentBlockData* /*blocks*/, 
                                    AliHLTComponentTriggerData& trigData,
                                    AliHLTUInt8_t* outputPtr, 
                                    AliHLTUInt32_t& size,
@@ -61,14 +66,29 @@ int AliHLTDataSource::DoProcessing( const AliHLTComponentEventData& evtData,
 {
   // see header file for class documentation
   int iResult=0;
-  if (blocks) {
-    // this is currently just to get rid of the warning "unused parameter"
-  }
   if (evtData.fBlockCnt > 0) {
     HLTWarning("Data source component skips input data blocks");
   }
-  iResult=GetEvent(evtData, trigData, outputPtr, size, outputBlocks);
-  HLTDebug("component %s (%p) GetEvent finished (%d)", GetComponentID(), this, iResult);
+  if (fgpSpecialEvent==NULL || fgSpecialEventSize==0) {
+    // normal event publishing
+    iResult=GetEvent(evtData, trigData, outputPtr, size, outputBlocks);
+    HLTDebug("component %s (%p) GetEvent finished (%d)", GetComponentID(), this, iResult);
+  } else {
+    // publish special event
+    if (size>=fgSpecialEventSize) {
+      memcpy(outputPtr, fgpSpecialEvent, fgSpecialEventSize);
+      AliHLTComponentBlockData bd;
+      FillBlockData(bd);
+      bd.fOffset=0;
+      bd.fSize=fgSpecialEventSize;
+      bd.fDataType=fgSpecialEventDataType;
+      bd.fSpecification=fgSpecialEventSpecification;
+      outputBlocks.push_back(bd);
+      size=bd.fSize;
+    } else {
+      iResult=-ENOSPC;
+    }
+  }
   edd = NULL;
   return iResult;
 }
@@ -89,3 +109,19 @@ int AliHLTDataSource::GetEvent( const AliHLTComponentEventData& /*evtData*/, Ali
   HLTFatal("no processing method implemented");
   return -ENOSYS;
 }
+
+AliHLTDataSource::AliSpecialEventGuard::AliSpecialEventGuard(AliHLTRunDesc* pDesc, AliHLTComponentDataType dt, AliHLTUInt32_t spec)
+{
+  AliHLTDataSource::fgpSpecialEvent=pDesc; 
+  AliHLTDataSource::fgSpecialEventSize=sizeof(AliHLTRunDesc); 
+  AliHLTDataSource::fgSpecialEventDataType=dt; 
+  AliHLTDataSource::fgSpecialEventSpecification=spec;
+}
+
+AliHLTDataSource::AliSpecialEventGuard::~AliSpecialEventGuard()
+{
+  AliHLTDataSource::fgpSpecialEvent=NULL; 
+  AliHLTDataSource::fgSpecialEventSize=0; 
+  AliHLTDataSource::fgSpecialEventDataType=kAliHLTVoidDataType;
+  AliHLTDataSource::fgSpecialEventSpecification=kAliHLTVoidDataSpec;
+}
index 2c7ff5c..3dc2ae2 100644 (file)
@@ -73,7 +73,25 @@ class AliHLTDataSource : public AliHLTComponent {
    */
   void GetInputDataTypes( vector<AliHLTComponentDataType>& list);
 
- protected:
+  /**
+   * @class AliSpecialEventGuard
+   * Guard structure to set the data sources into 'special event publishing'
+   * mode. The SOR and EOR events are generated by all the data sources and
+   * perculated through the chain as normal events. The AliSpecialEventGuard
+   * is a back-door mechansim to trigger publishing of the special event
+   * described by the run descriptor instead of the publishing of real data.
+   *
+   * The descriptor has to be valid throughout the lifetime of the guard.
+   */
+  class AliSpecialEventGuard {
+  public:
+    /** constructor, set run descriptor */
+    AliSpecialEventGuard(AliHLTRunDesc* pDesc, AliHLTComponentDataType dt, AliHLTUInt32_t spec);
+    /** destructor, reset run descriptor */
+    ~AliSpecialEventGuard();
+  };
+
+protected:
 
   /**
    * The low-level data processing method for the component.
@@ -103,6 +121,16 @@ class AliHLTDataSource : public AliHLTComponent {
    */
   virtual int GetEvent( const AliHLTComponentEventData& evtData, AliHLTComponentTriggerData& trigData);
 
-  ClassDef(AliHLTDataSource, 1)
+private:
+  /** pointer to the special event going to be published */
+  static void* fgpSpecialEvent; //! transient
+  /** data size of the special event going to be published */
+  static int fgSpecialEventSize; //! transient
+  /** data type of the special event going to be published */
+  static AliHLTComponentDataType fgSpecialEventDataType; //! transient
+  /** data specification of the special event going to be published */
+  static AliHLTUInt32_t fgSpecialEventSpecification; //! transient
+
+  ClassDef(AliHLTDataSource, 2)
 };
 #endif
index 3e5e679..65047a8 100644 (file)
@@ -59,7 +59,7 @@ class AliHLTProcessor : public AliHLTComponent {
                    AliHLTComponentTriggerData& trigData,
                    AliHLTUInt8_t* outputPtr, 
                    AliHLTUInt32_t& size,
-                   vector<AliHLTComponentBlockData>& outputBlocks,
+                   AliHLTComponentBlockDataList& outputBlocks,
                    AliHLTComponentEventDoneData*& edd );
 
   // Information member functions for registration.
@@ -90,7 +90,7 @@ class AliHLTProcessor : public AliHLTComponent {
                       AliHLTComponentTriggerData& trigData,
                       AliHLTUInt8_t* outputPtr, 
                       AliHLTUInt32_t& size,
-                      vector<AliHLTComponentBlockData>& outputBlocks );
+                      AliHLTComponentBlockDataList& outputBlocks );
 
   /**
    * The high-level data processing method.
index b93b212..2c7387a 100644 (file)
@@ -42,6 +42,7 @@ using namespace std;
 #include "AliHLTTask.h"
 #include "AliHLTModuleAgent.h"
 #include "AliHLTOfflineInterface.h"
+#include "AliHLTDataSource.h"
 #include <TObjArray.h>
 #include <TObjString.h>
 #include <TStopwatch.h>
@@ -530,6 +531,9 @@ int AliHLTSystem::StartTasks()
   } else {
     fEventCount=0;
     fGoodEvents=0;
+    if ((iResult=SendControlEvent(kAliHLTDataTypeSOR))<0) {
+      HLTError("can not send SOR event");
+    }
   }
   return iResult;
 }
@@ -567,12 +571,16 @@ int AliHLTSystem::StopTasks()
 {
   // see header file for class documentation
   int iResult=0;
+  if ((iResult=SendControlEvent(kAliHLTDataTypeEOR))<0) {
+    HLTError("can not send EOR event");
+  }
   TObjLink *lnk=fTaskList.FirstLink();
-  while (lnk && iResult>=0) {
+  while (lnk) {
     TObject* obj=lnk->GetObject();
     if (obj) {
       AliHLTTask* pTask=(AliHLTTask*)obj;
-      iResult=pTask->EndRun();
+      int locResult=pTask->EndRun();
+      if (iResult>=0 && locResult<0) iResult=locResult;
 //       ProcInfo_t ProcInfo;
 //       gSystem->GetProcInfo(&ProcInfo);
 //       HLTInfo("task %s stopped (%d), current memory usage %d %d", pTask->GetName(), iResult, ProcInfo.fMemResident, ProcInfo.fMemVirtual);
@@ -584,6 +592,33 @@ int AliHLTSystem::StopTasks()
   return iResult;
 }
 
+int AliHLTSystem::SendControlEvent(AliHLTComponentDataType dt)
+{
+  // see header file for class documentation
+
+  // disabled for the moment
+  return 0;
+
+  int iResult=0;
+  AliHLTRunDesc runDesc;
+  memset(&runDesc, 0, sizeof(AliHLTRunDesc));
+  runDesc.fStructSize=sizeof(AliHLTRunDesc);
+  AliHLTDataSource::AliSpecialEventGuard g(&runDesc, dt, kAliHLTVoidDataSpec);
+  HLTDebug("sending event %s, run descriptor %p", AliHLTComponent::DataType2Text(dt).c_str(), &runDesc);
+  TObjLink *lnk=fTaskList.FirstLink();
+  while (lnk && iResult>=0) {
+    TObject* obj=lnk->GetObject();
+    if (obj) {
+      AliHLTTask* pTask=(AliHLTTask*)obj;
+      iResult=pTask->ProcessTask(-1);
+    } else {
+    }
+    lnk = lnk->Next();
+  }
+  HLTDebug("event %s done (%d)", AliHLTComponent::DataType2Text(dt).c_str(), iResult);
+  return iResult;
+}
+
 int AliHLTSystem::DeinitTasks()
 {
   // see header file for class documentation
index b2d9db1..530da60 100644 (file)
@@ -196,6 +196,14 @@ class AliHLTSystem : public AliHLTLogging {
   int StopTasks();
 
   /**
+   * Send a control event trough the chain.
+   * All data sources in the chain are switched to publish a control event like
+   * SOR or EOR. The event is propagated in the same way as a normal event.
+   * @param dt       type of the event
+   */
+  int SendControlEvent(AliHLTComponentDataType dt);
+
+  /**
    * De-init all tasks from the list.
    * The @ref AliHLTTask::Deinit method is called for each task, the components
    * will be deleted.
index 536433e..78e007f 100644 (file)
@@ -364,7 +364,7 @@ int AliHLTTask::StartRun()
     if (iResult>=0) {
       if (fBlockDataArray.size()>0) {
        HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
-       fBlockDataArray.resize(0);
+       fBlockDataArray.clear();
       }
 
       // component init
@@ -372,13 +372,6 @@ int AliHLTTask::StartRun()
       // of the component.
       //iResult=Init( AliHLTComponentEnvironment* environ, void* environ_param, int argc, const char** argv );
 
-      // allocate internal task variables for bookkeeping aso.
-      // we allocate the BlockData array with at least one member
-      if (iNofInputDataBlocks==0) iNofInputDataBlocks=1;
-      AliHLTComponentBlockData init;
-      memset(&init, 0, sizeof(AliHLTComponentBlockData));
-      fBlockDataArray.resize(iNofInputDataBlocks, init);
-
       // allocate the data buffer, which controls the output buffer and subscriptions
       if (iResult>=0) {
        fpDataBuffer=new AliHLTDataBuffer;
@@ -403,6 +396,10 @@ int AliHLTTask::StartRun()
        }
       }
     }
+    if (iResult>=0) {
+      // send the SOR event
+      
+    }
   } else {
     HLTError("task %s (%p) does not have a component", GetName(), this);
     iResult=-EFAULT;
@@ -415,9 +412,7 @@ int AliHLTTask::EndRun()
   // see header file for function documentation
   int iResult=0;
   if (fBlockDataArray.size()>0) {
-    fBlockDataArray.resize(0);
-  } else {
-    HLTWarning("task %s (%p) doesn't seem to be in running mode", GetName(), this);
+    fBlockDataArray.clear();
   }
   if (fpDataBuffer) {
     AliHLTDataBuffer* pBuffer=fpDataBuffer;
@@ -442,33 +437,48 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
     AliHLTTaskPList subscribedTaskList;
     TObjLink* lnk=fListDependencies.FirstLink();
 
+    // instances of SOR and EOR events to be kept
+    int iSOR=-1;
+    int iEOR=-1;
+
     // subscribe to all source tasks
+    fBlockDataArray.clear();
     while (lnk && iResult>=0) {
       pSrcTask=(AliHLTTask*)lnk->GetObject();
       if (pSrcTask) {
        int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
-       if (iMatchingDB>=0 && static_cast<unsigned int>(iMatchingDB)>fBlockDataArray.size()-iSourceDataBlock) {
-         AliHLTComponentBlockData init;
-         memset(&init, 0, sizeof(AliHLTComponentBlockData));
-         fBlockDataArray.resize(iSourceDataBlock+iMatchingDB, init);
-       } else {
-         if (iMatchingDB<0) {
-           HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
-           iResult=iMatchingDB;
-           break;
-         } else if (iMatchingDB==0) {
-           HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
-         }
+       if (iMatchingDB<0) {
+         HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
+         iResult=iMatchingDB;
+         break;
+       } else if (iMatchingDB==0) {
+         HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
        }
-       if ((iResult=pSrcTask->Subscribe(this, &fBlockDataArray[iSourceDataBlock],fBlockDataArray.size()-iSourceDataBlock))>=0) {
-         for (int i=0; i<iResult; i++) {
-           iInputDataVolume+=fBlockDataArray[i+iSourceDataBlock].fSize;
+       if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
+         iSOR=iEOR=-1;
+         AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
+         for (int i=0; block!=fBlockDataArray.end(); i++) {
+           bool bRemove=0;
+           bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
+           bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+           //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
+           if (i<iSourceDataBlock) {
+             assert(!bRemove);
+           } else if (bRemove) {
+             HLTDebug("remove duplicated event %s (%d)", AliHLTComponent::DataType2Text((*block).fDataType).c_str(), i);
+             pSrcTask->Release(&(*block), this);
+             block=fBlockDataArray.erase(block);
+             continue;
+           } else {
+           iInputDataVolume+=(*block).fSize;
            // put the source task as many times into the list as it provides data blocks
            // makes the bookkeeping for the data release easier
            subscribedTaskList.push_back(pSrcTask);
+           }
+           block++;
          }
          HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
-         iSourceDataBlock+=iResult;      
+         iSourceDataBlock=fBlockDataArray.size();        
          iResult=0;
        } else {
          HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
@@ -479,7 +489,7 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        iResult=-EFAULT;
       }
       lnk=lnk->Next();
-    }
+    }    
 
     // process the event
     int iNofTrial=0; // repeat processing if component returns -ENOSPC
@@ -517,28 +527,46 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
        HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
        if (iResult>=0 && outputBlocks) {
-         AliHLTComponentBlockDataList segments;
-         for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
-           AliHLTUInt32_t iblock=0;
-           for (; iblock<evtData.fBlockCnt; iblock++) {
-             if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
-               assert(subscribedTaskList[iblock]!=NULL);
-               if (subscribedTaskList[iblock]==NULL) continue;
-               HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
-               fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
-               subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
-               break;
+         if (fListTargets.First()!=NULL) {
+           AliHLTComponentBlockDataList segments;
+           for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
+             AliHLTUInt32_t iblock=0;
+             for (; iblock<evtData.fBlockCnt; iblock++) {
+               if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
+                 assert(subscribedTaskList[iblock]!=NULL);
+                 if (subscribedTaskList[iblock]==NULL) continue;
+                 HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+                 fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
+                 subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
+                 break;
+               }
+             }
+             if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
+             if (pTgtBuffer && segments.size()>0) {
+               iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
              }
            }
-           if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
-         }
-         if (pTgtBuffer && segments.size()>0) {
-           iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
+         } else {
+           // no forwarding, actually we dont even need to keep the data, this is a
+           // dead end (fListTargets empty)
+           //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
          }
          delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
        } else {
          fpDataBuffer->Reset();
        }
+       if (fListTargets.First()!=NULL) {
+         if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
+           HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
+           subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
+         }
+         if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
+           HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
+           subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
+         }
+       }
       } else {
        HLTError("task %s: no target buffer available", GetName());
        iResult=-EFAULT;
@@ -612,13 +640,13 @@ int AliHLTTask::GetNofMatchingDataTypes(const AliHLTTask* pConsumerTask) const
   return iResult;
 }
 
-int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockData* pBlockDesc, int iArraySize)
+int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
 {
   // see header file for function documentation
   int iResult=0;
   if (pConsumerTask) {
     if (fpDataBuffer) {
-      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), pBlockDesc, iArraySize);
+      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
     } else {
       HLTFatal("internal data buffer missing");
       iResult=-EFAULT;
index b528f0c..04e91f4 100644 (file)
@@ -226,12 +226,10 @@ class AliHLTTask : public TObject, public AliHLTLogging {
    * returns the number of blocks which would be prepared in case the target
    * array is big enough.
    * @param pConsumerTask   the task which subscribes to the data
-   * @param arrayBlockDesc  pointer to block descriptor to be filled
-   * @param iArraySize      size of the block descriptor array
+   * @param blockDescList   block descriptor list to be filled
    * @return number of matching data blocks, negative error code if failed
    */
-  int Subscribe(const AliHLTTask* pConsumerTask,
-               AliHLTComponentBlockData* arrayBlockDesc, int iArraySize);
+  int Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList);
 
   /**
    * Release a block descriptor.