return iResult;
}
-int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponentBlockData* arrayBlockDesc, int iArraySize)
+int AliHLTDataBuffer::Subscribe(const AliHLTComponent* pConsumer, AliHLTComponentBlockDataList& blockDescList)
{
// see header file for function documentation
int iResult=0;
- if (pConsumer && arrayBlockDesc) {
+ if (pConsumer) {
if (1/*fpBuffer*/) {
AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fConsumers);
if (pDesc) {
// so it does not matter if there are matching data types or not, unless
// we implement such a check in PubSub
if ((iResult=FindMatchingDataSegments(pConsumer, tgtList))>=0) {
- int i =0;
AliHLTDataSegmentList::iterator segment=tgtList.begin();
- while (segment!=tgtList.end() && i<iArraySize) {
+ while (segment!=tgtList.end()) {
// fill the block data descriptor
- arrayBlockDesc[i].fStructSize=sizeof(AliHLTComponentBlockData);
- // the shared memory key is not used in AliRoot
- arrayBlockDesc[i].fShmKey.fStructSize=sizeof(AliHLTComponentShmData);
- arrayBlockDesc[i].fShmKey.fShmType=gkAliHLTComponentInvalidShmType;
- arrayBlockDesc[i].fShmKey.fShmID=gkAliHLTComponentInvalidShmID;
+ AliHLTComponentBlockData bd;
+ AliHLTComponent::FillBlockData(bd);
// This models the behavior of PubSub.
// For incoming data blocks, fOffset must be ignored by the
// processing component. It is set for bookkeeping in the framework.
// fPtr always points to the beginning of the data.
- arrayBlockDesc[i].fOffset=0;
+ bd.fOffset=0;
AliHLTUInt8_t* pTgt=*segment;
- arrayBlockDesc[i].fPtr=reinterpret_cast<void*>(pTgt);
- arrayBlockDesc[i].fSize=(*segment).fSegmentSize;
- arrayBlockDesc[i].fDataType=(*segment).fDataType;
- arrayBlockDesc[i].fSpecification=(*segment).fSpecification;
+ bd.fPtr=reinterpret_cast<void*>(pTgt);
+ bd.fSize=(*segment).fSegmentSize;
+ bd.fDataType=(*segment).fDataType;
+ bd.fSpecification=(*segment).fSpecification;
+ blockDescList.push_back(bd);
pDesc->SetActiveDataSegment(*segment);
- HLTDebug("component %p (%s) subscribed to segment #%d offset %d size %d data type %s %#x",
- pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset,
- arrayBlockDesc[i].fSize, (AliHLTComponent::DataType2Text(arrayBlockDesc[i].fDataType)).c_str(),
- arrayBlockDesc[i].fSpecification);
- i++;
+ HLTDebug("component %p (%s) subscribed to segment offset %d size %d data type %s %#x",
+ pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), bd.fOffset,
+ bd.fSize, (AliHLTComponent::DataType2Text(bd.fDataType)).c_str(),
+ bd.fSpecification);
segment++;
}
- // check whether there was enough space for the segments
- if (i!=(int)tgtList.size()) {
- HLTError("too little space in block descriptor array: required %d, available %d", tgtList.size(), iArraySize);
- iResult=-ENOSPC;
- } else {
// move this consumer to the active list
- if (i==0) {
+ if (tgtList.size()==0) {
ChangeConsumerState(pDesc, fConsumers, fReleasedConsumers);
HLTDebug("no input data for component %p (%s) available", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
} else if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
HLTDebug("component %p (%s) subscribed to data buffer %p", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), this);
} else {
// TODO: cleanup the consumer descriptor correctly
- memset(arrayBlockDesc, 0, iArraySize*sizeof(AliHLTComponentBlockData));
+ segment=tgtList.begin();
+ while (segment!=tgtList.end()) {
+ blockDescList.pop_back();
+ segment++;
+ }
HLTError("can not activate consumer %p for data buffer %p", pConsumer, this);
iResult=-EACCES;
}
- }
} else {
HLTError("unresolved data segment(s) for component %p (%s)", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
iResult=-EBADF;
assert(fForwardedSegments.size()==fForwardedSegmentSources.size());
if (fForwardedSegments.size()!=fForwardedSegmentSources.size()) return -EFAULT;
fForwardedSegmentSources.push_back(pSrcTask);
- fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize));
+ fForwardedSegments.push_back(AliHLTDataSegment(pBlockDesc->fPtr, pBlockDesc->fOffset, pBlockDesc->fSize, pBlockDesc->fDataType, pBlockDesc->fSpecification));
return 0;
}
AliHLTComponentBlockData* arrayBlockDesc,
int iArraySize);
+ int Subscribe(const AliHLTComponent* pConsumer,
+ AliHLTComponentBlockDataList& blockDescList);
+
/**
* Release an instance of the data buffer.
* Resets the variables of the block descriptor.
// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
}
+void* AliHLTDataSource::fgpSpecialEvent=NULL;
+int AliHLTDataSource::fgSpecialEventSize=0;
+AliHLTComponentDataType AliHLTDataSource::fgSpecialEventDataType=kAliHLTVoidDataType;
+AliHLTUInt32_t AliHLTDataSource::fgSpecialEventSpecification=kAliHLTVoidDataSpec;
+
AliHLTDataSource::~AliHLTDataSource()
{
// see header file for class documentation
int AliHLTDataSource::DoProcessing( const AliHLTComponentEventData& evtData,
- const AliHLTComponentBlockData* blocks,
+ const AliHLTComponentBlockData* /*blocks*/,
AliHLTComponentTriggerData& trigData,
AliHLTUInt8_t* outputPtr,
AliHLTUInt32_t& size,
{
// see header file for class documentation
int iResult=0;
- if (blocks) {
- // this is currently just to get rid of the warning "unused parameter"
- }
if (evtData.fBlockCnt > 0) {
HLTWarning("Data source component skips input data blocks");
}
- iResult=GetEvent(evtData, trigData, outputPtr, size, outputBlocks);
- HLTDebug("component %s (%p) GetEvent finished (%d)", GetComponentID(), this, iResult);
+ if (fgpSpecialEvent==NULL || fgSpecialEventSize==0) {
+ // normal event publishing
+ iResult=GetEvent(evtData, trigData, outputPtr, size, outputBlocks);
+ HLTDebug("component %s (%p) GetEvent finished (%d)", GetComponentID(), this, iResult);
+ } else {
+ // publish special event
+ if (size>=fgSpecialEventSize) {
+ memcpy(outputPtr, fgpSpecialEvent, fgSpecialEventSize);
+ AliHLTComponentBlockData bd;
+ FillBlockData(bd);
+ bd.fOffset=0;
+ bd.fSize=fgSpecialEventSize;
+ bd.fDataType=fgSpecialEventDataType;
+ bd.fSpecification=fgSpecialEventSpecification;
+ outputBlocks.push_back(bd);
+ size=bd.fSize;
+ } else {
+ iResult=-ENOSPC;
+ }
+ }
edd = NULL;
return iResult;
}
HLTFatal("no processing method implemented");
return -ENOSYS;
}
+
+AliHLTDataSource::AliSpecialEventGuard::AliSpecialEventGuard(AliHLTRunDesc* pDesc, AliHLTComponentDataType dt, AliHLTUInt32_t spec)
+{
+ AliHLTDataSource::fgpSpecialEvent=pDesc;
+ AliHLTDataSource::fgSpecialEventSize=sizeof(AliHLTRunDesc);
+ AliHLTDataSource::fgSpecialEventDataType=dt;
+ AliHLTDataSource::fgSpecialEventSpecification=spec;
+}
+
+AliHLTDataSource::AliSpecialEventGuard::~AliSpecialEventGuard()
+{
+ AliHLTDataSource::fgpSpecialEvent=NULL;
+ AliHLTDataSource::fgSpecialEventSize=0;
+ AliHLTDataSource::fgSpecialEventDataType=kAliHLTVoidDataType;
+ AliHLTDataSource::fgSpecialEventSpecification=kAliHLTVoidDataSpec;
+}
*/
void GetInputDataTypes( vector<AliHLTComponentDataType>& list);
- protected:
+ /**
+ * @class AliSpecialEventGuard
+ * Guard structure to set the data sources into 'special event publishing'
+ * mode. The SOR and EOR events are generated by all the data sources and
+ * perculated through the chain as normal events. The AliSpecialEventGuard
+ * is a back-door mechansim to trigger publishing of the special event
+ * described by the run descriptor instead of the publishing of real data.
+ *
+ * The descriptor has to be valid throughout the lifetime of the guard.
+ */
+ class AliSpecialEventGuard {
+ public:
+ /** constructor, set run descriptor */
+ AliSpecialEventGuard(AliHLTRunDesc* pDesc, AliHLTComponentDataType dt, AliHLTUInt32_t spec);
+ /** destructor, reset run descriptor */
+ ~AliSpecialEventGuard();
+ };
+
+protected:
/**
* The low-level data processing method for the component.
*/
virtual int GetEvent( const AliHLTComponentEventData& evtData, AliHLTComponentTriggerData& trigData);
- ClassDef(AliHLTDataSource, 1)
+private:
+ /** pointer to the special event going to be published */
+ static void* fgpSpecialEvent; //! transient
+ /** data size of the special event going to be published */
+ static int fgSpecialEventSize; //! transient
+ /** data type of the special event going to be published */
+ static AliHLTComponentDataType fgSpecialEventDataType; //! transient
+ /** data specification of the special event going to be published */
+ static AliHLTUInt32_t fgSpecialEventSpecification; //! transient
+
+ ClassDef(AliHLTDataSource, 2)
};
#endif
AliHLTComponentTriggerData& trigData,
AliHLTUInt8_t* outputPtr,
AliHLTUInt32_t& size,
- vector<AliHLTComponentBlockData>& outputBlocks,
+ AliHLTComponentBlockDataList& outputBlocks,
AliHLTComponentEventDoneData*& edd );
// Information member functions for registration.
AliHLTComponentTriggerData& trigData,
AliHLTUInt8_t* outputPtr,
AliHLTUInt32_t& size,
- vector<AliHLTComponentBlockData>& outputBlocks );
+ AliHLTComponentBlockDataList& outputBlocks );
/**
* The high-level data processing method.
#include "AliHLTTask.h"
#include "AliHLTModuleAgent.h"
#include "AliHLTOfflineInterface.h"
+#include "AliHLTDataSource.h"
#include <TObjArray.h>
#include <TObjString.h>
#include <TStopwatch.h>
} else {
fEventCount=0;
fGoodEvents=0;
+ if ((iResult=SendControlEvent(kAliHLTDataTypeSOR))<0) {
+ HLTError("can not send SOR event");
+ }
}
return iResult;
}
{
// see header file for class documentation
int iResult=0;
+ if ((iResult=SendControlEvent(kAliHLTDataTypeEOR))<0) {
+ HLTError("can not send EOR event");
+ }
TObjLink *lnk=fTaskList.FirstLink();
- while (lnk && iResult>=0) {
+ while (lnk) {
TObject* obj=lnk->GetObject();
if (obj) {
AliHLTTask* pTask=(AliHLTTask*)obj;
- iResult=pTask->EndRun();
+ int locResult=pTask->EndRun();
+ if (iResult>=0 && locResult<0) iResult=locResult;
// ProcInfo_t ProcInfo;
// gSystem->GetProcInfo(&ProcInfo);
// HLTInfo("task %s stopped (%d), current memory usage %d %d", pTask->GetName(), iResult, ProcInfo.fMemResident, ProcInfo.fMemVirtual);
return iResult;
}
+int AliHLTSystem::SendControlEvent(AliHLTComponentDataType dt)
+{
+ // see header file for class documentation
+
+ // disabled for the moment
+ return 0;
+
+ int iResult=0;
+ AliHLTRunDesc runDesc;
+ memset(&runDesc, 0, sizeof(AliHLTRunDesc));
+ runDesc.fStructSize=sizeof(AliHLTRunDesc);
+ AliHLTDataSource::AliSpecialEventGuard g(&runDesc, dt, kAliHLTVoidDataSpec);
+ HLTDebug("sending event %s, run descriptor %p", AliHLTComponent::DataType2Text(dt).c_str(), &runDesc);
+ TObjLink *lnk=fTaskList.FirstLink();
+ while (lnk && iResult>=0) {
+ TObject* obj=lnk->GetObject();
+ if (obj) {
+ AliHLTTask* pTask=(AliHLTTask*)obj;
+ iResult=pTask->ProcessTask(-1);
+ } else {
+ }
+ lnk = lnk->Next();
+ }
+ HLTDebug("event %s done (%d)", AliHLTComponent::DataType2Text(dt).c_str(), iResult);
+ return iResult;
+}
+
int AliHLTSystem::DeinitTasks()
{
// see header file for class documentation
*/
int StopTasks();
+ /**
+ * Send a control event trough the chain.
+ * All data sources in the chain are switched to publish a control event like
+ * SOR or EOR. The event is propagated in the same way as a normal event.
+ * @param dt type of the event
+ */
+ int SendControlEvent(AliHLTComponentDataType dt);
+
/**
* De-init all tasks from the list.
* The @ref AliHLTTask::Deinit method is called for each task, the components
if (iResult>=0) {
if (fBlockDataArray.size()>0) {
HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
- fBlockDataArray.resize(0);
+ fBlockDataArray.clear();
}
// component init
// of the component.
//iResult=Init( AliHLTComponentEnvironment* environ, void* environ_param, int argc, const char** argv );
- // allocate internal task variables for bookkeeping aso.
- // we allocate the BlockData array with at least one member
- if (iNofInputDataBlocks==0) iNofInputDataBlocks=1;
- AliHLTComponentBlockData init;
- memset(&init, 0, sizeof(AliHLTComponentBlockData));
- fBlockDataArray.resize(iNofInputDataBlocks, init);
-
// allocate the data buffer, which controls the output buffer and subscriptions
if (iResult>=0) {
fpDataBuffer=new AliHLTDataBuffer;
}
}
}
+ if (iResult>=0) {
+ // send the SOR event
+
+ }
} else {
HLTError("task %s (%p) does not have a component", GetName(), this);
iResult=-EFAULT;
// see header file for function documentation
int iResult=0;
if (fBlockDataArray.size()>0) {
- fBlockDataArray.resize(0);
- } else {
- HLTWarning("task %s (%p) doesn't seem to be in running mode", GetName(), this);
+ fBlockDataArray.clear();
}
if (fpDataBuffer) {
AliHLTDataBuffer* pBuffer=fpDataBuffer;
AliHLTTaskPList subscribedTaskList;
TObjLink* lnk=fListDependencies.FirstLink();
+ // instances of SOR and EOR events to be kept
+ int iSOR=-1;
+ int iEOR=-1;
+
// subscribe to all source tasks
+ fBlockDataArray.clear();
while (lnk && iResult>=0) {
pSrcTask=(AliHLTTask*)lnk->GetObject();
if (pSrcTask) {
int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
- if (iMatchingDB>=0 && static_cast<unsigned int>(iMatchingDB)>fBlockDataArray.size()-iSourceDataBlock) {
- AliHLTComponentBlockData init;
- memset(&init, 0, sizeof(AliHLTComponentBlockData));
- fBlockDataArray.resize(iSourceDataBlock+iMatchingDB, init);
- } else {
- if (iMatchingDB<0) {
- HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
- iResult=iMatchingDB;
- break;
- } else if (iMatchingDB==0) {
- HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
- }
+ if (iMatchingDB<0) {
+ HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
+ iResult=iMatchingDB;
+ break;
+ } else if (iMatchingDB==0) {
+ HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
}
- if ((iResult=pSrcTask->Subscribe(this, &fBlockDataArray[iSourceDataBlock],fBlockDataArray.size()-iSourceDataBlock))>=0) {
- for (int i=0; i<iResult; i++) {
- iInputDataVolume+=fBlockDataArray[i+iSourceDataBlock].fSize;
+ if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
+ iSOR=iEOR=-1;
+ AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
+ for (int i=0; block!=fBlockDataArray.end(); i++) {
+ bool bRemove=0;
+ bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
+ bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+ //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
+ if (i<iSourceDataBlock) {
+ assert(!bRemove);
+ } else if (bRemove) {
+ HLTDebug("remove duplicated event %s (%d)", AliHLTComponent::DataType2Text((*block).fDataType).c_str(), i);
+ pSrcTask->Release(&(*block), this);
+ block=fBlockDataArray.erase(block);
+ continue;
+ } else {
+ iInputDataVolume+=(*block).fSize;
// put the source task as many times into the list as it provides data blocks
// makes the bookkeeping for the data release easier
subscribedTaskList.push_back(pSrcTask);
+ }
+ block++;
}
HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
- iSourceDataBlock+=iResult;
+ iSourceDataBlock=fBlockDataArray.size();
iResult=0;
} else {
HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
iResult=-EFAULT;
}
lnk=lnk->Next();
- }
+ }
// process the event
int iNofTrial=0; // repeat processing if component returns -ENOSPC
iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
if (iResult>=0 && outputBlocks) {
- AliHLTComponentBlockDataList segments;
- for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
- AliHLTUInt32_t iblock=0;
- for (; iblock<evtData.fBlockCnt; iblock++) {
- if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
- assert(subscribedTaskList[iblock]!=NULL);
- if (subscribedTaskList[iblock]==NULL) continue;
- HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
- fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
- subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
- break;
+ if (fListTargets.First()!=NULL) {
+ AliHLTComponentBlockDataList segments;
+ for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
+ AliHLTUInt32_t iblock=0;
+ for (; iblock<evtData.fBlockCnt; iblock++) {
+ if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
+ assert(subscribedTaskList[iblock]!=NULL);
+ if (subscribedTaskList[iblock]==NULL) continue;
+ HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
+ subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
+ break;
+ }
+ }
+ if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
+ if (pTgtBuffer && segments.size()>0) {
+ iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
}
}
- if (iblock==evtData.fBlockCnt) segments.push_back(outputBlocks[oblock]);
- }
- if (pTgtBuffer && segments.size()>0) {
- iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
+ } else {
+ // no forwarding, actually we dont even need to keep the data, this is a
+ // dead end (fListTargets empty)
+ //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
}
delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
} else {
fpDataBuffer->Reset();
}
+ if (fListTargets.First()!=NULL) {
+ if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
+ HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
+ subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
+ }
+ if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
+ HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
+ subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
+ }
+ }
} else {
HLTError("task %s: no target buffer available", GetName());
iResult=-EFAULT;
return iResult;
}
-int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockData* pBlockDesc, int iArraySize)
+int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
{
// see header file for function documentation
int iResult=0;
if (pConsumerTask) {
if (fpDataBuffer) {
- iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), pBlockDesc, iArraySize);
+ iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
} else {
HLTFatal("internal data buffer missing");
iResult=-EFAULT;
* returns the number of blocks which would be prepared in case the target
* array is big enough.
* @param pConsumerTask the task which subscribes to the data
- * @param arrayBlockDesc pointer to block descriptor to be filled
- * @param iArraySize size of the block descriptor array
+ * @param blockDescList block descriptor list to be filled
* @return number of matching data blocks, negative error code if failed
*/
- int Subscribe(const AliHLTTask* pConsumerTask,
- AliHLTComponentBlockData* arrayBlockDesc, int iArraySize);
+ int Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList);
/**
* Release a block descriptor.