// $Id$
/**************************************************************************
- * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
+ * This file is property of and copyright by the ALICE HLT Project *
+ * ALICE Experiment at CERN, All rights reserved. *
* *
- * Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
- * for The ALICE Off-line Project. *
+ * Primary Authors: Matthias Richter <Matthias.Richter@ift.uib.no> *
+ * for The ALICE HLT Project. *
* *
* Permission to use, copy, modify and distribute this software and its *
* documentation strictly for non-commercial purposes is hereby granted *
@brief Handling of Data Buffers for HLT components.
*/
+// see header file for class documentation
+// or
+// refer to README to build package
+// or
+// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
+
#if __GNUC__>= 3
using namespace std;
#endif
#include "AliHLTDataBuffer.h"
#include "AliHLTConsumerDescriptor.h"
#include "AliHLTComponent.h"
-#include <string>
-#include "AliHLTSystem.h"
+#include <cerrno>
+#include <cassert>
+//#include <string>
+//#include "AliHLTSystem.h"
/** ROOT macro for the implementation of ROOT specific class methods */
ClassImp(AliHLTDataBuffer)
fgNofInstances++;
}
-AliHLTDataBuffer::AliHLTDataBuffer(const AliHLTDataBuffer&)
- :
- TObject(),
- AliHLTLogging(),
- fSegments(),
- fConsumers(),
- fActiveConsumers(),
- fReleasedConsumers(),
- fpBuffer(NULL),
- fFlags(0)
-{
- // see header file for function documentation
- HLTFatal("copy constructor untested");
-}
-
-AliHLTDataBuffer& AliHLTDataBuffer::operator=(const AliHLTDataBuffer&)
-{
- // see header file for function documentation
- HLTFatal("assignment operator untested");
- return *this;
-}
-
int AliHLTDataBuffer::fgNofInstances=0;
vector<AliHLTDataBuffer::AliHLTRawBuffer*> AliHLTDataBuffer::fgFreeBuffers;
vector<AliHLTDataBuffer::AliHLTRawBuffer*> AliHLTDataBuffer::fgActiveBuffers;
{
// see header file for function documentation
int iResult=0;
+
+ // Matthias 26.09.2007 relax the restriction to matching data blocks
+ // all blocks are passed to the consumer, which is the policy also in
+ // PubSub
+ tgtList.assign(fSegments.begin(), fSegments.end());
+ iResult=tgtList.size();
+ return iResult;
+
if (pConsumer) {
vector<AliHLTComponentDataType> dtlist;
((AliHLTComponent*)pConsumer)->GetInputDataTypes(dtlist);
AliHLTConsumerDescriptor* pDesc=FindConsumer(pConsumer, fConsumers);
if (pDesc) {
vector<AliHLTDataBuffer::AliHLTDataSegment> tgtList;
- /* TODO: think about a good policy for this check
- * is it enough that at least one segment is available, or have all to be available?
- * or is it possible to have optional segments?
- */
- if ((iResult=FindMatchingDataSegments(pConsumer, tgtList))>0) {
+ // Matthias 26.07.2007 AliHLTSystem should behave the same way as PubSub
+ // so it does not matter if there are matching data types or not, unless
+ // we implement such a check in PubSub
+ if ((iResult=FindMatchingDataSegments(pConsumer, tgtList))>=0) {
int i =0;
vector<AliHLTDataBuffer::AliHLTDataSegment>::iterator segment=tgtList.begin();
while (segment!=tgtList.end() && i<iArraySize) {
arrayBlockDesc[i].fShmKey.fStructSize=sizeof(AliHLTComponentShmData);
arrayBlockDesc[i].fShmKey.fShmType=gkAliHLTComponentInvalidShmType;
arrayBlockDesc[i].fShmKey.fShmID=gkAliHLTComponentInvalidShmID;
+ // This models the behavior of PubSub.
+ // For incoming data blocks, fOffset must be ignored by the
+ // processing component. It is set for bookkeeping in the framework.
+ // fPtr always points to the beginning of the data.
arrayBlockDesc[i].fOffset=(*segment).fSegmentOffset;
- arrayBlockDesc[i].fPtr=fpBuffer->fPtr;
+ AliHLTUInt8_t* pTgt=*fpBuffer;
+ pTgt+=(*segment).fSegmentOffset;
+ arrayBlockDesc[i].fPtr=reinterpret_cast<void*>(pTgt);
arrayBlockDesc[i].fSize=(*segment).fSegmentSize;
arrayBlockDesc[i].fDataType=(*segment).fDataType;
arrayBlockDesc[i].fSpecification=(*segment).fSpecification;
pDesc->SetActiveDataSegment(arrayBlockDesc[i].fOffset, arrayBlockDesc[i].fSize);
- HLTDebug("component %p (%s) subscribed to segment #%d offset %d", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset);
+ HLTDebug("component %p (%s) subscribed to segment #%d offset %d size %d data type %s %#x",
+ pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), i, arrayBlockDesc[i].fOffset,
+ arrayBlockDesc[i].fSize, (AliHLTComponent::DataType2Text(arrayBlockDesc[i].fDataType)).c_str(),
+ arrayBlockDesc[i].fSpecification);
i++;
segment++;
}
+ // check whether there was enough space for the segments
+ if (i!=(int)tgtList.size()) {
+ HLTError("too little space in block descriptor array: required %d, available %d", tgtList.size(), iArraySize);
+ iResult=-ENOSPC;
+ } else {
// move this consumer to the active list
if (ChangeConsumerState(pDesc, fConsumers, fActiveConsumers)>=0) {
HLTDebug("component %p (%s) subscribed to data buffer %p", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID(), this);
HLTError("can not activate consumer %p for data buffer %p", pConsumer, this);
iResult=-EACCES;
}
+ }
} else {
HLTError("unresolved data segment(s) for component %p (%s)", pConsumer, ((AliHLTComponent*)pConsumer)->GetComponentID());
iResult=-EBADF;
iResult=-ENOENT;
}
} else {
- HLTError("data buffer %p is empty", this);
- iResult=-ENODATA;
+ // Matthias 26.07.2007 until now, data had to be present for successful subscription
+ // in order to be consistent with the PubSub framework, this restiction has been
+ // removed
+ //HLTError("data buffer %p is empty", this);
+ //iResult=-ENODATA;
}
} else {
HLTError("invalid parameter");
}
if (pDesc->GetNofActiveSegments()==0) {
if ((iResult=ChangeConsumerState(pDesc, fActiveConsumers, fReleasedConsumers))>=0) {
- if (GetNofActiveConsumers()==0) {
+ if (GetNofActiveConsumers()==0 && GetNofPendingConsumers()==0) {
// this is the last consumer, reset the consumer list and release the raw buffer
ResetDataBuffer();
}
{
// see header file for function documentation
AliHLTUInt8_t* pTargetBuffer=NULL;
+ if (fpBuffer!=NULL) {
+ HLTWarning("data buffer not properly reset, possible memory leak\n");
+ }
fpBuffer=CreateRawBuffer(iMinSize);
if (fpBuffer) {
- pTargetBuffer=(AliHLTUInt8_t*)fpBuffer->fPtr;
+ pTargetBuffer=*fpBuffer;
} else {
HLTError("can not create raw buffer");
}
int iResult=0;
if (pTgt && arrayBlockData && iSize>=0) {
if (fpBuffer) {
- if (fpBuffer->fPtr==(void*)pTgt) {
+ if (*fpBuffer==pTgt) {
AliHLTDataBuffer::AliHLTDataSegment segment;
- memset(&segment, 0, sizeof(AliHLTDataBuffer::AliHLTDataSegment));
for (int i=0; i<iSize; i++) {
- if (arrayBlockData[i].fOffset+arrayBlockData[i].fSize<=fpBuffer->fSize) {
- segment.fSegmentOffset=arrayBlockData[i].fOffset;
- segment.fSegmentSize=arrayBlockData[i].fSize;
- segment.fDataType=arrayBlockData[i].fDataType;
- segment.fSpecification=arrayBlockData[i].fSpecification;
- fSegments.push_back(segment);
- HLTDebug("set segment %s with size %d at offset %d", AliHLTComponent::DataType2Text(segment.fDataType).data(), segment.fSegmentSize, segment.fSegmentOffset);
+ // This function has to model the behavior of PubSub
+ // For output blocks only the fOffset value is used, this must be the offset
+ // relative to the output pointer. fPtr must be either NULL or the output
+ // pointer
+ if (arrayBlockData[i].fPtr==NULL ||
+ arrayBlockData[i].fPtr==*fpBuffer) {
+ if (arrayBlockData[i].fOffset+arrayBlockData[i].fSize<=fpBuffer->GetUsedSize()) {
+ segment.fSegmentOffset=arrayBlockData[i].fOffset;
+ segment.fSegmentSize=arrayBlockData[i].fSize;
+ segment.fDataType=arrayBlockData[i].fDataType;
+ segment.fSpecification=arrayBlockData[i].fSpecification;
+ fSegments.push_back(segment);
+ HLTDebug("set segment %s with size %d at offset %d", AliHLTComponent::DataType2Text(segment.fDataType).data(), segment.fSegmentSize, segment.fSegmentOffset);
+ } else {
+ HLTError("block data specification %#d (%s) exceeds size of data buffer", i, AliHLTComponent::DataType2Text(arrayBlockData[i].fDataType).data());
+ HLTError("block offset=%d, block size=%d, buffer size=%d", arrayBlockData[i].fOffset, arrayBlockData[i].fSize, fpBuffer->GetUsedSize());
+ iResult=-E2BIG;
+ }
} else {
- HLTError("block data specification %#d (%s) exceeds size of data buffer", i, AliHLTComponent::DataType2Text(arrayBlockData[i].fDataType).data());
- HLTError("block offset=%d, block size=%d, buffer size=%d", arrayBlockData[i].fOffset, arrayBlockData[i].fSize, fpBuffer->fSize);
- iResult=-E2BIG;
+ HLTError("invalid pointer (%p) in block data specification (buffer %p size %d)."
+ "please note: for output blocks only the fOffset value is valid and must "
+ "be relative to the output buffer", arrayBlockData[i].fPtr, fpBuffer->GetPointer(), fpBuffer->GetUsedSize());
+ iResult=-ERANGE;
}
}
} else {
- HLTError("this data buffer (%p) does not match the internal data buffer %p of raw buffer %p", pTgt, fpBuffer->fPtr, fpBuffer);
+ HLTError("this data buffer (%p) does not match the internal data buffer %p of raw buffer %p", pTgt, fpBuffer->GetPointer(), fpBuffer);
iResult=-EINVAL;
}
} else {
return iResult;
}
+int AliHLTDataBuffer::GetNofPendingConsumers()
+{
+ // see header file for function documentation
+ int iResult=fConsumers.size();
+ return iResult;
+}
+
int AliHLTDataBuffer::GetNofActiveConsumers()
{
// see header file for function documentation
unsigned int reqSize=size+fgkSafetyPatternSize;
vector<AliHLTRawBuffer*>::iterator buffer=fgFreeBuffers.begin();
while (buffer!=fgFreeBuffers.end() && pRawBuffer==NULL) {
- if ((*buffer)->fTotalSize>=reqSize && ((*buffer)->fTotalSize-reqSize)<fgMargin) {
+ if ((*buffer)->CheckSize(reqSize)) {
// assign this element
pRawBuffer=*buffer;
- pRawBuffer->fSize=size;
+ pRawBuffer->UseBuffer(size);
fgFreeBuffers.erase(buffer);
- fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "raw buffer container %p provided for request of %d bytes (total %d available in buffer %p)", pRawBuffer, size, pRawBuffer->fTotalSize, pRawBuffer->fPtr);
+ fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "raw buffer container %p provided for request of %d bytes (total %d available in buffer %p)", pRawBuffer, size, pRawBuffer->GetTotalSize(), pRawBuffer->GetPointer());
fgActiveBuffers.push_back(pRawBuffer);
break;
}
}
if (pRawBuffer==NULL) {
// no buffer found, create a new one
- pRawBuffer=new AliHLTRawBuffer;
+ pRawBuffer=new AliHLTRawBuffer(reqSize);
if (pRawBuffer) {
- memset(pRawBuffer, 0, sizeof(AliHLTRawBuffer));
- pRawBuffer->fPtr=malloc(reqSize);
- if (pRawBuffer->fPtr) {
- pRawBuffer->fSize=size;
- pRawBuffer->fTotalSize=reqSize;
+ if (pRawBuffer->GetPointer()) {
+ pRawBuffer->UseBuffer(size);
fgActiveBuffers.push_back(pRawBuffer);
- fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "new raw buffer %p of size %d created (container %p)", pRawBuffer->fPtr, pRawBuffer->fTotalSize, pRawBuffer);
- if (fgkSafetyPatternSize>0) {
- memcpy(((char*)pRawBuffer->fPtr)+pRawBuffer->fSize, fgkSafetyPattern, fgkSafetyPatternSize);
- }
+ fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "new raw buffer %p of size %d created (container %p)", pRawBuffer->GetPointer(), pRawBuffer->GetTotalSize(), pRawBuffer);
} else {
delete pRawBuffer;
pRawBuffer=NULL;
fgLogging.Logging(kHLTLogError, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "memory allocation failed");
}
}
+ if (pRawBuffer!=NULL && fgkSafetyPatternSize>0) {
+ //fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::CreateRawBuffer", "data buffer handling", "writing safety pattern to %p offset %d", (*buffer)->GetPointer(), (*buffer)->GetUsedSize());
+ int res=pRawBuffer->WritePattern(fgkSafetyPattern, fgkSafetyPatternSize);
+ assert(res>=0);
+ }
return pRawBuffer;
}
}
if (buffer!=fgActiveBuffers.end()) {
if (fgkSafetyPatternSize>0) {
- if (memcmp(((char*)(*buffer)->fPtr)+(*buffer)->fSize, fgkSafetyPattern, fgkSafetyPatternSize)!=0) {
- fgLogging.Logging(kHLTLogFatal, "AliHLTDataBuffer::ReleaseRawBuffer", "data buffer handling", "component has written beyond end of data buffer %p size %d", (*buffer)->fPtr, (*buffer)->fSize);
+ //fgLogging.Logging(kHLTLogDebug, "AliHLTDataBuffer::ReleaseRawBuffer", "data buffer handling", "comparing safety pattern at %p offset %d", (*buffer)->GetPointer(), reinterpret_cast<AliHLTUInt32_t>(*buffer));
+ if ((*buffer)->CheckPattern(fgkSafetyPattern, fgkSafetyPatternSize)) {
+ fgLogging.Logging(kHLTLogFatal, "AliHLTDataBuffer::ReleaseRawBuffer", "data buffer handling", "component has written beyond end of data buffer %p size %d", (*buffer)->GetPointer(), (*buffer)->GetUsedSize());
}
}
- (*buffer)->fSize=0;
+ (*buffer)->Reset();
fgFreeBuffers.push_back(*buffer);
fgActiveBuffers.erase(buffer);
} else {
int iResult=0;
vector<AliHLTRawBuffer*>::iterator buffer=fgFreeBuffers.begin();
while (buffer!=fgFreeBuffers.end()) {
- free((*buffer)->fPtr);
delete *buffer;
fgFreeBuffers.erase(buffer);
buffer=fgFreeBuffers.begin();
}
buffer=fgActiveBuffers.begin();
while (buffer!=fgActiveBuffers.end()) {
- fgLogging.Logging(kHLTLogWarning, "AliHLTDataBuffer::ReleaseRawBuffer", "data buffer handling", "request to delete active raw buffer container (raw buffer %p, size %d)", (*buffer)->fPtr, (*buffer)->fTotalSize);
- free((*buffer)->fPtr);
+ fgLogging.Logging(kHLTLogWarning, "AliHLTDataBuffer::ReleaseRawBuffer", "data buffer handling", "request to delete active raw buffer container (raw buffer %p, size %d)", (*buffer)->GetPointer(), (*buffer)->GetTotalSize());
delete *buffer;
fgActiveBuffers.erase(buffer);
buffer=fgActiveBuffers.begin();
fpBuffer=NULL;
// cleanup consumer states
- vector<AliHLTConsumerDescriptor*>::iterator desc=fReleasedConsumers.begin();
+ vector<AliHLTConsumerDescriptor*>::iterator desc;
+// if (GetNofPendingConsumers()>0) {
+// desc=fConsumers.begin();
+// while (desc!=fConsumers.end()) {
+// AliHLTComponent* pComp=(*desc)->GetComponent();
+// HLTError("internal error: consumer %p (%s %p) did not get data from data buffer %p", *desc, pComp?pComp->GetComponentID():"", pComp, this);
+// desc++;
+// }
+// }
+ desc=fReleasedConsumers.begin();
while (desc!=fReleasedConsumers.end()) {
AliHLTConsumerDescriptor* pDesc=*desc;
fReleasedConsumers.erase(desc);
}
return 0;
}
+
+AliHLTDataBuffer::AliHLTRawBuffer::AliHLTRawBuffer(AliHLTUInt32_t size)
+ :
+ fSize(0),
+ fTotalSize(size),
+ fPtr(static_cast<AliHLTUInt8_t*>(malloc(size)))
+{
+ // see header file for class documentation
+ // or
+ // refer to README to build package
+ // or
+ // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
+ if (fPtr==NULL) {
+ fSize=0;
+ fTotalSize=0;
+ }
+}
+
+AliHLTDataBuffer::AliHLTRawBuffer::~AliHLTRawBuffer()
+{
+ if (fPtr) {
+ free(fPtr);
+ }
+ fPtr=NULL;
+ fSize=0;
+ fTotalSize=0;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::operator==(void* ptr) const
+{
+ // see header file for function documentation
+ return fPtr == static_cast<AliHLTUInt8_t*>(ptr);
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::operator<=(void* ptr) const
+{
+ // see header file for function documentation
+ int iResult=fPtr <= static_cast<AliHLTUInt8_t*>(ptr);
+ //printf("%p: %p <= %p (%d)\n", this, fPtr, ptr, iResult);
+ return iResult;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::operator>(void* ptr) const
+{
+ // see header file for function documentation
+ int iResult=fPtr+fSize > static_cast<AliHLTUInt8_t*>(ptr);
+ //printf("%p: %p + %d > %p (%d)\n", this, fPtr, fSize, ptr, iResult);
+ return iResult;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::operator-(void* ptr) const
+{
+ // see header file for function documentation
+ return static_cast<int>(static_cast<AliHLTUInt8_t*>(ptr)-fPtr);
+}
+
+AliHLTUInt8_t* AliHLTDataBuffer::AliHLTRawBuffer::UseBuffer(AliHLTUInt32_t size)
+{
+ // see header file for function documentation
+ if (size>0 && CheckSize(size)) {
+ fSize=size;
+ return fPtr;
+ }
+ return NULL;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::CheckSize(AliHLTUInt32_t size) const
+{
+ // see header file for function documentation
+ return fTotalSize>=size && ((fTotalSize-size)<fgMargin);
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::Reset()
+{
+ // see header file for function documentation
+ fSize=0;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::WritePattern(const char* pattern, int size)
+{
+ // see header file for function documentation
+ int iResult=0;
+ if (pattern!=NULL && size>0) {
+ if (fSize+size<=fTotalSize) {
+ memcpy(((char*)fPtr)+fSize, pattern, size);
+ iResult=size;
+ } else {
+ iResult=-ENOSPC;
+ }
+ }
+ return iResult;
+}
+
+int AliHLTDataBuffer::AliHLTRawBuffer::CheckPattern(const char* pattern, int size) const
+{
+ // see header file for function documentation
+ int iResult=0;
+ if (pattern!=NULL && size>0) {
+ if (fSize+size<=fTotalSize) {
+ iResult=memcmp(((char*)fPtr)+fSize, pattern, size)!=0;
+ } else {
+ iResult=-ENOSPC;
+ }
+ }
+ return iResult;
+}