// $Id$
// split from AliHLTConfiguration.cxx,v 1.25 2007/10/12 13:24:47
/**************************************************************************
 * This file is property of and copyright by the ALICE HLT Project        *
 * ALICE Experiment at CERN, All rights reserved.                         *
 *                                                                        *
 * Primary Authors: Matthias Richter                                      *
 *                  for The ALICE HLT Project.                            *
 *                                                                        *
 * Permission to use, copy, modify and distribute this software and its   *
 * documentation strictly for non-commercial purposes is hereby granted   *
 * without fee, provided that the above copyright notice appears in all   *
 * copies and that both the copyright notice and this permission notice   *
 * appear in the supporting documentation. The authors make no claims     *
 * about the suitability of this software for any purpose. It is          *
 * provided "as is" without express or implied warranty.                  *
 **************************************************************************/

/** @file   AliHLTTask.cxx
    @author Matthias Richter
    @date
    @brief  Implementation of HLT tasks.
*/

// see header file for class documentation
// or
// refer to README to build package
// or
// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt

#if __GNUC__>= 3
using namespace std;
#endif

// NOTE(review): the next four include directives carry no header name in
// this copy of the file; the "<...>" part appears to have been lost. The
// code below uses errno constants (EINVAL, EFAULT, ...), memset and
// va_list, so the matching standard headers are presumably among them --
// TODO confirm against the repository version.
#include
#include
#include
#include
#include "AliHLTTask.h"
#include "AliHLTConfiguration.h"
#include "AliHLTComponent.h"
#include "AliHLTComponentHandler.h"
#include "TList.h"

/** ROOT macro for the implementation of ROOT specific class methods */
ClassImp(AliHLTTask)

// Default constructor: no configuration, no component and no data buffer;
// the dependency/target lists and the block data array start out empty.
AliHLTTask::AliHLTTask()
  :
  fpConfiguration(NULL),
  fpComponent(NULL),
  fpDataBuffer(NULL),
  fListTargets(),
  fListDependencies(),
  fBlockDataArray()
{
  // see header file for class documentation
  // or
  // refer to README to build package
  // or
  // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
}

// Constructor storing the configuration pointer. The component is not
// created here; that happens in Init().
AliHLTTask::AliHLTTask(AliHLTConfiguration* pConf)
  :
  fpConfiguration(pConf),
  fpComponent(NULL),
  fpDataBuffer(NULL),
  fListTargets(),
  fListDependencies(),
  fBlockDataArray()
{
  // see header file for function documentation
}

// Destructor: unregisters this task from its neighbours and deletes the
// component.
AliHLTTask::~AliHLTTask()
{
  // see header file for function documentation

  // tell every task we depend on that we are no longer one of its targets
  // NOTE(review): the object returned by GetObject() is used without a NULL
  // check in both loops below.
  TObjLink* lnk=fListDependencies.FirstLink();

  while (lnk!=NULL) {
    AliHLTTask* pTask=(AliHLTTask*)lnk->GetObject();
    pTask->UnsetTarget(this);
    lnk=lnk->Next();
  }

  // tell every target task that we are no longer one of its dependencies
  lnk=fListTargets.FirstLink();

  while (lnk!=NULL) {
    AliHLTTask* pTask=(AliHLTTask*)lnk->GetObject();
    pTask->UnsetDependency(this);
    lnk=lnk->Next();
  }

  // NOTE(review): fpDataBuffer is not touched here; within this file it is
  // only deleted in EndRun().
  if (fpComponent) delete fpComponent;
  fpComponent=NULL;
}

// Initialize the task: optionally replace the configuration, create the
// component through the handler and run the CustomInit() hook.
//   pConf  configuration to use; if NULL the stored one is kept
//   pCH    component handler passed on to CreateComponent()/CustomInit()
// Returns the result of CreateComponent() if negative, otherwise the result
// of CustomInit().
int AliHLTTask::Init(AliHLTConfiguration* pConf, AliHLTComponentHandler* pCH)
{
  // see header file for function documentation
  int iResult=0;
  if (fpConfiguration!=NULL && pConf!=NULL && fpConfiguration!=pConf) {
    HLTWarning("overriding existing reference to configuration object %p by %p",
               fpConfiguration, pConf);
  }
  if (pConf!=NULL) fpConfiguration=pConf;
  iResult=CreateComponent(fpConfiguration, pCH, fpComponent);
  if (iResult>=0) {
    iResult=CustomInit(pCH);
  }
  return iResult;
}

// Create and initialize a component for the given configuration.
//   pConf       configuration providing component id and argument list
//   pCH         component handler used to create the component
//   pComponent  [out] receives the component pointer set by the handler
// Returns -EINVAL if pConf or pCH is NULL or if the argument list can not be
// retrieved; otherwise the result of the handler's CreateComponent() or, if
// a component was created, of the component's Init().
int AliHLTTask::CreateComponent(AliHLTConfiguration* pConf, AliHLTComponentHandler* pCH, AliHLTComponent*& pComponent) const
{
  // see header file for class documentation
  int iResult=0;
  if (pConf) {
    if (pCH) {
      int argc=0;
      const char** argv=NULL;
      // a non-negative return value of GetArguments is used as argument count
      if ((iResult=pConf->GetArguments(&argv))>=0) {
        argc=iResult; // just to make it clear
        // TODO: we have to think about the optional environment parameter,
        // currently just set to NULL.
        iResult=pCH->CreateComponent(pConf->GetComponentID(), pComponent);
        if (pComponent && iResult>=0) {
          // tag the component with the name of this task
          TString description;
          description.Form("chainid=%s", GetName());
          pComponent->SetComponentDescription(description.Data());
          const AliHLTAnalysisEnvironment* pEnv=pCH->GetEnvironment();
          if ((iResult=pComponent->Init(pEnv, NULL, argc, argv))>=0) {
            //HLTDebug("component %s (%p) created", pComponent->GetComponentID(), pComponent);
          } else {
            HLTError("Initialization of component \"%s\" failed with error %d", pComponent->GetComponentID(), iResult);
          }
        } else {
          // creation failure is reported through iResult only, the message
          // below is disabled
          //HLTError("can not find component \"%s\" (%d)", pConf->GetComponentID(), iResult);
        }
      } else {
        HLTError("can not get argument list for configuration %s (%s)", pConf->GetName(), pConf->GetComponentID());
        iResult=-EINVAL;
      }
    } else {
      HLTError("component handler instance needed for task initialization");
      iResult=-EINVAL;
    }
  } else {
    HLTError("configuration object instance needed for task initialization");
    iResult=-EINVAL;
  }
  return iResult;
}

// De-initialize the task: run the CustomCleanup() hook, then deinit and
// delete the component. A warning is issued if there is no component.
// Always returns 0; the results of CustomCleanup() and of the component's
// Deinit() are not evaluated.
int AliHLTTask::Deinit()
{
  // see header file for function documentation
  int iResult=0;
  CustomCleanup();
  AliHLTComponent* pComponent=GetComponent();
  // clear the member before deleting the object
  fpComponent=NULL;
  if (pComponent) {
    //HLTDebug("delete component %s (%p)", pComponent->GetComponentID(), pComponent);
    pComponent->Deinit();
    delete pComponent;
  } else {
    HLTWarning("task doesn't seem to be in initialized");
  }
  return iResult;
}

// Name of the task: the name of the configuration if one is set, otherwise
// whatever TObject::GetName() returns.
const char *AliHLTTask::GetName() const
{
  // see header file for function documentation
  if (fpConfiguration)
    return fpConfiguration->GetName();
  return TObject::GetName();
}

// Return the stored configuration pointer (may be NULL).
AliHLTConfiguration* AliHLTTask::GetConf() const
{
  // see header file for function documentation
  return fpConfiguration;
}

// Return the stored component pointer (NULL until Init() created one).
AliHLTComponent* AliHLTTask::GetComponent() const
{
  // see header file for function documentation
  return fpComponent;
}

// Look up a direct dependency by name in the dependency list.
// Returns NULL if id is NULL or no object of that name is found.
AliHLTTask* AliHLTTask::FindDependency(const char* id)
{
  // see header file for function documentation
  AliHLTTask* pTask=NULL;
  if (id) {
    pTask=(AliHLTTask*)fListDependencies.FindObject(id);
  }
  return pTask;
}

// Search the dependency tree for the task named id.
//   id        name of the task to look for
//   pTgtList  optional list receiving the tasks along the path found
// Returns 1 if id is a direct dependency, n+1 if it was found n levels deep
// below one of the dependencies, 0 if it was not found, -EINVAL if id is
// NULL and -EFAULT if the dependency list contains a NULL object.
int AliHLTTask::FollowDependency(const char* id, TList* pTgtList)
{
  // see header file for function documentation
  int iResult=0;
  if (id) {
    AliHLTTask* pDep=NULL;
    if ((pDep=(AliHLTTask*)fListDependencies.FindObject(id))!=NULL) {
      // direct dependency
      if (pTgtList) pTgtList->Add(pDep);
      iResult++;
    } else {
      // recurse into the dependencies; the loop stops at the first hit
      // (iResult>0) or error (iResult<0)
      TObjLink* lnk=fListDependencies.FirstLink();
      while (lnk && iResult==0) {
        pDep=(AliHLTTask*)lnk->GetObject();
        if (pDep) {
          if ((iResult=pDep->FollowDependency(id, pTgtList))>0) {
            // the intermediate task goes in front of the entries added
            // further down the recursion
            if (pTgtList) pTgtList->AddFirst(pDep);
            iResult++;
          }
        } else {
          iResult=-EFAULT;
        }
        lnk=lnk->Next();
      }
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Print the dependency path to the task named id.
//   id                  name of the task to look for
//   bFromConfiguration  if non-zero the path is resolved through the
//                       configuration's FollowDependency(), otherwise
//                       through this task's dependency list
// Nothing is printed unless the lookup returns a positive level.
void AliHLTTask::PrintDependencyTree(const char* id, int bFromConfiguration)
{
  // see header file for function documentation
  HLTLogKeyword("task dependencies");
  int iResult=0;
  TList tgtList;
  if (bFromConfiguration) {
    if (fpConfiguration)
      iResult=fpConfiguration->FollowDependency(id, &tgtList);
    else
      iResult=-EFAULT;
  } else
    iResult=FollowDependency(id, &tgtList);
  if (iResult>0) {
    HLTMessage(" dependency level %d ", iResult);
    TObjLink* lnk=tgtList.FirstLink();
    int i=iResult;
    // buffer of iResult blanks plus terminator; &pSpace[i] with decreasing i
    // yields a string that grows by one blank per printed entry
    char* pSpace = new char[iResult+1];
    if (pSpace) {
      memset(pSpace, 32, iResult);
      pSpace[i]=0;
      // NOTE(review): i is decremented once per list entry with no lower
      // bound; this presumably relies on the list holding at most iResult+1
      // entries -- TODO confirm for the configuration path.
      while (lnk) {
        TObject* obj=lnk->GetObject();
        HLTMessage(" %s^-- %s ", &pSpace[i--], obj->GetName());
        lnk=lnk->Next();
      }
      delete [] pSpace;
    } else {
      // NOTE(review): the function returns void, so this value is never
      // seen by the caller.
      iResult=-ENOMEM;
    }
  }
}

// Add a task to the dependency list.
// Returns 0 on success, -EEXIST if a dependency of the same name is already
// registered and -EINVAL if pDep is NULL.
int AliHLTTask::SetDependency(AliHLTTask* pDep)
{
  // see header file for function documentation
  int iResult=0;
  if (pDep) {
    if (FindDependency(pDep->GetName())==NULL) {
      fListDependencies.Add(pDep);
    } else {
      iResult=-EEXIST;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Remove a task from the dependency list and invalidate the sources of the
// configuration, if there is one. Always returns 0.
int AliHLTTask::UnsetDependency(AliHLTTask* pDep)
{
  // see header file for function documentation
  fListDependencies.Remove(pDep);
  if (fpConfiguration) {
    fpConfiguration->InvalidateSources();
  }
  return 0;
}

// Count the sources of the configuration for which no dependency of the
// same name is registered with this task.
// Returns the number of such unresolved sources (0 if all are resolved).
// NOTE(review): fpConfiguration is dereferenced without a NULL check here,
// unlike in Depends() and PrintStatus().
int AliHLTTask::CheckDependencies()
{
  // see header file for function documentation
  int iResult=0;
  AliHLTConfiguration* pSrc=fpConfiguration->GetFirstSource();
  while (pSrc) {
    if (FindDependency(pSrc->GetName())==NULL) {
      //HLTDebug("dependency \"%s\" unresolved", pSrc->GetName());
      iResult++;
    }
    pSrc=fpConfiguration->GetNextSource();
  }
  return iResult;
}

// Check whether the configuration of this task lists pTask as a source.
// Returns 1 if the configuration has a source of pTask's name, 0 if not,
// -EFAULT if no configuration is set and -EINVAL if pTask is NULL.
int AliHLTTask::Depends(AliHLTTask* pTask)
{
  // see header file for function documentation
  int iResult=0;
  if (pTask) {
    if (fpConfiguration) {
      iResult=fpConfiguration->GetSource(pTask->GetName())!=NULL;
      if (iResult>0) {
        //HLTDebug("task \"%s\" depends on \"%s\"", GetName(), pTask->GetName());
      } else {
        //HLTDebug("task \"%s\" independend of \"%s\"", GetName(), pTask->GetName());
      }
    } else {
      iResult=-EFAULT;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Look up a target task by name in the target list.
// Returns NULL if id is NULL or no object of that name is found.
AliHLTTask* AliHLTTask::FindTarget(const char* id)
{
  // see header file for function documentation
  AliHLTTask* pTask=NULL;
  if (id) {
    pTask=(AliHLTTask*)fListTargets.FindObject(id);
  }
  return pTask;
}

// Add a task to the target list.
// Returns 0 on success, -EEXIST if a target of the same name is already
// registered and -EINVAL if pTgt is NULL.
int AliHLTTask::SetTarget(AliHLTTask* pTgt)
{
  // see header file for function documentation
  int iResult=0;
  if (pTgt) {
    if (FindTarget(pTgt->GetName())==NULL) {
      fListTargets.Add(pTgt);
    } else {
      iResult=-EEXIST;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Remove a task from the target list. Always returns 0.
int AliHLTTask::UnsetTarget(AliHLTTask* pTarget)
{
  // see header file for function documentation
  fListTargets.Remove(pTarget);
  return 0;
}

// Prepare the task for a run: query the source tasks for matching data
// types, make sure the block data array is empty, create the data buffer
// and register the components of all target tasks as its consumers.
// Returns a non-negative value on success, -EFAULT if the task has no
// component or a source task reports an error, -ENOMEM if the data buffer
// pointer is NULL after allocation, or the negative result of the data
// buffer's SetConsumer().
int AliHLTTask::StartRun()
{
  // see header file for function documentation
  int iResult=0;
  // NOTE(review): accumulated below but not read afterwards in this function
  int iNofInputDataBlocks=0;
  AliHLTComponent* pComponent=GetComponent();
  if (pComponent) {
    // determine the number of input data blocks provided from the source tasks
    { // set scope for lnk as a local variable
    TObjLink* lnk=fListDependencies.FirstLink();
    while (lnk && iResult>=0) {
      AliHLTTask* pSrcTask=(AliHLTTask*)lnk->GetObject();
      if (pSrcTask) {
        if ((iResult=pSrcTask->GetNofMatchingDataTypes(this))>0) {
          iNofInputDataBlocks+=iResult;
        } else if (iResult==0) {
          HLTWarning("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
        } else {
          HLTError("task %s (%p): error getting matching data types for source task %s (%p)", GetName(), this, pSrcTask->GetName(), pSrcTask);
          iResult=-EFAULT;
        }
      }
      lnk=lnk->Next();
    }
    }
    if (iResult>=0) {
      if (fBlockDataArray.size()>0) {
        HLTWarning("block data array for task %s (%p) was not cleaned", GetName(), this);
        fBlockDataArray.clear();
      }

      // component init
      // the initialization of the component is done by the ComponentHandler after creation
      // of the component.
      //iResult=Init( AliHLTAnalysisEnvironment* environ, void* environ_param, int argc, const char** argv );

      // allocate the data buffer, which controls the output buffer and subscriptions
      if (iResult>=0) {
        fpDataBuffer=new AliHLTDataBuffer;
        if (fpDataBuffer!=NULL) {
          fpDataBuffer->SetLocalLoggingLevel(GetLocalLoggingLevel());
          HLTDebug("created data buffer %p for task %s (%p)", fpDataBuffer, GetName(), this);
          // register the component of every target task as consumer; the
          // loop stops on the first negative result
          TObjLink* lnk=fListTargets.FirstLink();
          while (lnk && iResult>=0) {
            AliHLTTask* pTgtTask=(AliHLTTask*)lnk->GetObject();
            if (pTgtTask) {
              if ((iResult=fpDataBuffer->SetConsumer(pTgtTask->GetComponent()))>=0) {
              }
            } else {
              // NOTE(review): the assignment after the break is never
              // executed, so a NULL list entry ends the loop without an
              // error code -- possibly the two statements are swapped.
              break;
              iResult=-EFAULT;
            }
            lnk=lnk->Next();
          }
        } else {
          HLTFatal("can not create data buffer object, memory allocation failed");
          iResult=-ENOMEM;
        }
      }
    }
    if (iResult>=0) {
      // send the SOR event

    }
  } else {
    HLTError("task %s (%p) does not have a component", GetName(), this);
    iResult=-EFAULT;
  }
  return iResult;
}

// Clean up after a run: empty the block data array and delete the data
// buffer created in StartRun(). Always returns 0.
int AliHLTTask::EndRun()
{
  // see header file for function documentation
  int iResult=0;
  if (fBlockDataArray.size()>0) {
    fBlockDataArray.clear();
  }
  if (fpDataBuffer) {
    // clear the member before deleting the object
    AliHLTDataBuffer* pBuffer=fpDataBuffer;
    fpDataBuffer=NULL;
    delete pBuffer;
  }
  return iResult;
}

// Process one event.
//   eventNo    event number; written to the event data if non-negative
//   eventType  value stored in fSpecification of the temporary event type
//              block appended to the input blocks
// Steps, as far as visible in this file:
//   1. subscribe to the data blocks of all source tasks
//   2. determine the output buffer size and call the component's
//      ProcessEvent(); if it returns -ENOSPC the step is repeated once,
//      provided the buffer size differs from the previous attempt
//   3. hand forwarded input blocks and the output segments to the data buffer
//   4. release the input blocks which were not forwarded
// Returns a non-negative value on success or a negative error code
// (-EFAULT if component or data buffer are missing, if a subscription fails
// or if no target buffer is available; otherwise the code returned by the
// called functions).
int AliHLTTask::ProcessTask(Int_t eventNo, AliHLTUInt32_t eventType)
{
  // see header file for function documentation
  int iResult=0;
  AliHLTComponent* pComponent=GetComponent();
  if (pComponent && fpDataBuffer) {
    HLTDebug("Processing task %s (%p) fpDataBuffer %p", GetName(), this, fpDataBuffer);
    fpDataBuffer->Reset();
    int iSourceDataBlock=0;
    int iInputDataVolume=0;

    AliHLTTask* pSrcTask=NULL;
    // one entry per block in fBlockDataArray: the task which provided it
    AliHLTTaskPList subscribedTaskList;
    TObjLink* lnk=fListDependencies.FirstLink();

    // instances of SOR and EOR events to be kept
    int iSOR=-1;
    int iEOR=-1;

    // subscribe to all source tasks
    fBlockDataArray.clear();
    while (lnk && iResult>=0) {
      pSrcTask=(AliHLTTask*)lnk->GetObject();
      if (pSrcTask) {
        int iMatchingDB=pSrcTask->GetNofMatchingDataBlocks(this);
        if (iMatchingDB<0) {
          HLTError("task %s (%p): error getting no of matching data blocks from task %s (%p), error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iMatchingDB);
          iResult=iMatchingDB;
          break;
        } else if (iMatchingDB==0) {
          HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
        }
        if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
          // the scan below starts from the first block again, so the SOR/EOR
          // bookkeeping is restarted as well
          iSOR=iEOR=-1;
          AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
          for (int i=0; block!=fBlockDataArray.end(); i++) {
            // the first SOR/EOR block found records its index in iSOR/iEOR
            // and is kept, every further one is flagged for removal
            bool bRemove=0;
            bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
            bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
            //HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
            // NOTE(review): the condition below is incomplete in this copy of
            // the file; the text between "if (i" and "Release(" appears to
            // have been lost. Presumably it distinguishes already processed
            // blocks from blocks flagged by bRemove, which are released and
            // erased -- TODO restore from the repository version.
            if (iRelease(&(*block), this);
              block=fBlockDataArray.erase(block);
              continue;
            } else {
              iInputDataVolume+=(*block).fSize;
              // put the source task as many times into the list as it provides data blocks
              // makes the bookkeeping for the data release easier
              subscribedTaskList.push_back(pSrcTask);
            }
            block++;
          }
          HLTDebug("Task %s (%p) successfully subscribed to %d data block(s) of task %s (%p)", GetName(), this, iResult, pSrcTask->GetName(), pSrcTask);
          iSourceDataBlock=fBlockDataArray.size();
          iResult=0;
        } else {
          HLTError("Task %s (%p): subscription to task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iResult);
          iResult=-EFAULT;
        }
      } else {
        HLTFatal("fatal internal error in ROOT list handling");
        iResult=-EFAULT;
      }
      lnk=lnk->Next();
    }

    // process the event
    int iNofTrial=0; // repeat processing if component returns -ENOSPC
    AliHLTUInt32_t iLastOutputDataSize=0;
    if (iResult>=0) {
    do {
      long unsigned int iOutputDataSize=0;
      AliHLTConfiguration* pConf=GetConf();
      // check if there was a buffer size specified, query output size
      // estimator from component otherwize
      if (pConf && pConf->GetOutputBufferSize()>=0) {
        iOutputDataSize=pConf->GetOutputBufferSize();
      } else {
      long unsigned int iConstBase=0;
      double fInputMultiplier=0;
      // for sink components both values stay 0, i.e. no output buffer is
      // requested
      if (pComponent->GetComponentType()!=AliHLTComponent::kSink) {
        pComponent->GetOutputDataSize(iConstBase, fInputMultiplier);
        // add a small margin to the buffer to allow optional component
        // statistics
        iConstBase+=100;
#if defined(__DEBUG) || defined(HLT_COMPONENT_STATISTICS)
        // reserve room for the component statistics blocks of the input
        for (AliHLTComponentBlockDataList::iterator element=fBlockDataArray.begin();
             element!=fBlockDataArray.end(); element++) {
          if (element->fDataType==kAliHLTDataTypeComponentStatistics) {
            iConstBase+=element->fSize;
          }
        }
#endif
      }
      if (fInputMultiplier<0) {
        HLTWarning("ignoring negative input multiplier");
        fInputMultiplier=0;
      }
      iOutputDataSize=int(fInputMultiplier*iInputDataVolume) + iConstBase;
      //HLTDebug("task %s: reqired output size %d", GetName(), iOutputDataSize);
      }
      if (iNofTrial>0) {
        // dont process again if the buffer size is the same
        if (iLastOutputDataSize==iOutputDataSize) break;
        HLTImportant("processing event %d again with buffer size %d", eventNo, iOutputDataSize);
      }
      AliHLTUInt8_t* pTgtBuffer=NULL;
      if (iOutputDataSize>0) pTgtBuffer=fpDataBuffer->GetTargetBuffer(iOutputDataSize);
      //HLTDebug("provided raw buffer %p", pTgtBuffer);
      AliHLTComponentEventData evtData;
      AliHLTComponent::FillEventData(evtData);
      if (eventNo>=0)
        evtData.fEventID=(AliHLTEventID_t)eventNo;
      // empty trigger data
      AliHLTComponentTriggerData trigData;
      trigData.fStructSize=sizeof(trigData);
      trigData.fDataSize=0;
      trigData.fData=NULL;
      iLastOutputDataSize=iOutputDataSize;
      AliHLTUInt32_t size=iOutputDataSize;
      AliHLTUInt32_t outputBlockCnt=0;
      AliHLTComponentBlockData* outputBlocks=NULL;
      AliHLTComponentEventDoneData* edd=NULL;
      // a missing target buffer is only acceptable if no output was requested
      if (pTgtBuffer!=NULL || iOutputDataSize==0) {
        // add event type data block
        // the block is removed immediately after processing from the list
        AliHLTComponentBlockData eventTypeBlock;
        AliHLTComponent::FillBlockData(eventTypeBlock);

        // Note: no payload!
        eventTypeBlock.fDataType=kAliHLTDataTypeEvent;
        eventTypeBlock.fSpecification=eventType;
        fBlockDataArray.push_back(eventTypeBlock);

        // process
        evtData.fBlockCnt=fBlockDataArray.size();
        iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
        HLTDebug("component %s ProcessEvent finnished (%d): size=%d blocks=%d", pComponent->GetComponentID(), iResult, size, outputBlockCnt);

        // EventDoneData is for the moment ignored in AliHLTSystem
        if (edd) {
          HLTDebug("got EventDoneData size %d", edd->fDataSize);
          // NOTE(review): the template argument of reinterpret_cast is
          // missing in this copy of the file -- TODO restore.
          delete [] reinterpret_cast(edd);
          edd=NULL;
        }

        // remove event data block
        fBlockDataArray.pop_back();

        // check for forwarded blocks.
        // loop over all output blocks and check
        // 1. for duplicate blocks (pointing to same location in output buffer
        //    or to the same buffer)
        // 2. for blocks forwarded from the input.
        if (iResult>=0 && outputBlocks) {
          if (fListTargets.First()!=NULL) {
            AliHLTComponentBlockDataList segments;
            // NOTE(review): the loop header and the beginning of its body are
            // incomplete in this copy of the file; the text between "oblock"
            // and "GetName()" appears to have been lost. From the remaining
            // statements it presumably searches fBlockDataArray (index
            // iblock) for the input block matching the output block, forwards
            // it and collects all other output blocks in 'segments' -- TODO
            // restore from the repository version.
            for (AliHLTUInt32_t oblock=0; oblockGetName(), pSrcTask, fpDataBuffer);
                  fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
                  subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
                  break;
                }
              }
              if (iblock==fBlockDataArray.size()) segments.push_back(outputBlocks[oblock]);
            }
            if (pTgtBuffer && segments.size()>0) {
              iResult=fpDataBuffer->SetSegments(pTgtBuffer, &segments[0], segments.size());
            }
          } else {
            // no forwarding, actually we dont even need to keep the data, this is a
            // dead end (fListTargets empty)
            //iResult=fpDataBuffer->SetSegments(pTgtBuffer, outputBlocks, outputBlockCnt);
          }
          delete [] outputBlocks; outputBlocks=NULL; outputBlockCnt=0;
        } else {
          fpDataBuffer->Reset();
        }
        // forward the kept SOR/EOR blocks unless they were forwarded already.
        // pSrcTask in the debug messages still holds the value left by the
        // subscription loop above.
        if (fListTargets.First()!=NULL) {
          if (iSOR>=0 && subscribedTaskList[iSOR]!=NULL) {
            HLTDebug("forward SOR event segment %d (source task %s %p) to data buffer %p", iSOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
            fpDataBuffer->Forward(subscribedTaskList[iSOR], &fBlockDataArray[iSOR]);
            subscribedTaskList[iSOR]=NULL; // not to be released in the loop further down
          }
          if (iEOR>=0 && subscribedTaskList[iEOR]!=NULL) {
            HLTDebug("forward EOR event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iEOR].fDataType).c_str(), iEOR, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
            fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
            subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
          }
        }
      } else {
        HLTError("no target buffer available");
        iResult=-EFAULT;
      }
    } while (iResult==-ENOSPC && iNofTrial++<1);
    }

    // now release all buffers which we have subscribed to
    // iSourceDataBlock walks through fBlockDataArray in parallel to the task
    // list; entries set to NULL above (forwarded blocks) are skipped
    iSourceDataBlock=0;
    AliHLTTaskPList::iterator element;
    while ((element=subscribedTaskList.begin())!=subscribedTaskList.end()) {
      pSrcTask=*element;
      if (pSrcTask) {
        int iTempRes=0;
        if ((iTempRes=pSrcTask->Release(&fBlockDataArray[iSourceDataBlock], this))>=0) {
          HLTDebug("successfully released segment of task %s (%p)", pSrcTask->GetName(), pSrcTask);
        } else {
          HLTError("realease of task %s (%p) failed with error %d", pSrcTask->GetName(), pSrcTask, iTempRes);
        }
      }
      subscribedTaskList.erase(element);
      iSourceDataBlock++;
    }
    if (subscribedTaskList.size()>0) {
      HLTError("could not release all data buffers");
    }
  } else {
    HLTError("internal failure (not initialized component %p, data buffer %p)", fpComponent, fpDataBuffer);
    iResult=-EFAULT;
  }
  return iResult;
}

// Ask the data buffer of this task for the data blocks matching the
// component of the consumer task.
// Returns the result of FindMatchingDataBlocks(), -EFAULT if this task has
// no data buffer and -EINVAL if pConsumerTask is NULL.
int AliHLTTask::GetNofMatchingDataBlocks(const AliHLTTask* pConsumerTask) const
{
  // see header file for function documentation
  int iResult=0;
  if (pConsumerTask) {
    if (fpDataBuffer) {
      iResult=fpDataBuffer->FindMatchingDataBlocks(pConsumerTask->GetComponent(), NULL);
    } else {
      HLTFatal("internal data buffer missing");
      iResult=-EFAULT;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Ask the component of this task for the data types matching the component
// of the consumer task.
// Returns the result of FindMatchingDataTypes(), -EFAULT if this task has
// no component and -EINVAL if pConsumerTask is NULL.
int AliHLTTask::GetNofMatchingDataTypes(const AliHLTTask* pConsumerTask) const
{
  // see header file for function documentation
  int iResult=0;
  if (pConsumerTask) {
    AliHLTComponent* pComponent=GetComponent();
    if (!pComponent) {
      // init ?
      HLTError("component not initialized");
      iResult=-EFAULT;
    }
    // NOTE(review): a missing component is reported twice, by the error
    // above and by the fatal message in the else branch below.
    if (pComponent) {
      iResult=pComponent->FindMatchingDataTypes(pConsumerTask->GetComponent(), NULL);
    } else {
      HLTFatal("task initialization failed");
      iResult=-EFAULT;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Subscribe the component of the consumer task to the data buffer of this
// task.
//   pConsumerTask  task whose component subscribes
//   blockDescList  block descriptor list passed on to the data buffer
// Returns the result of the data buffer's Subscribe(), -EFAULT if this task
// has no data buffer and -EINVAL if pConsumerTask is NULL.
int AliHLTTask::Subscribe(const AliHLTTask* pConsumerTask, AliHLTComponentBlockDataList& blockDescList)
{
  // see header file for function documentation
  int iResult=0;
  if (pConsumerTask) {
    if (fpDataBuffer) {
      iResult=fpDataBuffer->Subscribe(pConsumerTask->GetComponent(), blockDescList);
    } else {
      HLTFatal("internal data buffer missing");
      iResult=-EFAULT;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Release a data block of this task's data buffer on behalf of the consumer
// task.
// Returns the result of the data buffer's Release(), -EFAULT if this task
// has no data buffer and -EINVAL if either argument is NULL.
int AliHLTTask::Release(AliHLTComponentBlockData* pBlockDesc, const AliHLTTask* pConsumerTask)
{
  // see header file for function documentation
  int iResult=0;
  if (pConsumerTask && pBlockDesc) {
    if (fpDataBuffer) {
      iResult=fpDataBuffer->Release(pBlockDesc, pConsumerTask->GetComponent(), this);
    } else {
      HLTFatal("internal data buffer missing");
      iResult=-EFAULT;
    }
  } else {
    iResult=-EINVAL;
  }
  return iResult;
}

// Print component, sources (marked resolved/unresolved depending on whether
// a dependency of the same name is registered) and targets of the task.
void AliHLTTask::PrintStatus()
{
  // see header file for function documentation
  HLTLogKeyword("task properties");
  AliHLTComponent* pComponent=GetComponent();
  if (pComponent) {
    HLTMessage(" component: %s (%p)", pComponent->GetComponentID(), pComponent);
  } else {
    HLTMessage(" no component set!");
  }
  if (fpConfiguration) {
    AliHLTConfiguration* pSrc=fpConfiguration->GetFirstSource();
    while (pSrc) {
      const char* pQualifier="unresolved";
      if (FindDependency(pSrc->GetName()))
        pQualifier="resolved";
      HLTMessage(" source: %s (%s)", pSrc->GetName(), pQualifier);
      pSrc=fpConfiguration->GetNextSource();
    }
    // NOTE(review): obj is used without a NULL check
    TObjLink* lnk = fListTargets.FirstLink();
    while (lnk) {
      TObject *obj = lnk->GetObject();
      HLTMessage(" target: %s", obj->GetName());
      lnk = lnk->Next();
    }
  } else {
    HLTMessage(" task not initialized");
  }
}

// Initialization hook called at the end of Init(); this implementation does
// nothing and returns 0.
int AliHLTTask::CustomInit(AliHLTComponentHandler* /*pCH*/)
{
  // default implementation nothing to do
  return 0;
}

// Cleanup hook called at the beginning of Deinit(); this implementation
// does nothing and returns 0.
int AliHLTTask::CustomCleanup()
{
  // default implementation nothing to do
  return 0;
}

// Logging function which puts the task name and address in front of the
// message.
//   severity, originClass, originFunc, file, line  passed on to SendMessage()
//   ...  variadic arguments handed to BuildLogString() as a va_list
// Returns the result of SendMessage().
int AliHLTTask::LoggingVarargs(AliHLTComponentLogSeverity severity,
                               const char* originClass, const char* originFunc,
                               const char* file, int line, ... ) const
{
  // see header file for function documentation
  int iResult=0;

  va_list args;
  va_start(args, line);

  // set the prefix first, the formatted message is appended to it
  AliHLTLogging::SetLogString("%s (%p): ", GetName(), this);
  iResult=SendMessage(severity, originClass, originFunc, file, line, AliHLTLogging::BuildLogString(NULL, args, true /*append*/));
  va_end(args);

  return iResult;
}