]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - HLT/BASE/AliHLTTask.cxx
implementation of SOR and EOR events (not yet enabled), bugfix in DataBuffer: data...
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTTask.cxx
index 6a57a3476bea449b8ea8eb367543a0a994c67492..78e007f13e08ed43dae36c89a1158a1d6ff10432 100644 (file)
@@ -33,6 +33,7 @@ using namespace std;
 #endif
 
 #include <cerrno>
+#include <cassert>
 #include <iostream>
 #include <string>
 #include "AliHLTTask.h"
@@ -363,7 +364,7 @@ int AliHLTTask::StartRun()
     if (iResult>=0) {
       if (fBlockDataArray.size()>0) {
        HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
-       fBlockDataArray.resize(0);
+       fBlockDataArray.clear();
       }
 
       // component init
@@ -371,13 +372,6 @@ int AliHLTTask::StartRun()
       // of the component.
       //iResult=Init( AliHLTComponentEnvironment* environ, void* environ_param, int argc, const char** argv );
 
-      // allocate internal task variables for bookkeeping aso.
-      // we allocate the BlockData array with at least one member
-      if (iNofInputDataBlocks==0) iNofInputDataBlocks=1;
-      AliHLTComponentBlockData init;
-      memset(&init, 0, sizeof(AliHLTComponentBlockData));
-      fBlockDataArray.resize(iNofInputDataBlocks, init);
-
       // allocate the data buffer, which controls the output buffer and subscriptions
       if (iResult>=0) {
        fpDataBuffer=new AliHLTDataBuffer;
@@ -402,6 +396,10 @@ int AliHLTTask::StartRun()
        }
       }
     }
+    if (iResult>=0) {
+      // send the SOR event
+      
+    }
   } else {
     HLTError("task %s (%p) does not have a component", GetName(), this);
     iResult=-EFAULT;
@@ -414,9 +412,7 @@ int AliHLTTask::EndRun()
   // see header file for function documentation
   int iResult=0;
   if (fBlockDataArray.size()>0) {
-    fBlockDataArray.resize(0);
-  } else {
-    HLTWarning("task %s (%p) doesn't seem to be in running mode", GetName(), this);
+    fBlockDataArray.clear();
   }
   if (fpDataBuffer) {
     AliHLTDataBuffer* pBuffer=fpDataBuffer;
@@ -438,36 +434,51 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
     int iInputDataVolume=0;
 
     AliHLTTask* pSrcTask=NULL;
-    TList subscribedTaskList;
+    AliHLTTaskPList subscribedTaskList;
     TObjLink* lnk=fListDependencies.FirstLink();
 
+    // instances of SOR and EOR events to be kept
+    int iSOR=-1;
+    int iEOR=-1;
+
     // subscribe to all source tasks
+    fBlockDataArray.clear();
     while (lnk && iResult>=0) {
       pSrcTask=(AliHLTTask*)lnk->GetObject();
       if (pSrcTask) {
        int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
-       if (iMatchingDB>=0 && static_cast<unsigned int>(iMatchingDB)>fBlockDataArray.size()-iSourceDataBlock) {
-         AliHLTComponentBlockData init;
-         memset(&init, 0, sizeof(AliHLTComponentBlockData));
-         fBlockDataArray.resize(iSourceDataBlock+iMatchingDB, init);
-       } else {
-         if (iMatchingDB<0) {
-           HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
-           iResult=iMatchingDB;
-           break;
-         } else if (iMatchingDB==0) {
-           HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
-         }
+       if (iMatchingDB<0) {
+         HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
+         iResult=iMatchingDB;
+         break;
+       } else if (iMatchingDB==0) {
+         HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
        }
-       if ((iResult=pSrcTask->Subscribe(this, &fBlockDataArray[iSourceDataBlock],fBlockDataArray.size()-iSourceDataBlock))>=0) {
-         for (int i=0; i<iResult; i++) {
-           iInputDataVolume+=fBlockDataArray[i+iSourceDataBlock].fSize;
+       if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
+         iSOR=iEOR=-1;
+         AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
+         for (int i=0; block!=fBlockDataArray.end(); i++) {
+           bool bRemove=0;
+           bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
+           bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+           //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
+           if (i<iSourceDataBlock) {
+             assert(!bRemove);
+           } else if (bRemove) {
+             HLTDebug("remove duplicated event %s (%d)", AliHLTComponent::DataType2Text((*block).fDataType).c_str(), i);
+             pSrcTask->Release(&(*block), this);
+             block=fBlockDataArray.erase(block);
+             continue;
+           } else {
+           iInputDataVolume+=(*block).fSize;
            // put the source task as many times into the list as it provides data blocks
            // makes the bookkeeping for the data release easier
-           subscribedTaskList.Add(pSrcTask);
+           subscribedTaskList.push_back(pSrcTask);
+           }
+           block++;
          }
          HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
-         iSourceDataBlock+=iResult;      
+         iSourceDataBlock=fBlockDataArray.size();        
          iResult=0;
        } else {
          HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
@@ -478,7 +489,7 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        iResult=-EFAULT;
       }
       lnk=lnk->Next();
-    }
+    }    
 
     // process the event
     int iNofTrial=0; // repeat processing if component returns -ENOSPC
@@ -515,12 +526,47 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
       if (pTgtBuffer!=NULL || iOutputDataSize==0) {
        iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
        HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
-       if (iResult>=0 && pTgtBuffer && outputBlocks) {
-         iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
+       if (iResult>=0 && outputBlocks) {
+         if (fListTargets.First()!=NULL) {
+           AliHLTComponentBlockDataList segments;
+           for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
+             AliHLTUInt32_t iblock=0;
+             for (; iblock<evtData.fBlockCnt; iblock++) {
+               if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
+                 assert(subscribedTaskList[iblock]!=NULL);
+                 if (subscribedTaskList[iblock]==NULL) continue;
+                 HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+                 fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
+                 subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
+                 break;
+               }
+             }
+             if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
+             if (pTgtBuffer && segments.size()>0) {
+               iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
+             }
+           }
+         } else {
+           // no forwarding, actually we dont even need to keep the data, this is a
+           // dead end (fListTargets empty)
+           //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
+         }
          delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
        } else {
          fpDataBuffer->Reset();
        }
+       if (fListTargets.First()!=NULL) {
+         if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
+           HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
+           subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
+         }
+         if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
+           HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
+           subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
+         }
+       }
       } else {
        HLTError("task %s: no target buffer available", GetName());
        iResult=-EFAULT;
@@ -530,9 +576,9 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
 
     // now release all buffers which we have subscribed to
     iSourceDataBlock=0;
-    lnk=subscribedTaskList.FirstLink();
-    while (lnk) {
-      pSrcTask=(AliHLTTask*)lnk->GetObject();
+    AliHLTTaskPList::iterator element;
+    while ((element=subscribedTaskList.begin())!=subscribedTaskList.end()) {
+      pSrcTask=*element;
       if (pSrcTask) {
        int iTempRes=0;
        if ((iTempRes=pSrcTask->Release(&fBlockDataArray[iSourceDataBlock], this))>=0) {
@@ -540,15 +586,11 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        } else {
          HLTError("Task %s (%p): realease of task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iTempRes);
        }
-      } else {
-       HLTFatal("task %s (%p): internal error in ROOT list handling", GetName(), this);
-       if (iResult>=0) iResult=-EFAULT;
       }
-      subscribedTaskList.Remove(lnk);
-      lnk=subscribedTaskList.FirstLink();
+      subscribedTaskList.erase(element);
       iSourceDataBlock++;
     }
-    if (subscribedTaskList.GetSize()>0) {
+    if (subscribedTaskList.size()>0) {
       HLTError("task %s (%p): could not release all data buffers", GetName(), this);
     }
   } else {
@@ -598,13 +640,13 @@ int AliHLTTask::GetNofMatchingDataTypes(const AliHLTTask* pConsumerTask) const
   return iResult;
 }
 
-int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockData* pBlockDesc, int iArraySize)
+int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
 {
   // see header file for function documentation
   int iResult=0;
   if (pConsumerTask) {
     if (fpDataBuffer) {
-      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), pBlockDesc, iArraySize);
+      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
     } else {
       HLTFatal("internal data buffer missing");
       iResult=-EFAULT;
@@ -621,7 +663,7 @@ int AliHLTTask::Release(AliHLTComponentBlockData* pBlockDesc, const AliHLTTask*
   int iResult=0;
   if (pConsumerTask && pBlockDesc) {
     if (fpDataBuffer) {
-      iResult=fpDataBuffer->Release(pBlockDesc, pConsumerTask->GetComponent());
+      iResult=fpDataBuffer->Release(pBlockDesc, pConsumerTask->GetComponent(), this);
     } else {
       HLTFatal("internal data buffer missing");
       iResult=-EFAULT;