// $Id$
-// splitted from AliHLTConfiguration.cxx,v 1.25 2007/10/12 13:24:47
-/**************************************************************************
- * This file is property of and copyright by the ALICE HLT Project *
- * ALICE Experiment at CERN, All rights reserved. *
- * *
- * Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
- * for The ALICE HLT Project. *
- * *
- * Permission to use, copy, modify and distribute this software and its *
- * documentation strictly for non-commercial purposes is hereby granted *
- * without fee, provided that the above copyright notice appears in all *
- * copies and that both the copyright notice and this permission notice *
- * appear in the supporting documentation. The authors make no claims *
- * about the suitability of this software for any purpose. It is *
- * provided "as is" without express or implied warranty. *
- **************************************************************************/
-
-/** @file AliHLTTask.cxx
- @author Matthias Richter
- @date
- @brief Implementation of HLT tasks.
-*/
-
-// see header file for class documentation
-// or
-// refer to README to build package
-// or
-// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
-
-#if __GNUC__>= 3
-using namespace std;
-#endif
+//**************************************************************************
+//* This file is property of and copyright by the *
+//* ALICE Experiment at CERN, All rights reserved. *
+//* *
+//* Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
+//* for The ALICE HLT Project. *
+//* *
+//* Permission to use, copy, modify and distribute this software and its *
+//* documentation strictly for non-commercial purposes is hereby granted *
+//* without fee, provided that the above copyright notice appears in all *
+//* copies and that both the copyright notice and this permission notice *
+//* appear in the supporting documentation. The authors make no claims *
+//* about the suitability of this software for any purpose. It is *
+//* provided "as is" without express or implied warranty. *
+//**************************************************************************
+
+/// @file AliHLTTask.cxx
+/// @author Matthias Richter
+/// @date
+/// @brief Implementation of HLT tasks.
+///
#include <cerrno>
#include <cassert>
#include <iostream>
#include <string>
+#include <ctime>
#include "AliHLTTask.h"
#include "AliHLTConfiguration.h"
+#include "AliHLTConfigurationHandler.h"
#include "AliHLTComponent.h"
#include "AliHLTComponentHandler.h"
#include "TList.h"
+#include "AliHLTErrorGuard.h"
+
+using std::cout;
/** ROOT macro for the implementation of ROOT specific class methods */
ClassImp(AliHLTTask)
// see header file for function documentation
int iResult=0;
if (fpConfiguration!=NULL && pConf!=NULL && fpConfiguration!=pConf) {
- HLTWarning("overriding existing reference to configuration object %p (%s) by %p",
- fpConfiguration, GetName(), pConf);
+ HLTWarning("overriding existing reference to configuration object %p by %p",
+ fpConfiguration, pConf);
}
if (pConf!=NULL) fpConfiguration=pConf;
iResult=CreateComponent(fpConfiguration, pCH, fpComponent);
return iResult;
}
-int AliHLTTask::CreateComponent(AliHLTConfiguration* pConf, AliHLTComponentHandler* pCH, AliHLTComponent*& pComponent) const
+int AliHLTTask::CreateComponent(AliHLTConfiguration* pConfiguration, AliHLTComponentHandler* pCH, AliHLTComponent*& pComponent) const
{
// see header file for class documentation
int iResult=0;
+ if (!pConfiguration) return -EINVAL;
+
+ const AliHLTConfiguration* pConf=AliHLTConfigurationHandler::FindSubstitution(*pConfiguration);
+ if (!pConf) pConf=pConfiguration;
if (pConf) {
if (pCH) {
int argc=0;
pComponent->Deinit();
delete pComponent;
} else {
- HLTWarning("task %s (%p) doesn't seem to be in initialized", GetName(), this);
+ HLTWarning("task doesn't seem to be in initialized");
}
return iResult;
}
} else
iResult=FollowDependency(id, &tgtList);
if (iResult>0) {
- HLTMessage(" task \"%s\": dependency level %d ", GetName(), iResult);
+ HLTMessage(" dependency level %d ", iResult);
TObjLink* lnk=tgtList.FirstLink();
int i=iResult;
char* pSpace = new char[iResult+1];
if ((iResult=fpDataBuffer->SetConsumer(pTgtTask->GetComponent()))>=0) {
}
} else {
- break;
iResult=-EFAULT;
+ break;
}
lnk=lnk->Next();
}
return iResult;
}
-int AliHLTTask::ProcessTask(Int_t eventNo, AliHLTUInt32_t eventType)
+int AliHLTTask::ProcessTask(Int_t eventNo, AliHLTUInt32_t eventType, AliHLTTriggerMask_t trgMask,
+ AliHLTUInt32_t timestamp, AliHLTUInt32_t participatingDetectors)
{
// see header file for function documentation
int iResult=0;
// instances of SOR and EOR events to be kept
int iSOR=-1;
int iEOR=-1;
+ // TODO 2009-09-30
+ // generalize handling of the special blocks to be forwarded on SOR and EOR
+ // just adding a new specific handling for the ECS parameter block as a quick
+ // solution
+ int iECS=-1;
// subscribe to all source tasks
fBlockDataArray.clear();
HLTDebug("source task %s (%p) does not provide any matching data type for task %s (%p)", pSrcTask->GetName(), pSrcTask, GetName(), this);
}
if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>=0) {
- iSOR=iEOR=-1;
+ iSOR=iEOR=iECS=-1;
AliHLTComponentBlockDataList::iterator block=fBlockDataArray.begin();
for (int i=0; block!=fBlockDataArray.end(); i++) {
bool bRemove=0;
bRemove|=(*block).fDataType==kAliHLTDataTypeSOR && !(iSOR<0 && (iSOR=i)>=0);
bRemove|=(*block).fDataType==kAliHLTDataTypeEOR && !(iEOR<0 && (iEOR=i)>=0);
+ bRemove|=(*block).fDataType==kAliHLTDataTypeECSParam && !(iECS<0 && (iECS=i)>=0);
//HLTInfo("block %d, iSOR=%d iEOR=%d remove=%d", i, iSOR, iEOR, bRemove);
if (i<iSourceDataBlock) {
assert(!bRemove);
// process the event
int iNofTrial=0; // repeat processing if component returns -ENOSPC
- AliHLTUInt32_t size=0;
+ AliHLTUInt32_t iLastOutputDataSize=0;
if (iResult>=0) {
do {
long unsigned int iOutputDataSize=0;
} else {
long unsigned int iConstBase=0;
double fInputMultiplier=0;
- if (pComponent->GetComponentType()!=AliHLTComponent::kSink)
+ if (pComponent->GetComponentType()!=AliHLTComponent::kSink) {
pComponent->GetOutputDataSize(iConstBase, fInputMultiplier);
+ // add a small margin to the buffer to allow optional component
+ // statistics
+ iConstBase+=100;
+#if defined(__DEBUG) || defined(HLT_COMPONENT_STATISTICS)
+ for (AliHLTComponentBlockDataList::iterator element=fBlockDataArray.begin();
+ element!=fBlockDataArray.end(); element++) {
+ if (element->fDataType==kAliHLTDataTypeComponentStatistics) {
+ iConstBase+=element->fSize;
+ }
+ }
+#endif
+ }
if (fInputMultiplier<0) {
HLTWarning("ignoring negative input multiplier");
fInputMultiplier=0;
iOutputDataSize=int(fInputMultiplier*iInputDataVolume) + iConstBase;
//HLTDebug("task %s: reqired output size %d", GetName(), iOutputDataSize);
}
+ if (iNofTrial == 0 && fpDataBuffer->GetMaxBufferSize() < iOutputDataSize) {
+ //If the estimated buffer size exceeds the maximum buffer size of AliHLTRawBuffer, decrease the buffer size.
+ //The estimation is often quite high, and GetMaxBufferSize should usually return a size that is sufficient.
+ HLTImportant("Reducing estimated output buffer size of %d to maximum output buffer size\n", iOutputDataSize);
+ iOutputDataSize = fpDataBuffer->GetMaxBufferSize();
+ }
if (iNofTrial>0) {
// dont process again if the buffer size is the same
- if (size==iOutputDataSize) break;
- HLTInfo("processing task %s again with buffer size %d", GetName(), iOutputDataSize);
+ if (iLastOutputDataSize>=iOutputDataSize) break;
+ HLTImportant("processing event %d again with buffer size %d", eventNo, iOutputDataSize);
}
AliHLTUInt8_t* pTgtBuffer=NULL;
if (iOutputDataSize>0) pTgtBuffer=fpDataBuffer->GetTargetBuffer(iOutputDataSize);
AliHLTComponent::FillEventData(evtData);
if (eventNo>=0)
evtData.fEventID=(AliHLTEventID_t)eventNo;
+ if (timestamp < kMaxUInt) evtData.fEventCreation_s=timestamp;
+ else
+ evtData.fEventCreation_s=static_cast<AliHLTUInt32_t>(time(NULL));
AliHLTComponentTriggerData trigData;
+ AliHLTEventTriggerData evtTrigData;
trigData.fStructSize=sizeof(trigData);
- trigData.fDataSize=0;
- trigData.fData=NULL;
- size=iOutputDataSize;
+ trigData.fDataSize=sizeof(AliHLTEventTriggerData);
+ memset(&evtTrigData, 0, trigData.fDataSize);
+ // Setup the CDH in the trigger data, based on the event type, CTP trigger
+ // mask and participating detectors.
+ evtTrigData.fCommonHeaderWordCnt=gkAliHLTCommonHeaderCount;
+ AliHLTUInt8_t l1msg = 0x0;
+ switch (eventType)
+ {
+ case gkAliEventTypeData: l1msg = 0x00; break;
+ case gkAliEventTypeDataReplay: l1msg = 0x00; break;
+ case gkAliEventTypeStartOfRun: l1msg = (0xE << 2) | 0x01; break;
+ case gkAliEventTypeEndOfRun: l1msg = (0xF << 2) | 0x01; break;
+ case gkAliEventTypeCalibration: l1msg = (0x1 << 6) | 0x01; break;
+ case gkAliEventTypeSoftware: l1msg = 0x01; break;
+ }
+ evtTrigData.fCommonHeader[1] = AliHLTUInt32_t(l1msg) << 14;
+ evtTrigData.fCommonHeader[3] = ((l1msg & 0x1) == 0x1) ? (participatingDetectors & 0xFFFFFF) : 0x0;
+ evtTrigData.fCommonHeader[5] = (trgMask & AliHLTTriggerMask_t(0xffffffff)).to_ulong();
+ evtTrigData.fCommonHeader[6] = ((trgMask>>32) & AliHLTTriggerMask_t(0x3ffff)).to_ulong();
+ evtTrigData.fCommonHeader[7] = ((trgMask>>50) & AliHLTTriggerMask_t(0xffffffff)).to_ulong();
+ evtTrigData.fCommonHeader[8] = ((trgMask>>72) & AliHLTTriggerMask_t(0x3ffff)).to_ulong();
+
+ trigData.fData=&evtTrigData;
+ iLastOutputDataSize=iOutputDataSize;
+ AliHLTUInt32_t size=iOutputDataSize;
AliHLTUInt32_t outputBlockCnt=0;
AliHLTComponentBlockData* outputBlocks=NULL;
AliHLTComponentEventDoneData* edd=NULL;
eventTypeBlock.fSpecification=eventType;
fBlockDataArray.push_back(eventTypeBlock);
+ if (CheckFilter(kHLTLogDebug)) Print("proc");
+
+ AliHLTUInt32_t iblock=0;
+ // check input and output buffers for consistency
+ // to be enabled after fixing bug with DataBuffer and forwarded SOR/EOR
+ //for (iblock=0; iblock<fBlockDataArray.size(); iblock++) {
+ // if ((AliHLTUInt8_t*)fBlockDataArray[iblock].fPtr >= pTgtBuffer+size) continue;
+ // if (pTgtBuffer >= (AliHLTUInt8_t*)fBlockDataArray[iblock].fPtr+fBlockDataArray[iblock].fSize) continue;
+ // HLTFatal("input and output buffer overlap for block descriptor %d (ptr %p size %d): output buffer %p %d",
+ // iblock, fBlockDataArray[iblock].fPtr, fBlockDataArray[iblock].fSize,
+ // pTgtBuffer, size);
+ //}
+
// process
evtData.fBlockCnt=fBlockDataArray.size();
iResult=pComponent->ProcessEvent(evtData, &fBlockDataArray[0], trigData, pTgtBuffer, size, outputBlockCnt, outputBlocks, edd);
- HLTDebug("task %s: component %s ProcessEvent finnished (%d): size=%d blocks=%d", GetName(), pComponent->GetComponentID(), iResult, size, outputBlockCnt);
+ HLTDebug("component %s ProcessEvent finnished (%d): size=%d blocks=%d", pComponent->GetComponentID(), iResult, size, outputBlockCnt);
+
+ // EventDoneData is for the moment ignored in AliHLTSystem
+ if (edd) {
+ HLTDebug("got EventDoneData size %d", edd->fDataSize);
+ delete [] reinterpret_cast<char*>(edd);
+ edd=NULL;
+ }
// remove event data block
fBlockDataArray.pop_back();
AliHLTComponentBlockDataList segments;
for (AliHLTUInt32_t oblock=0; oblock<outputBlockCnt; oblock++) {
// consistency check for data reference
- if (outputBlocks[oblock].fPtr!=NULL && outputBlocks[oblock].fOffset!=0) {
+ if (outputBlocks[oblock].fPtr!=NULL && outputBlocks[oblock].fPtr!=pTgtBuffer &&
+ outputBlocks[oblock].fOffset!=0) {
HLTWarning("output block %s 0x%08x has inconsistent data reference ptr=%p offset=0x%08x: "
"for new blocks use offset only, forwarded blocks have fPtr set only",
AliHLTComponent::DataType2Text(outputBlocks[oblock].fDataType).c_str(),
}
// check for duplicates in the output
+ // this check applies for forwarded data blocks where
+ // the ptr is not inside the target buffer
AliHLTUInt32_t checkblock=0;
for (; checkblock<oblock; checkblock++) {
- if (outputBlocks[oblock].fPtr!=NULL && outputBlocks[checkblock].fPtr==outputBlocks[oblock].fPtr) {
+ if (outputBlocks[oblock].fPtr!=NULL && outputBlocks[oblock].fPtr!=pTgtBuffer &&
+ outputBlocks[checkblock].fPtr==outputBlocks[oblock].fPtr) {
if (outputBlocks[checkblock].fSize!=outputBlocks[oblock].fSize ||
outputBlocks[checkblock].fDataType!=outputBlocks[oblock].fDataType) {
HLTWarning("output blocks %d (%s 0x%08x) and %d (%s 0x%08x) have identical data references ptr=%p "
- "but differ in data type and/or size: %d vs. %d",
+ "but differ in data type and/or size: %d vs. %d, ignoring block %d",
oblock,
AliHLTComponent::DataType2Text(outputBlocks[oblock].fDataType).c_str(),
outputBlocks[oblock].fSpecification,
outputBlocks[checkblock].fSpecification,
outputBlocks[oblock].fPtr,
outputBlocks[oblock].fSize,
- outputBlocks[checkblock].fSize);
+ outputBlocks[checkblock].fSize,
+ checkblock);
}
// ignore from the second copy
break;
// to the publisher task. The publisher task of a forwarded data block is
// removed from the list in order to keep the buffer open. It will be releases
// when the subscribing task releases it
- AliHLTUInt32_t iblock=0;
+ iblock=0;
for (; iblock<fBlockDataArray.size(); iblock++) {
+ if (outputBlocks[oblock].fDataType==kAliHLTDataTypeEvent) {
+ // the event type data block is an artificial data block
+ // ignore if it was forwarded
+ break;
+ }
if (fBlockDataArray[iblock].fPtr==outputBlocks[oblock].fPtr) {
assert(subscribedTaskList[iblock]!=NULL);
- if (subscribedTaskList[iblock]==NULL) continue;
+ if (subscribedTaskList[iblock]==NULL) {
+ ALIHLTERRORGUARD(1, "missing parent task for forwarded data block %s 0x%08x, original data block %s 0x%08x, subsequent errors are suppressed",
+ AliHLTComponent::DataType2Text(outputBlocks[oblock].fDataType).c_str(),
+ outputBlocks[oblock].fSpecification,
+ AliHLTComponent::DataType2Text(outputBlocks[iblock].fDataType).c_str(),
+ outputBlocks[iblock].fSpecification);
+ continue;
+ }
HLTDebug("forward segment %d (source task %s %p) to data buffer %p", iblock, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
fpDataBuffer->Forward(subscribedTaskList[iblock], &fBlockDataArray[iblock]);
subscribedTaskList[iblock]=NULL; // not to be released in the loop further down
fpDataBuffer->Forward(subscribedTaskList[iEOR], &fBlockDataArray[iEOR]);
subscribedTaskList[iEOR]=NULL; // not to be released in the loop further down
}
+ if (iECS>=0 && subscribedTaskList[iECS]!=NULL) {
+ HLTDebug("forward ECS event (%s) segment %d (source task %s %p) to data buffer %p", AliHLTComponent::DataType2Text(fBlockDataArray[iECS].fDataType).c_str(), iECS, pSrcTask->GetName(), pSrcTask, fpDataBuffer);
+ fpDataBuffer->Forward(subscribedTaskList[iECS], &fBlockDataArray[iECS]);
+ subscribedTaskList[iECS]=NULL; // not to be released in the loop further down
+ }
}
} else {
- HLTError("task %s: no target buffer available", GetName());
+ HLTError("no target buffer available");
iResult=-EFAULT;
}
} while (iResult==-ENOSPC && iNofTrial++<1);
}
+ fBlockDataArray.clear();
+ if (CheckFilter(kHLTLogDebug)) Print("proc");
+
// now release all buffers which we have subscribed to
iSourceDataBlock=0;
AliHLTTaskPList::iterator element;
if (pSrcTask) {
int iTempRes=0;
if ((iTempRes=pSrcTask->Release(&fBlockDataArray[iSourceDataBlock], this))>=0) {
- HLTDebug("Task %s (%p) successfully released segment of task %s (%p)", GetName(), this, pSrcTask->GetName(), pSrcTask);
+ HLTDebug("successfully released segment of task %s (%p)", pSrcTask->GetName(), pSrcTask);
} else {
- HLTError("Task %s (%p): realease of task %s (%p) failed with error %d", GetName(), this, pSrcTask->GetName(), pSrcTask, iTempRes);
+ HLTError("realease of task %s (%p) failed with error %d", pSrcTask->GetName(), pSrcTask, iTempRes);
}
}
subscribedTaskList.erase(element);
iSourceDataBlock++;
}
if (subscribedTaskList.size()>0) {
- HLTError("task %s (%p): could not release all data buffers", GetName(), this);
+ HLTError("could not release all data buffers");
}
} else {
- HLTError("task %s (%p): internal failure (not initialized component %p, data buffer %p)", GetName(), this, fpComponent, fpDataBuffer);
+ HLTError("internal failure (not initialized component %p, data buffer %p)", fpComponent, fpDataBuffer);
iResult=-EFAULT;
}
return iResult;
}
+int AliHLTTask::SubscribeSourcesAndSkip()
+{
+ // function carries out the proper cleanup of the source components
+ // by subscribing and releasing
+ int iResult=0;
+ AliHLTTask* pSrcTask=NULL;
+ AliHLTTaskPList subscribedTaskList;
+
+ // cleanup the data buffer
+ if (fpDataBuffer) fpDataBuffer->Reset();
+
+ // subscribe to all source tasks
+ fBlockDataArray.clear();
+ for (TObjLink* lnk=fListDependencies.FirstLink(); lnk!=NULL; lnk=lnk->Next()) {
+ if (!lnk->GetObject()) continue;
+ pSrcTask=dynamic_cast<AliHLTTask*>(lnk->GetObject());
+ if (!pSrcTask) continue;
+ unsigned iPosition=fBlockDataArray.size();
+ if ((iResult=pSrcTask->Subscribe(this, fBlockDataArray))>0) {
+ for (unsigned i=iPosition; i<fBlockDataArray.size(); i++) {
+ subscribedTaskList.push_back(pSrcTask);
+ }
+ HLTDebug("subscribed to %d blocks of task %s (%p)", iResult, pSrcTask->GetName(), pSrcTask, iResult);
+ } else if (iResult<0) {
+ HLTError("failed to subscribe to task %s (%p) with error %d", pSrcTask->GetName(), pSrcTask, iResult);
+ }
+ }
+
+ unsigned iSourceDataBlock=0;
+ AliHLTTaskPList::iterator element;
+ while ((element=subscribedTaskList.begin())!=subscribedTaskList.end()) {
+ assert(iSourceDataBlock<fBlockDataArray.size());
+ pSrcTask=*element;
+ if (pSrcTask && iSourceDataBlock<fBlockDataArray.size()) {
+ if ((iResult=pSrcTask->Release(&fBlockDataArray[iSourceDataBlock], this))>=0) {
+ HLTDebug("successfully released segment of task %s (%p)", pSrcTask->GetName(), pSrcTask);
+ } else if (iSourceDataBlock>=fBlockDataArray.size()) {
+ HLTError("mismatch between list of subscribed tasks and block list in task %s (%p), can not release task %s (%p)", GetName(), this, pSrcTask->GetName(), pSrcTask);
+ } else {
+ HLTError("realease of task %s (%p) failed with error %d", pSrcTask->GetName(), pSrcTask, iResult);
+ }
+ }
+ subscribedTaskList.erase(element);
+ iSourceDataBlock++;
+ }
+ if (iSourceDataBlock<fBlockDataArray.size()) {
+ HLTWarning("not all subscriptions released for task %s (%p)", GetName(), this);
+ }
+
+ return 0;
+}
+
int AliHLTTask::GetNofMatchingDataBlocks(const AliHLTTask* pConsumerTask) const
{
// see header file for function documentation
lnk = lnk->Next();
}
} else {
- HLTMessage(" task \"%s\" not initialized", GetName());
+ HLTMessage(" task not initialized");
}
}
+void AliHLTTask::Print(const char* options) const
+{
+ // Overloaded from TObject
+ if (strcmp(options, "proc")==0) {
+ // print processing info
+ HLTMessage("**********************************************");
+ HLTMessage("******* AliHLTTask Processing info ***********");
+ HLTMessage(" component: %p %s", fpComponent, (fpComponent?fpComponent->GetComponentID():""));
+ HLTMessage(" data buffer: %p", fpDataBuffer);
+ if (fpDataBuffer) fpDataBuffer->Print("");
+ HLTMessage(" input block descriptors: %d", fBlockDataArray.size());
+ for (unsigned i=0; i<fBlockDataArray.size(); i++) {
+ HLTMessage(" %d: %s 0x%08x %p %d", i,
+ AliHLTComponent::DataType2Text(fBlockDataArray[i].fDataType).c_str(),
+ fBlockDataArray[i].fSpecification,
+ fBlockDataArray[i].fPtr,
+ fBlockDataArray[i].fSize
+ );
+ }
+ HLTMessage("**** end of AliHLTTask Processing info *******");
+ HLTMessage("**********************************************");
+ return;
+ }
+
+ cout << "AliHLTTask " << GetName() << " " << this
+ << " component " << fpComponent << " "
+ << (fpComponent?fpComponent->GetComponentID():"")
+ << endl;
+}
+
+
int AliHLTTask::CustomInit(AliHLTComponentHandler* /*pCH*/)
{
// default implementation nothing to do
// default implementation nothing to do
return 0;
}
+
+int AliHLTTask::LoggingVarargs(AliHLTComponentLogSeverity severity,
+ const char* originClass, const char* originFunc,
+ const char* file, int line, ... ) const
+{
+ // see header file for function documentation
+ int iResult=0;
+
+ va_list args;
+ va_start(args, line);
+
+ AliHLTLogging::SetLogString(this, " (%p)", "%s_pfmt_: ", GetName());
+ iResult=SendMessage(severity, originClass, originFunc, file, line, AliHLTLogging::BuildLogString(NULL, args, true /*append*/));
+ va_end(args);
+
+ return iResult;
+}