#include "AliHLTDataBuffer.h"
#include "AliHLTConsumerDescriptor.h"
#include "AliHLTComponent.h"
+#include "AliHLTTask.h"
#include <cerrno>
#include <cassert>
//#include <string>
fActiveConsumers(),
fReleasedConsumers(),
fpBuffer(NULL),
- fFlags(0)
+ fFlags(0),
+ fForwardedSegmentSources(),
+ fForwardedSegments()
{
// see header file for class documentation
// or
// all blocks are passed to the consumer, which is the policy also in
// PubSub
tgtList.assign(fSegments.begin(), fSegments.end());
+
+ // add all forwarded blocks
+ tgtList.insert(tgtList.begin(), fForwardedSegments.begin(), fForwardedSegments.end());
iResult=tgtList.size();
return iResult;
// see header file for function documentation
int iResult=0;
if (pConsumer && arrayBlockDesc) {
- if (fpBuffer) {
+ if (1/*fpBuffer*/) {
AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fConsumers);
if (pDesc) {
AliHLTDataSegmentList tgtList;
// For incoming data blocks, fOffset must be ignored by the
// processing component. It is set for bookkeeping in the framework.
// fPtr always points to the beginning of the data.
- arrayBlockDesc[i].fOffset=(*segment).fSegmentOffset;
- AliHLTUInt8_t* pTgt=*fpBuffer;
- pTgt+=(*segment).fSegmentOffset;
+ arrayBlockDesc[i].fOffset=0;
+ AliHLTUInt8_t* pTgt=*segment;
arrayBlockDesc[i].fPtr=reinterpret_cast<void*>(pTgt);
arrayBlockDesc[i].fSize=(*segment).fSegmentSize;
arrayBlockDesc[i].fDataType=(*segment).fDataType;
arrayBlockDesc[i].fSpecification=(*segment).fSpecification;
- pDesc->SetActiveDataSegment(arrayBlockDesc[i].fOffset, arrayBlockDesc[i].fSize);
+ pDesc->SetActiveDataSegment(*segment);
HLTDebug("component %p (%s) subscribed to segment #%d offset %d size %d data type %s %#x",
pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset,
arrayBlockDesc[i].fSize, (AliHLTComponent::DataType2Text(arrayBlockDesc[i].fDataType)).c_str(),
iResult=-ENOSPC;
} else {
// move this consumer to the active list
- if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
+ if (i==0) {
+ ChangeConsumerState(pDesc, fConsumers, fReleasedConsumers);
+ HLTDebug("no input data for component %p (%s) available", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
+ } else if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
HLTDebug("component %p (%s) subscribed to data buffer %p", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), this);
} else {
// TODO: cleanup the consumer descriptor correctly
return iResult;
}
-int AliHLTDataBuffer::Release(AliHLTComponentBlockData* pBlockDesc, const AliHLTComponent* pConsumer)
+int AliHLTDataBuffer::Release(AliHLTComponentBlockData* pBlockDesc,
+ const AliHLTComponent* pConsumer,
+ const AliHLTTask* pOwnerTask)
{
// see header file for function documentation
int iResult=0;
if (pBlockDesc && pConsumer) {
AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fActiveConsumers);
if (pDesc) {
- if ((iResult=pDesc->CheckActiveDataSegment(pBlockDesc->fOffset, pBlockDesc->fSize))!=1) {
+ if ((iResult=pDesc->CheckActiveDataSegment(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize)))!=1) {
HLTWarning("data segment missmatch, component %p has not subscribed to a segment with offset %#x and size %d", pConsumer, pBlockDesc->fOffset, pBlockDesc->fSize);
// TODO: appropriate error handling, but so far optional
iResult=0;
} else {
- pDesc->ReleaseActiveDataSegment(pBlockDesc->fOffset, pBlockDesc->fSize);
- pBlockDesc->fOffset=0;
- pBlockDesc->fPtr=NULL;
- pBlockDesc->fSize=0;
+ pDesc->ReleaseActiveDataSegment(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+ }
+ if (GetNofPendingConsumers()==0 && fForwardedSegments.size()>0) {
+ // last consumer, release forwarded segments
+ assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
+ AliHLTDataSegmentList::iterator segment=fForwardedSegments.begin();
+ AliHLTTaskPList::iterator src=fForwardedSegmentSources.begin();
+ //HLTDebug("%p checking forwarded segments", this);
+ for (; segment!=fForwardedSegments.end(); segment++, src++) {
+ //HLTDebug("segment ptr=%p offset=%d size=%d\n"
+ // "block ptr=%p offset=%d size=%d", (*segment).fPtr, (*segment).fSegmentOffset, (*segment).fSegmentSize, pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize);
+ if ((*segment)==AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize)) {
+ //HLTDebug("release segment of task %p", *src);
+ assert((*src)!=NULL);
+ if ((*src)!=NULL) {
+ if ((*src)->Release(pBlockDesc, pOwnerTask)>=0) {
+ HLTDebug("task %s (%p) released forwarded segment %p size %d of task %s (%p)",
+ pOwnerTask->GetName(), pOwnerTask, (*segment).GetPtr(), (*segment).GetSize(),
+ (*src)->GetName(), *src);
+ } else {
+ HLTError("task %s (%p) failed releasing forwarded segment %p size %d of task %s (%p)",
+ pOwnerTask->GetName(), pOwnerTask, (*segment).GetPtr(), (*segment).GetSize(),
+ (*src)->GetName(), *src);
+ }
+ }
+ fForwardedSegments.erase(segment);
+ fForwardedSegmentSources.erase(src);
+ break;
+ }
+ }
}
+ pBlockDesc->fOffset=0;
+ pBlockDesc->fPtr=NULL;
+ pBlockDesc->fSize=0;
if (pDesc->GetNofActiveSegments()==0) {
if ((iResult=ChangeConsumerState(pDesc, fActiveConsumers, fReleasedConsumers))>=0) {
if (GetNofActiveConsumers()==0 && GetNofPendingConsumers()==0) {
return iResult;
}
+int AliHLTDataBuffer::Forward(AliHLTTask* pSrcTask, AliHLTComponentBlockData* pBlockDesc)
+{
+ // see header file for function documentation
+ if (pSrcTask==NULL || pBlockDesc==NULL) return -EINVAL;
+ assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
+ if (fForwardedSegments.size()!=fForwardedSegmentSources.size()) return -EFAULT;
+ fForwardedSegmentSources.push_back(pSrcTask);
+ fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+ return 0;
+}
+
AliHLTUInt8_t* AliHLTDataBuffer::GetTargetBuffer(int iMinSize)
{
// see header file for function documentation
// This function has to model the behavior of PubSub
// For output blocks only the fOffset value is used, this must be the offset
// relative to the output pointer. fPtr must be either NULL or the output
- // pointer
+ // pointer. In either case it is 'ignored' and set to the beginning of the
+ // data buffer
if (arrayBlockData[i].fPtr==NULL ||
arrayBlockData[i].fPtr==*fpBuffer) {
+ arrayBlockData[i].fPtr=*fpBuffer;
if (arrayBlockData[i].fOffset+arrayBlockData[i].fSize<=fpBuffer->GetUsedSize()) {
segment.fSegmentOffset=arrayBlockData[i].fOffset;
+ segment.fPtr=(AliHLTUInt8_t*)arrayBlockData[i].fPtr;
segment.fSegmentSize=arrayBlockData[i].fSize;
segment.fDataType=arrayBlockData[i].fDataType;
segment.fSpecification=arrayBlockData[i].fSpecification;
AliHLTRawBuffer* pBuffer=fpBuffer;
fpBuffer=NULL;
+ // cleanup forwarded segment lists
+ assert(fForwardedSegments.size()==0);
+ fForwardedSegments.clear();
+ fForwardedSegmentSources.clear();
+
// cleanup consumer states
AliHLTConsumerDescriptorPList::iterator desc;
// if (GetNofPendingConsumers()>0) {