if (iResult>=0) {
if (fBlockDataArray.size()>0) {
HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
- fBlockDataArray.resize(0);
+ fBlockDataArray.clear();
}
// component init
// of the component.
//iResult=Init( AliHLTComponentEnvironment* environ, void* environ_param, int argc, const char** argv );
- // allocate internal task variables for bookkeeping aso.
- // we allocate the BlockData array with at least one member
- if (iNofInputDataBlocks==0) iNofInputDataBlocks=1;
- AliHLTComponentBlockData init;
- memset(&init, 0, sizeof(AliHLTComponentBlockData));
- fBlockDataArray.resize(iNofInputDataBlocks, init);
-
// allocate the data buffer, which controls the output buffer and subscriptions
if (iResult>=0) {
fpDataBuffer=new AliHLTDataBuffer;
}
}
}
+ if (iResult>=0) {
+ // send the SOR event
+
+ }
} else {
HLTError("task %s (%p) does not have a component", GetName(), this);
iResult=-EFAULT;
// see header file for function documentation
int iResult=0;
if (fBlockDataArray.size()>0) {
- fBlockDataArray.resize(0);
- } else {
- HLTWarning("task %s (%p) doesn't seem to be in running mode", GetName(), this);
+ fBlockDataArray.clear();
}
if (fpDataBuffer) {
AliHLTDataBuffer* pBuffer=fpDataBuffer;
AliHLTTaskPList subscribedTaskList;
TObjLink* lnk=fListDependencies.FirstLink();
+ // instances of SOR and EOR events to be kept
+ int iSOR=-1;
+ int iEOR=-1;
+
// subscribe to all source tasks
+ fBlockDataArray.clear();
while (lnk && iResult>=0) {
pSrcTask=(AliHLTTask*)lnk->GetObject();
if (pSrcTask) {
int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
- if (iMatchingDB>=0 && static_cast<unsigned int>(iMatchingDB)>fBlockDataArray.size()-iSourceDataBlock) {
- AliHLTComponentBlockData init;
- memset(&init, 0, sizeof(AliHLTComponentBlockData));
- fBlockDataArray.resize(iSourceDataBlock+iMatchingDB, init);
- } else {
- if (iMatchingDB<0) {
- HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
- iResult=iMatchingDB;
- break;
- } else if (iMatchingDB==0) {
- HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
- }
+ if (iMatchingDB<0) {
+ HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
+ iResult=iMatchingDB;
+ break;
+ } else if (iMatchingDB==0) {
+ HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
}
- if ((iResult=pSrcTask->Subscribe(this, &fBlockDataArray[iSourceDataBlock],fBlockDataArray.size()-iSourceDataBlock))>=0) {
- for (int i=0; i<iResult; i++) {
- iInputDataVolume+=fBlockDataArray[i+iSourceDataBlock].fSize;
+ if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
+ iSOR=iEOR=-1;
+ AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
+ for (int i=0; block!=fBlockDataArray.end(); i++) {
+ bool bRemove=0;
+ bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
+ bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+ //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
+ if (i<iSourceDataBlock) {
+ assert(!bRemove);
+ } else if (bRemove) {
+ HLTDebug("remove duplicated event %s (%d)", AliHLTComponent::DataType2Text((*block).fDataType).c_str(), i);
+ pSrcTask->Release(&(*block), this);
+ block=fBlockDataArray.erase(block);
+ continue;
+ } else {
+ iInputDataVolume+=(*block).fSize;
// put the source task as many times into the list as it provides data blocks
// makes the bookkeeping for the data release easier
subscribedTaskList.push_back(pSrcTask);
+ }
+ block++;
}
HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
- iSourceDataBlock+=iResult;
+ iSourceDataBlock=fBlockDataArray.size();
iResult=0;
} else {
HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
iResult=-EFAULT;
}
lnk=lnk->Next();
- }
+ }
// process the event
int iNofTrial=0; // repeat processing if component returns -ENOSPC
iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
if (iResult>=0 && outputBlocks) {
- AliHLTComponentBlockDataList segments;
- for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
- AliHLTUInt32_t iblock=0;
- for (; iblock<evtData.fBlockCnt; iblock++) {
- if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
- assert(subscribedTaskList[iblock]!=NULL);
- if (subscribedTaskList[iblock]==NULL) continue;
- HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
- fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
- subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
- break;
+ if (fListTargets.First()!=NULL) {
+ AliHLTComponentBlockDataList segments;
+ for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
+ AliHLTUInt32_t iblock=0;
+ for (; iblock<evtData.fBlockCnt; iblock++) {
+ if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
+ assert(subscribedTaskList[iblock]!=NULL);
+ if (subscribedTaskList[iblock]==NULL) continue;
+ HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
+ subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
+ break;
+ }
+ }
+ if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
+ if (pTgtBuffer && segments.size()>0) {
+ iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
}
}
- if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
- }
- if (pTgtBuffer && segments.size()>0) {
- iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
+ } else {
+ // no forwarding, actually we dont even need to keep the data, this is a
+ // dead end (fListTargets empty)
+ //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
}
delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
} else {
fpDataBuffer->Reset();
}
+ if (fListTargets.First()!=NULL) {
+ if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
+ HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
+ subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
+ }
+ if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
+ HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
+ subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
+ }
+ }
} else {
HLTError("task %s: no target buffer available", GetName());
iResult=-EFAULT;
return iResult;
}
-int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockData* pBlockDesc, int iArraySize)
+int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
{
// see header file for function documentation
int iResult=0;
if (pConsumerTask) {
if (fpDataBuffer) {
- iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), pBlockDesc, iArraySize);
+ iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
} else {
HLTFatal("internal data buffer missing");
iResult=-EFAULT;