]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - HLT/BASE/AliHLTTask.cxx
implementation of SOR and EOR events (not yet enabled), bugfix in DataBuffer: data...
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTTask.cxx
index 536433ed15d10991769ed715b904a06cd690e351..78e007f13e08ed43dae36c89a1158a1d6ff10432 100644 (file)
@@ -364,7 +364,7 @@ int AliHLTTask::StartRun()
     if (iResult>=0) {
       if (fBlockDataArray.size()>0) {
        HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
-       fBlockDataArray.resize(0);
+       fBlockDataArray.clear();
       }
 
       // component init
@@ -372,13 +372,6 @@ int AliHLTTask::StartRun()
       // of the component.
       //iResult=Init( AliHLTComponentEnvironment* environ, void* environ_param, int argc, const char** argv );
 
-      // allocate internal task variables for bookkeeping aso.
-      // we allocate the BlockData array with at least one member
-      if (iNofInputDataBlocks==0) iNofInputDataBlocks=1;
-      AliHLTComponentBlockData init;
-      memset(&init, 0, sizeof(AliHLTComponentBlockData));
-      fBlockDataArray.resize(iNofInputDataBlocks, init);
-
       // allocate the data buffer, which controls the output buffer and subscriptions
       if (iResult>=0) {
        fpDataBuffer=new AliHLTDataBuffer;
@@ -403,6 +396,10 @@ int AliHLTTask::StartRun()
        }
       }
     }
+    if (iResult>=0) {
+      // send the SOR event
+      
+    }
   } else {
     HLTError("task %s (%p) does not have a component", GetName(), this);
     iResult=-EFAULT;
@@ -415,9 +412,7 @@ int AliHLTTask::EndRun()
   // see header file for function documentation
   int iResult=0;
   if (fBlockDataArray.size()>0) {
-    fBlockDataArray.resize(0);
-  } else {
-    HLTWarning("task %s (%p) doesn't seem to be in running mode", GetName(), this);
+    fBlockDataArray.clear();
   }
   if (fpDataBuffer) {
     AliHLTDataBuffer* pBuffer=fpDataBuffer;
@@ -442,33 +437,48 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
     AliHLTTaskPList subscribedTaskList;
     TObjLink* lnk=fListDependencies.FirstLink();
 
+    // instances of SOR and EOR events to be kept
+    int iSOR=-1;
+    int iEOR=-1;
+
     // subscribe to all source tasks
+    fBlockDataArray.clear();
     while (lnk && iResult>=0) {
       pSrcTask=(AliHLTTask*)lnk->GetObject();
       if (pSrcTask) {
        int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
-       if (iMatchingDB>=0 && static_cast<unsigned int>(iMatchingDB)>fBlockDataArray.size()-iSourceDataBlock) {
-         AliHLTComponentBlockData init;
-         memset(&init, 0, sizeof(AliHLTComponentBlockData));
-         fBlockDataArray.resize(iSourceDataBlock+iMatchingDB, init);
-       } else {
-         if (iMatchingDB<0) {
-           HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
-           iResult=iMatchingDB;
-           break;
-         } else if (iMatchingDB==0) {
-           HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
-         }
+       if (iMatchingDB<0) {
+         HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
+         iResult=iMatchingDB;
+         break;
+       } else if (iMatchingDB==0) {
+         HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
        }
-       if ((iResult=pSrcTask->Subscribe(this, &fBlockDataArray[iSourceDataBlock],fBlockDataArray.size()-iSourceDataBlock))>=0) {
-         for (int i=0; i<iResult; i++) {
-           iInputDataVolume+=fBlockDataArray[i+iSourceDataBlock].fSize;
+       if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
+         iSOR=iEOR=-1;
+         AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
+         for (int i=0; block!=fBlockDataArray.end(); i++) {
+           bool bRemove=0;
+           bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
+           bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+           //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
+           if (i<iSourceDataBlock) {
+             assert(!bRemove);
+           } else if (bRemove) {
+             HLTDebug("remove duplicated event %s (%d)", AliHLTComponent::DataType2Text((*block).fDataType).c_str(), i);
+             pSrcTask->Release(&(*block), this);
+             block=fBlockDataArray.erase(block);
+             continue;
+           } else {
+           iInputDataVolume+=(*block).fSize;
            // put the source task as many times into the list as it provides data blocks
            // makes the bookkeeping for the data release easier
            subscribedTaskList.push_back(pSrcTask);
+           }
+           block++;
          }
          HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
-         iSourceDataBlock+=iResult;      
+         iSourceDataBlock=fBlockDataArray.size();        
          iResult=0;
        } else {
          HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
@@ -479,7 +489,7 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        iResult=-EFAULT;
       }
       lnk=lnk->Next();
-    }
+    }    
 
     // process the event
     int iNofTrial=0; // repeat processing if component returns -ENOSPC
@@ -517,28 +527,46 @@ int AliHLTTask::ProcessTask(Int_t eventNo)
        iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
        HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
        if (iResult>=0 && outputBlocks) {
-         AliHLTComponentBlockDataList segments;
-         for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
-           AliHLTUInt32_t iblock=0;
-           for (; iblock<evtData.fBlockCnt; iblock++) {
-             if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
-               assert(subscribedTaskList[iblock]!=NULL);
-               if (subscribedTaskList[iblock]==NULL) continue;
-               HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
-               fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
-               subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
-               break;
+         if (fListTargets.First()!=NULL) {
+           AliHLTComponentBlockDataList segments;
+           for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
+             AliHLTUInt32_t iblock=0;
+             for (; iblock<evtData.fBlockCnt; iblock++) {
+               if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
+                 assert(subscribedTaskList[iblock]!=NULL);
+                 if (subscribedTaskList[iblock]==NULL) continue;
+                 HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+                 fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
+                 subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
+                 break;
+               }
+             }
+             if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
+             if (pTgtBuffer && segments.size()>0) {
+               iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
              }
            }
-           if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
-         }
-         if (pTgtBuffer && segments.size()>0) {
-           iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
+         } else {
+           // no forwarding, actually we dont even need to keep the data, this is a
+           // dead end (fListTargets empty)
+           //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
          }
          delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
        } else {
          fpDataBuffer->Reset();
        }
+       if (fListTargets.First()!=NULL) {
+         if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
+           HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
+           subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
+         }
+         if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
+           HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+           fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
+           subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
+         }
+       }
       } else {
        HLTError("task %s: no target buffer available", GetName());
        iResult=-EFAULT;
@@ -612,13 +640,13 @@ int AliHLTTask::GetNofMatchingDataTypes(const AliHLTTask* pConsumerTask) const
   return iResult;
 }
 
-int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockData* pBlockDesc, int iArraySize)
+int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
 {
   // see header file for function documentation
   int iResult=0;
   if (pConsumerTask) {
     if (fpDataBuffer) {
-      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), pBlockDesc, iArraySize);
+      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
     } else {
       HLTFatal("internal data buffer missing");
       iResult=-EFAULT;