X-Git-Url: http://git.uio.no/git/?p=u%2Fmrichter%2FAliRoot.git;a=blobdiff_plain;f=RAW%2FAliMDC.cxx;h=618bfffd937b2e6a6aa92cb2bc95e0686c1041d9;hp=2a57a83e2d04d0784c439a2b7fc79196c66ca473;hb=012c556b56b90ff46e219efa69b856d4f3c6f8c6;hpb=149419e43369179d7274b1acc437ab96b0de5177 diff --git a/RAW/AliMDC.cxx b/RAW/AliMDC.cxx index 2a57a83e2d0..618bfffd937 100644 --- a/RAW/AliMDC.cxx +++ b/RAW/AliMDC.cxx @@ -1,4 +1,4 @@ -// @(#)alimdc:$Name$:$Id$ +// @(#)alimdc:$Name: $:$Id$ // Author: Fons Rademakers 26/11/99 // Updated: Dario Favretto 15/04/2003 @@ -33,8 +33,6 @@ // AliRawRFIODB or via rootd using AliRawRootdDB or to CASTOR via // // rootd using AliRawCastorDB (and for performance testing there is // // also AliRawNullDB). // -// The AliRunDB class provides the interface to the run and file // -// catalogues (AliEn or plain MySQL). // // The AliStats class provides statics information that is added as // // a single keyed object to each raw file. // // The AliTagDB provides an interface to a TAG database. // @@ -49,27 +47,22 @@ #include #include -#include +#include #include +#include -#ifdef ALI_DATE -#include "event.h" -#endif +#include #ifdef USE_EB #include "libDateEb.h" #endif -#ifdef USE_HLT -#include -#include "AliL3Logging.h" -#include -#include "AliRawReaderRoot.h" -#include -#include -#endif +#include "AliMDC.h" + +#include +#include #include "AliRawEvent.h" -#include "AliRawEventHeader.h" +#include "AliRawEventHeaderBase.h" #include "AliRawEquipment.h" #include "AliRawEquipmentHeader.h" #include "AliRawData.h" @@ -80,550 +73,651 @@ #include "AliRawRootdDB.h" #include "AliRawNullDB.h" #include "AliTagDB.h" +#include "AliRawEventTag.h" +#include "AliFilter.h" -#include "AliMDC.h" ClassImp(AliMDC) -#define ALIDEBUG(level) \ - if (AliMDC::Instance() && (AliMDC::Instance()->GetDebugLevel() >= (level))) - +// Filter names +const char* const AliMDC::fgkFilterName[kNFilters] = {"AliHoughFilter"}; -// Fixed file system locations for the different DB's -#ifdef USE_RDM -const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo"; -const char* const AliMDC::fgkRawDBFS[2] = { "/tmp/mdc1", "/tmp/mdc2" }; -const char* const AliMDC::fgkTagDBFS = "/tmp/mdc1/tags"; -const char* const AliMDC::fgkRunDBFS = "/tmp/mdc1/meta"; -const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/user/r/rdm"; -const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/user/r/rdm"; -const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1"; -const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct"; -const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC"; -#else -const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo"; -const char* const AliMDC::fgkRawDBFS[2] = { "/data1/mdc", "/data2/mdc" }; -const char* const AliMDC::fgkTagDBFS = "/data1/mdc/tags"; -const char* const AliMDC::fgkRunDBFS = "/data1/mdc/meta"; -const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/lcg/dc5"; -const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/lcg/dc5"; -const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1"; -const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct"; -const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC"; -#endif - -// Maximum size of tag db files -const Double_t AliMDC::fgkMaxTagFileSize = 2.5e8; // 250MB +//______________________________________________________________________________ +AliMDC::AliMDC(Int_t compress, Bool_t deleteFiles, EFilterMode filterMode, + Double_t maxSizeTagDB, const char* fileNameTagDB, + const char *guidFileFolder, + Int_t basketsize) : + fEvent(new AliRawEvent), + fESD(NULL), + fStats(NULL), + fRawDB(NULL), + fTagDB(NULL), + fEventTag(new AliRawEventTag), + fCompress(compress), + fBasketSize(basketsize), + fDeleteFiles(deleteFiles), + fFilterMode(filterMode), + fFilters(), + fStop(kFALSE), + fIsTagDBCreated(kFALSE), + fMaxSizeTagDB(maxSizeTagDB), + fFileNameTagDB(fileNameTagDB), + fGuidFileFolder(guidFileFolder) +{ + // Create MDC processor object. + // compress is the file compression mode. + // If deleteFiles is kTRUE the raw data files will be deleted after they + // were closed. + // If the filterMode if kFilterOff no filter algorithm will be, if it is + // kFilterTransparent the algorthims will be run but no events will be + // rejected, if it is kFilterOn the filters will be run and the event will + // be rejected if all filters return kFALSE. + // If maxSizeTagDB is greater than 0 it determines the maximal size of the + // tag DB and then fileNameTagDB is the directory name for the tag DB. + // Otherwise fileNameTagDB is the file name of the tag DB. If it is NULL + // no tag DB will be created. + // Optional 'guidFileFolder' specifies the folder in which *.guid files + // will be stored. In case this option is not given, the *.guid files + // will be written to the same folder as the raw-data files. + + // Set the maximum tree size to 19GB + // in order to allow big raw data files + TTree::SetMaxTreeSize(20000000000LL); + + // This line is needed in case of a stand-alone application w/o + // $ROOTSYS/etc/system.rootrc file + gROOT->GetPluginManager()->AddHandler("TVirtualStreamerInfo", + "*", + "TStreamerInfo", + "RIO", + "TStreamerInfo()"); + + if (fFilterMode != kFilterOff) { + fESD = new AliESDEvent(); + } -Bool_t AliMDC::fgDeleteFiles = kFALSE; -AliMDC* AliMDC::fgInstance = NULL; + // Create the guid files folder if it does not exist + if (!fGuidFileFolder.IsNull()) { + gSystem->ResetErrno(); + gSystem->MakeDirectory(fGuidFileFolder.Data()); + if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) { + SysError("AliMDC", "mkdir %s", fGuidFileFolder.Data()); + } + } + // install SIGUSR1 handler to allow clean interrupts + gSystem->AddSignalHandler(new AliMDCInterruptHandler(this)); -//______________________________________________________________________________ -AliMDC::AliMDC(Int_t fd, Int_t compress, Double_t maxFileSize, Bool_t useFilter, - EWriteMode mode, Bool_t useLoop, Bool_t delFiles) -{ - // Create MDC processor object. - - fFd = fd; - fCompress = compress; - fMaxFileSize = maxFileSize; - fUseFilter = useFilter; - fWriteMode = mode; - fUseLoop = useLoop; - fUseFifo = kFALSE; - fUseEb = kFALSE; - fStopLoop = kFALSE; - fNumEvents = 0; - fDebugLevel = 0; - fgDeleteFiles = delFiles; - - if (fFd == -1) { -#ifdef USE_EB - if (!ebRegister()) { - Error("AliMDC", "cannot register with the event builder (%s)", - ebGetLastError()); - return; - } - fUseEb = kTRUE; -#else - if ((mkfifo(fgkFifo, 0644) < 0) && (errno != EEXIST)) { - Error("AliMDC", "cannot create fifo %s", fgkFifo); - return; + // create the high level filters + if (fFilterMode != kFilterOff) { + for (Int_t iFilter = 0; iFilter < kNFilters; iFilter++) { + TClass* filterClass = gROOT->GetClass(fgkFilterName[iFilter]); + if (!filterClass) { + Warning("AliMDC", "no filter class %s found", fgkFilterName[iFilter]); + continue; } - if ((chmod(fgkFifo, 0666) == -1) && (errno != EPERM)) { - Error("AliMDC", "cannot change permission of fifo %s", fgkFifo); - return; + AliFilter* filter = (AliFilter*) filterClass->New(); + if (!filter) { + Warning("AliMDC", "creation of filter %s failed", fgkFilterName[iFilter]); + continue; } - if ((fFd = open(fgkFifo, O_RDONLY)) == -1) { - Error("AliMDC", "cannot open input file %s", fgkFifo); - return; - } - fUseFifo = kTRUE; -#endif - fUseLoop = kFALSE; - } - - printf(": input = %s, rawdb size = %f, filter = %s, " - "looping = %s, compression = %d, delete files = %s", - fUseFifo ? "fifo" : (fUseEb ? "eb" : "file"), fMaxFileSize, - fUseFilter ? "on" : "off", fUseLoop ? "yes" : "no", fCompress, - fgDeleteFiles ? "yes" : "no"); - if (fWriteMode == kRFIO) - printf(", use RFIO\n"); - else if (fWriteMode == kROOTD) - printf(", use rootd\n"); - else if (fWriteMode == kCASTOR) - printf(", use CASTOR/rootd\n"); - else if (fWriteMode == kDEVNULL) - printf(", write raw data to /dev/null\n"); - else - printf("\n"); - - // install SIGUSR1 handler to allow clean interrupts - gSystem->AddSignalHandler(new AliMDCInterruptHandler(this)); - - fgInstance = this; + fFilters.Add(filter); + } + } } //______________________________________________________________________________ -AliMDC::AliMDC(const AliMDC& mdc): TObject(mdc) +AliMDC::~AliMDC() { -// copy constructor - - Fatal("AliMDC", "copy constructor not implemented"); +// destructor + + fFilters.Delete(); + if(fTagDB) delete fTagDB; + delete fRawDB; + delete fStats; + delete fESD; + delete fEvent; + delete fEventTag; } - + //______________________________________________________________________________ -AliMDC& AliMDC::operator = (const AliMDC& /*mdc*/) +Int_t AliMDC::Open(EWriteMode mode, const char* fileName, + Double_t maxFileSize, + const char* fs1, const char* fs2) { -// assignment operator +// open a new raw DB file + + if (mode == kRFIO) + fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, fileName, fBasketSize); + else if (mode == kROOTD) + fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, fileName, fBasketSize); + else if (mode == kCASTOR) + fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, fileName, fBasketSize); + else if (mode == kDEVNULL) + fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, fileName, fBasketSize); + else + fRawDB = new AliRawDB(fEvent, fESD, fCompress, fileName, fBasketSize); + fRawDB->SetDeleteFiles(fDeleteFiles); + + if (fRawDB->IsZombie()) { + delete fRawDB; + fRawDB = NULL; + return -1; + } + + if (fileName == NULL) { + fRawDB->SetMaxSize(maxFileSize); + fRawDB->SetFS(fs1, fs2); + if (!fRawDB->Create()) { + delete fRawDB; + fRawDB = NULL; + return -1; + } + } - Fatal("operator =", "assignment operator not implemented"); - return *this; + if (!fRawDB->WriteGuidFile(fGuidFileFolder)) { + delete fRawDB; + fRawDB = NULL; + return -2; + } + + Info("Open", "Filling raw DB %s\n", fRawDB->GetDBName()); + + // Create AliStats object + fStats = new AliStats(fRawDB->GetDBName(), fCompress, + fFilterMode != kFilterOff); + return 0; } //______________________________________________________________________________ -Int_t AliMDC::Run() +Int_t AliMDC::ProcessEvent(void* event, Bool_t isIovecArray) { - // Run the MDC processor. Read from the input stream and only return - // when the input gave and EOF or a fatal error occured. On success 0 - // is returned, 1 in case of a fatality. - - TStopwatch timer; - Int_t status; - - // Make sure needed directories exist - const char *dirs[4]; - dirs[0] = fgkRawDBFS[0]; - dirs[1] = fgkRawDBFS[1]; - dirs[2] = fgkTagDBFS; - dirs[3] = fgkRunDBFS; - for (int idir = 0; idir < 4; idir++) { - gSystem->ResetErrno(); - gSystem->MakeDirectory(dirs[idir]); - if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) { - SysError("Run", "mkdir %s", dirs[idir]); - return 1; - } - } +// Convert the DATE event to an AliRawEvent object and store it in the raw DB, +// optionally also run the filter. +// event is either a pointer to the streamlined event +// or, if isIovecArray is kTRUE, a pointer to an array of iovecs with one +// iovec per subevent (used by the event builder). +// The return value is the number of written bytes or an error code + const Long64_t kFileSizeErrorLevel = 19000000000LL; + + Long64_t currentFileSize = GetTotalSize(); + AliDebug(1,Form("current file size is %lld bytes",currentFileSize)); + if(currentFileSize > kFileSizeErrorLevel) { + Error("ProcessEvent", "file size (%lu) exceeds the limit " + , currentFileSize); + return kErrFileSize; + } - // Used for statistics - timer.Start(); - Double_t told = 0, tnew = 0; - Float_t chunkSize = fMaxFileSize/100, nextChunk = chunkSize; - - // Event object used to store event data. - AliRawEvent *event = new AliRawEvent; -#ifdef USE_HLT - //Init HLT -#ifndef use_logging - AliL3Log::fgLevel=AliL3Log::kError; - ALIDEBUG(1) - AliL3Log::fgLevel=AliL3Log::kWarning; - ALIDEBUG(2) - AliL3Log::fgLevel=AliL3Log::kWarning; - ALIDEBUG(3) - AliL3Log::fgLevel=AliL3Log::kNone; -#endif - if (!AliL3Transform::Init("./", kFALSE)) { - Error("Run","HLT initialization failed!"); - return 1; - } + Int_t status; + char* data = (char*) event; + if (isIovecArray) data = (char*) ((iovec*) event)[0].iov_base; - AliESD *esd = new AliESD; -#endif + // Shortcut for easy header access + AliRawEventHeaderBase *header = fEvent->GetHeader(data); - // Create new raw DB. - AliRawDB *rawdb; - if (fWriteMode == kRFIO) - rawdb = new AliRawRFIODB(event, esd, fMaxFileSize, fCompress); - else if (fWriteMode == kROOTD) - rawdb = new AliRawRootdDB(event, esd, fMaxFileSize, fCompress); - else if (fWriteMode == kCASTOR) - rawdb = new AliRawCastorDB(event, esd, fMaxFileSize, fCompress); - else if (fWriteMode == kDEVNULL) - rawdb = new AliRawNullDB(event, esd, fMaxFileSize, fCompress); - else - rawdb = new AliRawDB(event, esd, fMaxFileSize, fCompress); - - if (rawdb->IsZombie()) return 1; - printf("Filling raw DB %s\n", rawdb->GetDBName()); - - // Create new tag DB. - AliTagDB *tagdb = 0; -#if 0 - // no tagdb for the time being to get maximum speed - if (fWriteMode == fgkDEVNULL) - tagdb = new AliTagNullDB(event->GetHeader(), fgkMaxTagFileSize); - else - tagdb = new AliTagDB(event->GetHeader(), fgkMaxTagFileSize); - if (tagdb->IsZombie()) - tagdb = 0; - else - printf("Filling tag DB %s\n", tagdb->GetDBName()); -#endif + // Read event header + if ((status = header->ReadHeader(data)) != (Int_t)header->GetHeadSize()) { + return kErrHeader; + } - // Create AliStats object - AliStats *stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter); + if (AliDebugLevel() > 2) ToAliDebug(3, header->Dump();); + + // Check event type and skip "Start of Run", "End of Run", + // "Start of Run Files" and "End of Run Files" + Int_t size = header->GetEventSize() - header->GetHeadSize(); + + AliDebug(1, Form("Processing %s (%d bytes)", header->GetTypeName(), size)); + + // Amount of data left to read for this event + Int_t toRead = size; + + // StartOfRun, EndOfRun etc. events have no payload + // Nevertheless, store the event headers in the tree + if (toRead > 0) { + + // If there is less data for this event than the next sub-event + // header, something is wrong. Skip to next event... + if (toRead < (Int_t)header->GetHeadSize()) { + Error("ProcessEvent", "header size (%d) exceeds number of bytes " + "to read (%d)", header->GetHeadSize(), toRead); + if (AliDebugLevel() > 0) ToAliDebug(1, header->Dump();); + return kErrHeaderSize; + } + + // Loop over all sub-events... (LDCs) + Int_t nsub = 1; + while (toRead > 0) { + if (isIovecArray) data = (char*) ((iovec*) event)[nsub].iov_base; + + AliDebug(1, Form("reading LDC %d", nsub)); + + AliRawEvent *subEvent = fEvent->NextSubEvent(); + + // Read sub-event header + AliRawEventHeaderBase *subHeader = subEvent->GetHeader(data); + if ((status = subHeader->ReadHeader(data)) != (Int_t)subHeader->GetHeadSize()) { + return kErrSubHeader; + } - // Shortcut for easy header access - AliRawEventHeader &header = *event->GetHeader(); + if (AliDebugLevel() > 2) ToAliDebug(3, subHeader->Dump();); - // Process input stream -#ifdef USE_EB - Int_t eorFlag = 0; - while (!(eorFlag = ebEor())) { - struct iovec *ebvec; - if ((ebvec = ebGetNextEvent()) == (void *)-1) { - Error("Run", "error getting next event (%s)", ebGetLastError()); - break; - } - if (ebvec == 0) { - // no event, sleep for 1 second and try again - gSystem->Sleep(1000); - continue; - } - char *ebdata = (char *) ebvec[0].iov_base; -#else - while (1) { - char *ebdata = 0; -#endif + toRead -= subHeader->GetHeadSize(); - // Read event header - if ((status = ReadHeader(header, ebdata)) != header.HeaderSize()) { - if (status == 0) { - if (fUseLoop) { -#ifndef USE_EB - ::lseek(fFd, 0, SEEK_SET); -#endif - continue; - } - printf(": EOF, processed %d events\n", fNumEvents); - break; - } - return 1; + Int_t rawSize = subHeader->GetEventSize() - subHeader->GetHeadSize(); + + // Make sure raw data less than left over bytes for current event + if (rawSize > toRead) { + Warning("ProcessEvent", "raw data size (%d) exceeds number of " + "bytes to read (%d)\n", rawSize, toRead); + if (AliDebugLevel() > 0) ToAliDebug(1, subHeader->Dump();); + return kErrDataSize; } - ALIDEBUG(3) - header.Dump(); - // If we were in looping mode stop directly after a SIGUSR1 signal - if (StopLoop()) { - Info("Run", "Stopping loop, processed %d events", fNumEvents); - break; + // Read Equipment Headers (in case of physics or calibration event) + if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent || + header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent || + header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kStartOfData || + header->Get("Type") == AliRawEventHeaderBase::kEndOfData) { + while (rawSize > 0) { + AliRawEquipment &equipment = *subEvent->NextEquipment(); + AliRawEquipmentHeader &equipmentHeader = + *equipment.GetEquipmentHeader(); + Int_t equipHeaderSize = equipmentHeader.HeaderSize(); + if ((status = ReadEquipmentHeader(equipmentHeader, header->DataIsSwapped(), + data)) != equipHeaderSize) { + return kErrEquipmentHeader; + } + + if (AliDebugLevel() > 2) ToAliDebug(3, equipmentHeader.Dump();); + + toRead -= equipHeaderSize; + rawSize -= equipHeaderSize; + + // Read equipment raw data + AliRawData &subRaw = *equipment.GetRawData(); + + Int_t eqSize = equipmentHeader.GetEquipmentSize() - equipHeaderSize; + if ((status = ReadRawData(subRaw, eqSize, data)) != eqSize) { + return kErrEquipment; + } + toRead -= eqSize; + rawSize -= eqSize; + + } + + } else { // Read only raw data but no equipment header + if (rawSize) { + AliRawEquipment &equipment = *subEvent->NextEquipment(); + AliRawData &subRaw = *equipment.GetRawData(); + if ((status = ReadRawData(subRaw, rawSize, data)) != rawSize) { + return kErrEquipment; + } + toRead -= rawSize; + } } - // Check if event has any hard track flagged - Bool_t callFilter = kFALSE; - if (fUseFilter) - callFilter = kTRUE; - - // Check event type and skip "Start of Run", "End of Run", - // "Start of Run Files" and "End of Run Files" - switch (header.GetType()) { - case AliRawEventHeader::kStartOfRun: - case AliRawEventHeader::kEndOfRun: - case AliRawEventHeader::kStartOfRunFiles: - case AliRawEventHeader::kEndOfRunFiles: - { - Int_t skip = header.GetEventSize() - header.HeaderSize(); -#ifndef USE_EB - ::lseek(fFd, skip, SEEK_CUR); -#endif - ALIDEBUG(1) - Info("Run", "Skipping %s (%d bytes)", header.GetTypeName(), skip); - continue; - } - default: - ALIDEBUG(1) { - Int_t s = header.GetEventSize() - header.HeaderSize(); - Info("Run", "Processing %s (%d bytes)", header.GetTypeName(), s); - } + nsub++; + } + } + + // High Level Event Filter + if (fFilterMode != kFilterOff) { + if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent || + header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent || + header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kStartOfData || + header->Get("Type") == AliRawEventHeaderBase::kEndOfData) { + Bool_t result = kFALSE; + for (Int_t iFilter = 0; iFilter < fFilters.GetEntriesFast(); iFilter++) { + AliFilter* filter = (AliFilter*) fFilters[iFilter]; + if (!filter) continue; + if (filter->Filter(fEvent, fESD)) result = kTRUE; } + if ((fFilterMode == kFilterOn) && !result) return kFilterReject; + } + } - // Amount of data left to read for this event - Int_t toRead = header.GetEventSize() - header.HeaderSize(); - - // If there is less data for this event than the next sub-event - // header, something is wrong. Skip to next event... - if (toRead < header.HeaderSize()) { - ALIDEBUG(1) { - Warning("Run", - "header size (%d) exceeds number of bytes to read (%d)\n", - header.HeaderSize(), toRead); - header.Dump(); - } - if ((status = DumpEvent(toRead)) != toRead) { - if (status == 0) - break; - return 1; - } - Error("Run", "discarding event %d (too little data for header)", fNumEvents); - continue; + // Set stat info for first event of this file + if (fRawDB->GetEvents() == 0) + fStats->SetFirstId(header->Get("RunNb"), header->GetP("Id")[0]); + + // Store raw event in tree + Int_t nBytes = fRawDB->Fill(); + + // Fill the event tag object + fEventTag->SetHeader(fEvent->GetHeader()); + fEventTag->SetGUID(fRawDB->GetDB()->GetUUID().AsString()); + fEventTag->SetEventNumber(fRawDB->GetEvents()-1); + + // Create Tag DB here only after the raw data header + // version was already identified + if (!fIsTagDBCreated) { + if (!fFileNameTagDB.IsNull()) { + if (fMaxSizeTagDB > 0) { + fTagDB = new AliTagDB(fEventTag, NULL); + fTagDB->SetMaxSize(fMaxSizeTagDB); + fTagDB->SetFS(fFileNameTagDB.Data()); + if (!fTagDB->Create()) return kErrTagFile; + } else { + fTagDB = new AliTagDB(fEventTag, fFileNameTagDB.Data()); + if (fTagDB->IsZombie()) return kErrTagFile; } + } + fIsTagDBCreated = kTRUE; + } - // Loop over all sub-events... (LDCs) - Int_t nsub = 1; - while (toRead > 0) { -#ifdef USE_EB - ebdata = (char *)ebvec[nsub].iov_base; -#endif + // Store event tag in tree + if (fTagDB) fTagDB->Fill(); - ALIDEBUG(1) - Info("Run", "reading LDC %d", nsub); + // Make top event object ready for next event data + fEvent->Reset(); + // Clean up HLT ESD for the next event + if (fESD) fESD->Reset(); - AliRawEvent *subEvent = event->NextSubEvent(); + if(nBytes >= 0) + return nBytes; + else + return kErrWriting; +} - // Read sub-event header - AliRawEventHeader &subHeader = *subEvent->GetHeader(); - if ((status = ReadHeader(subHeader, ebdata)) != subHeader.HeaderSize()) { - if (status == 0) { - Error("Run", "unexpected EOF reading sub-event header"); - break; - } - return 1; - } +//______________________________________________________________________________ +Long64_t AliMDC::GetTotalSize() +{ +// return the total current raw DB file size - ALIDEBUG(3) - subHeader.Dump(); + if (!fRawDB) return -1; - toRead -= subHeader.HeaderSize(); + return fRawDB->GetTotalSize(); +} -#ifdef USE_EB - ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize(); -#endif +//______________________________________________________________________________ +Long64_t AliMDC::Close() +{ +// close the current raw DB file - Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize(); - - // Make sure raw data less than left over bytes for current event - if (rawSize > toRead) { - ALIDEBUG(1) { - Warning("Run", "raw data size (%d) exceeds number of " - "bytes to read (%d)\n", rawSize, toRead); - subHeader.Dump(); - } - if ((status = DumpEvent(toRead)) != toRead) { - if (status == 0) - break; - return 1; - } - Error("Run", "discarding event %d (too much data)", fNumEvents); - continue; - } - - // Read Equipment Headers (in case of physics or calibration event) - if (header.GetType() == AliRawEventHeader::kPhysicsEvent || - header.GetType() == AliRawEventHeader::kCalibrationEvent) { - while (rawSize > 0) { - AliRawEquipment &equipment = *subEvent->NextEquipment(); - AliRawEquipmentHeader &equipmentHeader = - *equipment.GetEquipmentHeader(); - Int_t equipHeaderSize = equipmentHeader.HeaderSize(); - if ((status = ReadEquipmentHeader(equipmentHeader, header.DataIsSwapped(), - ebdata)) != equipHeaderSize) { - if (status == 0) { - Error("Run", "unexpected EOF reading equipment-header"); - break; - } - return 1; - } - toRead -= equipHeaderSize; - rawSize -= equipHeaderSize; -#ifdef USE_EB - ebdata = (char *)(ebvec[nsub].iov_base) + - subHeader.HeaderSize() + equipHeaderSize; -#endif + if (!fRawDB) return -1; - // Read equipment raw data - AliRawData &subRaw = *equipment.GetRawData(); - - Int_t eqSize = equipmentHeader.GetEquipmentSize() - - equipHeaderSize; - if ((status = ReadRawData(subRaw, eqSize, ebdata)) != eqSize) { - if (status == 0) { - Error("Run", "unexpected EOF reading sub-event raw data"); - break; - } - return 1; - } - toRead -= eqSize; - rawSize -= eqSize; + fRawDB->WriteStats(fStats); + Long64_t filesize = fRawDB->Close(); + delete fRawDB; + fRawDB = NULL; + delete fStats; + fStats = NULL; + return filesize; +} - } +//______________________________________________________________________________ +Long64_t AliMDC::AutoSave() +{ + // Auto-save the raw-data + // and esd (if any) trees - } else { // Read only raw data but no equipment header - AliRawEquipment &equipment = *subEvent->NextEquipment(); - AliRawData &subRaw = *equipment.GetRawData(); - if ((status = ReadRawData(subRaw, rawSize, ebdata)) != rawSize) { - if (status == 0) { - Error("Run", "unexpected EOF reading sub-event raw data"); - break; - } - return 1; - } - toRead -= rawSize; - - } - - nsub++; - } + if (!fRawDB) return -1; - //HLT - if (callFilter) { -#ifdef ALI_DATE - if(header.GetType() == AliRawEventHeader::kPhysicsEvent || - header.GetType() == AliRawEventHeader::kCalibrationEvent) - Filter( -#ifdef USE_HLT - event,esd -#endif - ); -#endif - } + return fRawDB->AutoSave(); +} - // Set stat info for first event of this file - if (rawdb->GetEvents() == 0) - stats->SetFirstId(header.GetRunNumber(), header.GetEventInRun()); +//______________________________________________________________________________ +Int_t AliMDC::Run(const char* inputFile, Bool_t loop, + EWriteMode mode, Double_t maxFileSize, + const char* fs1, const char* fs2) +{ + // Run the MDC processor. Read from the input stream and only return + // when the input gave and EOF or a fatal error occured. On success 0 + // is returned, 1 in case of a fatality. + // inputFile is the name of the DATE input file; if NULL the input will + // be taken from the event builder. + // If loop is set the same input file will be reused in an infinite loop. + // mode specifies the type of the raw DB. + // maxFileSize is the maximal size of the raw DB. + // fs1 and fs2 are the file system locations of the raw DB. + + Info("Run", "input = %s, rawdb size = %f, filter = %s, " + "looping = %s, compression = %d, delete files = %s", + inputFile ? inputFile : "event builder", maxFileSize, + fFilterMode == kFilterOff ? "off" : + (fFilterMode == kFilterOn ? "on" : "transparent"), + loop ? "yes" : "no", fCompress, fDeleteFiles ? "yes" : "no"); + + // Open the input file + Int_t fd = -1; + if (inputFile) { + if ((fd = open(inputFile, O_RDONLY)) == -1) { + Error("Run", "cannot open input file %s", inputFile); + return 1; + } + } - // Store raw event in tree - rawdb->Fill(); + // Used for statistics + TStopwatch timer; + timer.Start(); + Double_t told = 0, tnew = 0; + Float_t chunkSize = maxFileSize/100, nextChunk = chunkSize; - // Store header in tree - if (tagdb) tagdb->Fill(); + // Create new raw DB. + if (fRawDB) Close(); - fNumEvents++; + if (Open(mode,NULL,maxFileSize,fs1,fs2) < 0) return 1; - if (!(fNumEvents%10)) - printf("Processed event %d (%d)\n", fNumEvents, rawdb->GetEvents()); + // Process input stream +#ifdef USE_EB + Int_t eorFlag = 0; +#endif + char* event = NULL; + UInt_t eventSize = 0; + Int_t numEvents = 0; - // Filling time statistics - if (rawdb->GetBytesWritten() > nextChunk) { - tnew = timer.RealTime(); - stats->Fill(tnew-told); - told = tnew; - timer.Continue(); - nextChunk += chunkSize; - } + AliRawEventHeaderBase header; - // Check size of raw db. If bigger than maxFileSize, close file - // and continue with new file. - if (rawdb->FileFull()) { + while (kTRUE) { - printf("Written raw DB at a rate of %.1f MB/s\n", - rawdb->GetBytesWritten() / timer.RealTime() / 1000000.); + // If we were in looping mode stop directly after a SIGUSR1 signal + if (fStop) { + Info("Run", "Stopping loop, processed %d events", numEvents); + break; + } - // Write stats object to raw db, run db, MySQL and AliEn - stats->WriteToDB(rawdb); - delete stats; + if (!inputFile) { // get data from event builder +#ifdef USE_EB + if ((eorFlag = ebEor())) break; + if ((event = (char*)ebGetNextEvent()) == (char*)-1) { + Error("Run", "error getting next event (%s)", ebGetLastError()); + break; + } + if (event == 0) { + // no event, sleep for 1 second and try again + gSystem->Sleep(1000); + continue; + } +#else + Error("Run", "AliMDC was compiled without event builder support"); + delete fRawDB; + fRawDB = NULL; + delete fStats; + fStats = NULL; + return 1; +#endif - if (!rawdb->NextFile()) { - Error("Run", "error opening next raw data file"); - return 1; - } + } else { // get data from a file + { + Int_t nrecv; + if ((nrecv = Read(fd, header.HeaderBaseBegin(), header.HeaderBaseSize())) != + header.HeaderBaseSize()) { + if (nrecv == 0) { // eof + if (loop) { + ::lseek(fd, 0, SEEK_SET); + continue; + } else { + break; + } + } else { + Error("Run", "error reading base header"); + Close(); + delete[] event; + return 1; + } + } + } + char *data = (char *)header.HeaderBaseBegin(); + AliRawEventHeaderBase *hdr = AliRawEventHeaderBase::Create(data); + Int_t nrecv; + if ((nrecv = Read(fd, hdr->HeaderBegin(), hdr->HeaderSize())) != + hdr->HeaderSize()) { + if (nrecv == 0) { // eof + if (loop) { + ::lseek(fd, 0, SEEK_SET); + delete hdr; + continue; + } else { + delete hdr; + break; + } + } else { + Error("Run", "error reading header"); + Close(); + delete[] event; + delete hdr; + return 1; + } + } + if (eventSize < hdr->GetEventSize()) { + delete[] event; + eventSize = 2 * hdr->GetEventSize(); + event = new char[eventSize]; + } + memcpy(event, header.HeaderBaseBegin(), header.HeaderBaseSize()); + memcpy(event+hdr->HeaderBaseSize(), hdr->HeaderBegin(), hdr->HeaderSize()); + if (hdr->GetExtendedDataSize() != 0) + memcpy(event+hdr->HeaderBaseSize()+hdr->HeaderSize(), + hdr->GetExtendedData(), hdr->GetExtendedDataSize()); + Int_t size = hdr->GetEventSize() - hdr->GetHeadSize(); + if (Read(fd, event + hdr->GetHeadSize(), size) != size) { + Error("Run", "error reading data"); + Close(); + delete[] event; + delete hdr; + return 1; + } + delete hdr; + } + + Int_t result = ProcessEvent(event, !inputFile); + if(result < -1) + Error("Run", "error writing data. Error code: %d",result); - printf("Filling raw DB %s\n", rawdb->GetDBName()); - stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter); + if (result >= 0) { + numEvents++; + if (!(numEvents%10)) + printf("Processed event %d (%d)\n", numEvents, fRawDB->GetEvents()); + } + + if (result > 0) { + // Filling time statistics + if (fRawDB->GetBytesWritten() > nextChunk) { + tnew = timer.RealTime(); + fStats->Fill(tnew-told); + told = tnew; + timer.Continue(); + nextChunk += chunkSize; + } - timer.Start(); - told = 0, tnew = 0; - nextChunk = chunkSize; + // Check size of raw db. If bigger than maxFileSize, close file + // and continue with new file. + if (fRawDB->GetBytesWritten() > maxFileSize) { + + printf("Written raw DB at a rate of %.1f MB/s\n", + fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.); + + // Write stats object to raw db, run db, MySQL and AliEn + fRawDB->WriteStats(fStats); + delete fStats; + fStats = NULL; + + if (!fRawDB->NextFile()) { + Error("Run", "error opening next raw data file"); + Close(); + if (inputFile) delete[] event; + return 1; + } + + printf("Filling raw DB %s\n", fRawDB->GetDBName()); + fStats = new AliStats(fRawDB->GetDBName(), fCompress, + fFilterMode != kFilterOff); + + timer.Start(); + told = 0, tnew = 0; + nextChunk = chunkSize; } // Check size of tag db - if (tagdb && tagdb->FileFull()) { - if (!tagdb->NextFile()) - tagdb = 0; - else - printf("Filling tag DB %s\n", tagdb->GetDBName()); + if (fTagDB && fTagDB->FileFull()) { + if (!fTagDB->NextFile()) { + delete fTagDB; + fTagDB = 0; + } else { + printf("Filling tag DB %s\n", fTagDB->GetDBName()); + } } + } - // Make top event object ready for next event data - //printf("Event %d has %d sub-events\n", fNumEvents, event->GetNSubEvents()); - event->Reset(); -#ifdef USE_HLT - // Clean up HLT ESD for the next event - // Probably we could add esd->Reset() method to AliESD? - esd->Reset(); -#endif + // Make top event object ready for next event data + //printf("Event %d has %d sub-events\n", numEvents, fEvent->GetNSubEvents()); + // fEvent->Reset(); + // Clean up HLT ESD for the next event + // if (fESD) fESD->Reset(); + + if (!inputFile) { #ifdef USE_EB - if (!ebReleaseEvent(ebvec)) { - Error("Run", "problem releasing event (%s)", ebGetLastError()); - break; + if (!ebReleaseEvent((iovec*)event)) { + Error("Run", "problem releasing event (%s)", ebGetLastError()); + break; } #endif - } - - printf("Written raw DB at a rate of %.1f MB/s\n", - rawdb->GetBytesWritten() / timer.RealTime() / 1000000.); - - // Write stats to raw db and run db and delete stats object - stats->WriteToDB(rawdb); - delete stats; - - // Close the raw DB - delete rawdb; - - // Close the tag DB - delete tagdb; + } + } - // Close input source - close(fFd); + printf("Written raw DB at a rate of %.1f MB/s\n", + fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.); -#if 0 - // Cleanup fifo - if (fUseFifo && ::unlink(fgkFifo) == -1) { - SysError("Run", "unlink"); - return 1; - } -#endif + // Write stats to raw db and run db and delete stats object + Close(); + if (!inputFile) { #ifdef USE_EB - // Print eor flag - if (eorFlag) { + // Print eor flag + if (eorFlag) { Info("Run", "event builder reported end of run (%d)", eorFlag); - } + } #endif + } else { + // Close input source + close(fd); + delete [] event; + } - return 0; + return 0; } //______________________________________________________________________________ -Int_t AliMDC::Read(void *buffer, Int_t length) +Int_t AliMDC::Read(Int_t fd, void *buffer, Int_t length) { // Read exactly length bytes into buffer. Returns number of bytes // received, returns -1 in case of error and 0 for EOF. errno = 0; - if (fFd < 0) return -1; + if (fd < 0) return -1; Int_t n, nrecv = 0; char *buf = (char *)buffer; for (n = 0; n < length; n += nrecv) { - if ((nrecv = read(fFd, buf+n, length-n)) <= 0) { + if ((nrecv = read(fd, buf+n, length-n)) <= 0) { if (nrecv == 0) break; // EOF if (errno != EINTR) @@ -634,242 +728,50 @@ Int_t AliMDC::Read(void *buffer, Int_t length) return n; } -//______________________________________________________________________________ -Int_t AliMDC::ReadHeader(AliRawEventHeader &header, void *eb) -{ - // Read header info from DATE data stream. Returns bytes read (i.e. - // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - memcpy(header.HeaderBegin(), eb, header.HeaderSize()); - nrecv = header.HeaderSize(); - } else { - // read from fifo or file - if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) != - header.HeaderSize()) { - if (nrecv == 0) - return 0; - return -1; - } - } - - // Swap header data if needed - if (header.IsSwapped()) - header.Swap(); - - // Is header valid... - if (!header.IsValid()) { - Error("ReadHeader", "invalid header format"); - // try recovery... how? - return -1; - } - if (header.GetEventSize() < (UInt_t)header.HeaderSize()) { - Error("ReadHeader", "invalid header size"); - // try recovery... how? - return -1; - } - - return nrecv; -} - //______________________________________________________________________________ Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header, - Bool_t isSwapped, void *eb) + Bool_t isSwapped, char*& data) { - // Read equipment header info from DATE data stream. Returns bytes read - // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and - // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped - // and we will swap the header to host format. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - memcpy(header.HeaderBegin(), eb, header.HeaderSize()); - nrecv = header.HeaderSize(); - } else { - // read from fifo or file - if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) != - header.HeaderSize()) { - if (nrecv == 0) - return 0; - return -1; - } - } - - // Swap equipment header data if needed - if (isSwapped) - header.Swap(); - - if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) { - Error("ReadEquipmentHeader", "invalid equipment header size"); - // try recovery... how? - return -1; - } - - return nrecv; -} - -//______________________________________________________________________________ -Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, void *eb) -{ - // Read raw data from DATE data stream. Returns bytes read (i.e. - // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - raw.SetBuffer(eb, size); - nrecv = size; - } else { - // read from fifo or file - raw.SetSize(size); - if ((nrecv = Read(raw.GetBuffer(), size)) != size) { - if (nrecv == 0) { - Error("ReadRawData", "unexpected EOF"); - return 0; - } - return -1; - } - } - - return nrecv; -} - -//______________________________________________________________________________ -Int_t AliMDC::DumpEvent(Int_t toRead) -{ - // This case should not happen, but if it does try to handle it - // gracefully by reading the rest of the event and discarding it. - // Returns bytes read, -1 in case of fatal error and 0 for EOF. - - Error("DumpEvent", "dumping %d bytes of event %d", toRead, fNumEvents); - - Int_t nrecv; - char *tbuf = new char[toRead]; - if ((nrecv = Read(tbuf, toRead)) != toRead) { - if (nrecv == 0) { - Error("DumpEvent", "unexpected EOF"); - return 0; - } - return -1; - } - delete [] tbuf; + // Read equipment header info from DATE data stream. Returns bytes read + // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and + // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped + // and we will swap the header to host format. + + memcpy(header.HeaderBegin(), data, header.HeaderSize()); + data += header.HeaderSize(); + + // Swap equipment header data if needed + if (isSwapped) + header.Swap(); + + if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) { + Error("ReadEquipmentHeader", "invalid equipment header size"); + // try recovery... how? + return -1; + } - return nrecv; + return header.HeaderSize(); } //______________________________________________________________________________ -Int_t AliMDC::Filter( -#ifdef USE_HLT - AliRawEvent *event,AliESD *esd -#endif - ) +Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, char*& data) { - // Call 3rd level filter for this raw data event. - -#ifdef USE_HLT - - // Run the HLT code - { - TStopwatch timer; - timer.Start(); - - AliL3Hough *hough1 = new AliL3Hough(); - - hough1->SetThreshold(4); - hough1->SetTransformerParams(76,140,0.4,-1); - hough1->SetPeakThreshold(70,-1); - // Attention Z of the vertex to be taken from the event head! - // So far for debug purposes it is fixed by hand... - hough1->Init(100,4,event,3.82147); - hough1->SetAddHistograms(); - - AliL3Hough *hough2 = new AliL3Hough(); - - hough2->SetThreshold(4); - hough2->SetTransformerParams(76,140,0.4,-1); - hough2->SetPeakThreshold(70,-1); - hough2->Init(100,4,event,3.82147); - hough2->SetAddHistograms(); - - Int_t nglobaltracks = 0; - /* In case we run HLT code in 2 threads */ - hough1->StartProcessInThread(0,17); - hough2->StartProcessInThread(18,35); - - if(hough1->WaitForThreadFinish()) - ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! "); - if(hough2->WaitForThreadFinish()) - ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! "); - - /* In case we run HLT code in the main thread - for(Int_t slice=0; slice<=17; slice++) - { - hough1->ReadData(slice,0); - hough1->Transform(); - hough1->AddAllHistogramsRows(); - hough1->FindTrackCandidatesRow(); - hough1->AddTracks(); - } - for(Int_t slice=18; slice<=35; slice++) - { - hough2->ReadData(slice,0); - hough2->Transform(); - hough2->AddAllHistogramsRows(); - hough2->FindTrackCandidatesRow(); - hough2->AddTracks(); - } - */ - - nglobaltracks += hough1->FillESD(esd); - nglobaltracks += hough2->FillESD(esd); - - /* In case we want to debug the ESD - gSystem->MakeDirectory("hough1"); - hough1->WriteTracks("./hough1"); - gSystem->MakeDirectory("hough2"); - hough2->WriteTracks("./hough2"); - */ + // Read raw data from DATE data stream. Returns bytes read (i.e. + // size), -1 in case of error and 0 for EOF. - delete hough1; - delete hough2; - - printf("Filter called for event %d\n", fNumEvents); - printf("Filter has found %d TPC tracks in %f seconds\n", nglobaltracks,timer.RealTime()); - } + raw.SetBuffer(data, size); + data += size; -#else - - printf("Filter called for event %d\n", fNumEvents); - -#endif - - return 0; + return size; } //______________________________________________________________________________ -AliMDC::AliMDCInterruptHandler::AliMDCInterruptHandler(const - AliMDCInterruptHandler& - handler): - TSignalHandler(handler) +void AliMDC::Stop() { -// copy constructor - - Fatal("AliMDCInterruptHandler", "copy constructor not implemented"); + // Stop the event loop + + fStop = kTRUE; + if (fRawDB) fRawDB->Stop(); } -//______________________________________________________________________________ -AliMDC::AliMDCInterruptHandler& - AliMDC::AliMDCInterruptHandler::operator = (const AliMDCInterruptHandler& - /*handler*/) -{ -// assignment operator - Fatal("operator =", "assignment operator not implemented"); - return *this; -}