]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - HLT/BASE/AliHLTDataBuffer.cxx
implement block forwarding
[u/mrichter/AliRoot.git] / HLT / BASE / AliHLTDataBuffer.cxx
index 5751583f67f9bdb87c862610e5e0062b92f5e93f..4503d842a6aca35d0648edfdbd13392b9a67a965 100644 (file)
@@ -35,6 +35,7 @@ using namespace std;
 #include "AliHLTDataBuffer.h"
 #include "AliHLTConsumerDescriptor.h"
 #include "AliHLTComponent.h"
+#include "AliHLTTask.h"
 #include <cerrno>
 #include <cassert>
 //#include <string>
@@ -53,7 +54,9 @@ AliHLTDataBuffer::AliHLTDataBuffer()
   fActiveConsumers(),
   fReleasedConsumers(),
   fpBuffer(NULL),
-  fFlags(0)
+  fFlags(0),
+  fForwardedSegmentSources(),
+  fForwardedSegments()
 {
   // see header file for class documentation
   // or
@@ -138,6 +141,9 @@ int AliHLTDataBuffer::FindMatchingDataSegments(const AliHLTComponent* pConsumer,
   // all blocks are passed to the consumer, which is the policy also in
   // PubSub
   tgtList.assign(fSegments.begin(), fSegments.end());
+
+  // add all forwarded blocks
+  tgtList.insert(tgtList.begin(), fForwardedSegments.begin(), fForwardedSegments.end());
   iResult=tgtList.size();
   return iResult;
   
@@ -169,7 +175,7 @@ int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponen
   // see header file for function documentation
   int iResult=0;
   if (pConsumer && arrayBlockDesc) {
-    if (fpBuffer) {
+    if (1/*fpBuffer*/) {
       AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fConsumers);
       if (pDesc) {
        AliHLTDataSegmentList tgtList;
@@ -190,14 +196,13 @@ int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponen
            // For incoming data blocks, fOffset must be ignored by the
            // processing component. It is set for bookkeeping in the framework.
            // fPtr always points to the beginning of the data.
-           arrayBlockDesc[i].fOffset=(*segment).fSegmentOffset;
-           AliHLTUInt8_t* pTgt=*fpBuffer;
-           pTgt+=(*segment).fSegmentOffset;
+           arrayBlockDesc[i].fOffset=0;
+           AliHLTUInt8_t* pTgt=*segment;
            arrayBlockDesc[i].fPtr=reinterpret_cast<void*>(pTgt);
            arrayBlockDesc[i].fSize=(*segment).fSegmentSize;
            arrayBlockDesc[i].fDataType=(*segment).fDataType;
            arrayBlockDesc[i].fSpecification=(*segment).fSpecification;
-           pDesc->SetActiveDataSegment(arrayBlockDesc[i].fOffset, arrayBlockDesc[i].fSize);
+           pDesc->SetActiveDataSegment(*segment);
            HLTDebug("component %p (%s) subscribed to segment #%d offset %d size %d data type %s %#x", 
                     pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset,
                     arrayBlockDesc[i].fSize, (AliHLTComponent::DataType2Text(arrayBlockDesc[i].fDataType)).c_str(), 
@@ -211,7 +216,10 @@ int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponen
            iResult=-ENOSPC;
          } else {
          // move this consumer to the active list
-         if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
+         if (i==0) {
+           ChangeConsumerState(pDesc, fConsumers, fReleasedConsumers);
+           HLTDebug("no input data for component %p (%s) available", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
+         } else if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
            HLTDebug("component %p (%s) subscribed to data buffer %p", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), this);
          } else {
            // TODO: cleanup the consumer descriptor correctly
@@ -242,23 +250,54 @@ int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponen
   return iResult;
 }
 
-int AliHLTDataBuffer::Release(AliHLTComponentBlockData* pBlockDesc, const AliHLTComponent* pConsumer)
+int AliHLTDataBuffer::Release(AliHLTComponentBlockData* pBlockDesc,
+                             const AliHLTComponent* pConsumer,
+                             const AliHLTTask* pOwnerTask)
 {
   // see header file for function documentation
   int iResult=0;
   if (pBlockDesc && pConsumer) {
     AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fActiveConsumers);
     if (pDesc) {
-      if ((iResult=pDesc->CheckActiveDataSegment(pBlockDesc->fOffset, pBlockDesc->fSize))!=1) {
+      if ((iResult=pDesc->CheckActiveDataSegment(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize)))!=1) {
        HLTWarning("data segment missmatch, component %p has not subscribed to a segment with offset %#x and size %d", pConsumer, pBlockDesc->fOffset, pBlockDesc->fSize);
        // TODO: appropriate error handling, but so far optional
        iResult=0;
       } else {
-       pDesc->ReleaseActiveDataSegment(pBlockDesc->fOffset, pBlockDesc->fSize);
-       pBlockDesc->fOffset=0;
-       pBlockDesc->fPtr=NULL;
-       pBlockDesc->fSize=0;
+       pDesc->ReleaseActiveDataSegment(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+      }
+      if (GetNofPendingConsumers()==0 && fForwardedSegments.size()>0) {
+       // last consumer, release forwarded segments
+       assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
+       AliHLTDataSegmentList::iterator segment=fForwardedSegments.begin();
+       AliHLTTaskPList::iterator src=fForwardedSegmentSources.begin();
+       //HLTDebug("%p checking forwarded segments", this);
+       for (; segment!=fForwardedSegments.end(); segment++, src++) {
+         //HLTDebug("segment ptr=%p offset=%d size=%d\n"
+         //   "block ptr=%p offset=%d size=%d", (*segment).fPtr, (*segment).fSegmentOffset, (*segment).fSegmentSize, pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize);
+         if ((*segment)==AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize)) {
+           //HLTDebug("release segment of task %p", *src);
+           assert((*src)!=NULL);
+           if ((*src)!=NULL) {
+             if ((*src)->Release(pBlockDesc, pOwnerTask)>=0) {
+               HLTDebug("task %s (%p) released forwarded segment %p size %d of task %s (%p)",
+                        pOwnerTask->GetName(), pOwnerTask, (*segment).GetPtr(), (*segment).GetSize(),
+                        (*src)->GetName(), *src);
+             } else {
+               HLTError("task %s (%p) failed releasing forwarded segment %p size %d of task %s (%p)",
+                        pOwnerTask->GetName(), pOwnerTask, (*segment).GetPtr(), (*segment).GetSize(),
+                        (*src)->GetName(), *src);
+             }
+           }
+           fForwardedSegments.erase(segment);
+           fForwardedSegmentSources.erase(src);
+           break;
+         }
+       }
       }
+      pBlockDesc->fOffset=0;
+      pBlockDesc->fPtr=NULL;
+      pBlockDesc->fSize=0;
       if (pDesc->GetNofActiveSegments()==0) {
        if ((iResult=ChangeConsumerState(pDesc, fActiveConsumers, fReleasedConsumers))>=0) {
          if (GetNofActiveConsumers()==0 && GetNofPendingConsumers()==0) {
@@ -281,6 +320,17 @@ int AliHLTDataBuffer::Release(AliHLTComponentBlockData* pBlockDesc, const AliHLT
   return iResult;
 }
 
+int AliHLTDataBuffer::Forward(AliHLTTask* pSrcTask, AliHLTComponentBlockData* pBlockDesc)
+{
+  // see header file for function documentation
+  if (pSrcTask==NULL || pBlockDesc==NULL) return -EINVAL;
+  assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
+  if (fForwardedSegments.size()!=fForwardedSegmentSources.size()) return -EFAULT;
+  fForwardedSegmentSources.push_back(pSrcTask);
+  fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+  return 0;
+}
+
 AliHLTUInt8_t* AliHLTDataBuffer::GetTargetBuffer(int iMinSize)
 {
   // see header file for function documentation
@@ -309,11 +359,14 @@ int AliHLTDataBuffer::SetSegments(AliHLTUInt8_t* pTgt, AliHLTComponentBlockData*
          // This function has to model the behavior of PubSub
          // For output blocks only the fOffset value is used, this must be the offset
          // relative to the output pointer. fPtr must be either NULL or the output
-         // pointer
+         // pointer. In either case it is 'ignored' and set to the beginning of the
+         // data buffer
          if (arrayBlockData[i].fPtr==NULL ||
              arrayBlockData[i].fPtr==*fpBuffer) {
+           arrayBlockData[i].fPtr=*fpBuffer;
            if (arrayBlockData[i].fOffset+arrayBlockData[i].fSize<=fpBuffer->GetUsedSize()) {
              segment.fSegmentOffset=arrayBlockData[i].fOffset;
+             segment.fPtr=(AliHLTUInt8_t*)arrayBlockData[i].fPtr;
              segment.fSegmentSize=arrayBlockData[i].fSize;
              segment.fDataType=arrayBlockData[i].fDataType;
              segment.fSpecification=arrayBlockData[i].fSpecification;
@@ -498,6 +551,11 @@ int AliHLTDataBuffer::ResetDataBuffer()
   AliHLTRawBuffer* pBuffer=fpBuffer;
   fpBuffer=NULL;
 
+  // cleanup forwarded segment lists
+  assert(fForwardedSegments.size()==0);
+  fForwardedSegments.clear();
+  fForwardedSegmentSources.clear();
+
   // cleanup consumer states
   AliHLTConsumerDescriptorPList::iterator desc;
 //   if (GetNofPendingConsumers()>0) {