X-Git-Url: http://git.uio.no/git/?a=blobdiff_plain;f=RAW%2FAliMDC.cxx;h=30e25c394d94d2de27087c4b5b475559815f0f11;hb=7473795ad4a626a0a576dd1b1c3d3e9f9382c923;hp=ffc81bdfba6e383a0381ae627879c2622275dd6d;hpb=8c49c5a564019013352875263778b75e621da9d6;p=u%2Fmrichter%2FAliRoot.git diff --git a/RAW/AliMDC.cxx b/RAW/AliMDC.cxx index ffc81bdfba6..30e25c394d9 100644 --- a/RAW/AliMDC.cxx +++ b/RAW/AliMDC.cxx @@ -1,4 +1,4 @@ -// @(#)alimdc:$Name$:$Id$ +// @(#)alimdc:$Name: $:$Id$ // Author: Fons Rademakers 26/11/99 // Updated: Dario Favretto 15/04/2003 @@ -33,8 +33,6 @@ // AliRawRFIODB or via rootd using AliRawRootdDB or to CASTOR via // // rootd using AliRawCastorDB (and for performance testing there is // // also AliRawNullDB). // -// The AliRunDB class provides the interface to the run and file // -// catalogues (AliEn or plain MySQL). // // The AliStats class provides statics information that is added as // // a single keyed object to each raw file. // // The AliTagDB provides an interface to a TAG database. // @@ -49,18 +47,23 @@ #include #include -#include +#include #include +#include -#ifdef ALI_DATE -#include "event.h" -#endif +#include #ifdef USE_EB #include "libDateEb.h" #endif +#include "AliMDC.h" + +#include +#include + #include "AliRawEvent.h" -#include "AliRawEventHeader.h" +#include "AliRawEventHeaderBase.h" +#include "AliRawEquipment.h" #include "AliRawEquipmentHeader.h" #include "AliRawData.h" #include "AliStats.h" @@ -70,505 +73,647 @@ #include "AliRawRootdDB.h" #include "AliRawNullDB.h" #include "AliTagDB.h" +#include "AliRawEventTag.h" +#include "AliFilter.h" -#include "AliMDC.h" ClassImp(AliMDC) -#define ALIDEBUG(level) \ - if (AliMDC::Instance() && (AliMDC::Instance()->GetDebugLevel() >= (level))) - - -// Fixed file system locations for the different DB's -#ifdef USE_RDM -const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo"; -const char* const AliMDC::fgkRawDBFS[2] = { "/tmp/mdc1", "/tmp/mdc2" }; -const char* const AliMDC::fgkTagDBFS = "/tmp/mdc1/tags"; -const char* const AliMDC::fgkRunDBFS = "/tmp/mdc1/meta"; -const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/user/r/rdm"; -const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/user/r/rdm"; -const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1"; -const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct"; -const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC"; -#else -const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo"; -const char* const AliMDC::fgkRawDBFS[2] = { "/data1/mdc", "/data2/mdc" }; -const char* const AliMDC::fgkTagDBFS = "/data1/mdc/tags"; -const char* const AliMDC::fgkRunDBFS = "/data1/mdc/meta"; -const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/lcg/dc5"; -const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/lcg/dc5"; -const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1"; -const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct"; -const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC"; -#endif - -// Maximum size of tag db files -const Double_t AliMDC::fgkMaxTagFileSize = 2.5e8; // 250MB - -Bool_t AliMDC::fgDeleteFiles = kFALSE; -AliMDC* AliMDC::fgInstance = NULL; - +// Filter names +const char* const AliMDC::fgkFilterName[kNFilters] = {"AliHoughFilter"}; //______________________________________________________________________________ -AliMDC::AliMDC(Int_t fd, Int_t compress, Double_t maxFileSize, Bool_t useFilter, - EWriteMode mode, Bool_t useLoop, Bool_t delFiles) +AliMDC::AliMDC(Int_t compress, Bool_t deleteFiles, EFilterMode filterMode, + Double_t maxSizeTagDB, const char* fileNameTagDB, + const char *guidFileFolder, + Int_t basketsize) : + fEvent(new AliRawEvent), + fESD(NULL), + fStats(NULL), + fRawDB(NULL), + fTagDB(NULL), + fEventTag(new AliRawEventTag), + fCompress(compress), + fBasketSize(basketsize), + fDeleteFiles(deleteFiles), + fFilterMode(filterMode), + fFilters(), + fStop(kFALSE), + fIsTagDBCreated(kFALSE), + fMaxSizeTagDB(maxSizeTagDB), + fFileNameTagDB(fileNameTagDB), + fGuidFileFolder(guidFileFolder) { - // Create MDC processor object. - - fFd = fd; - fCompress = compress; - fMaxFileSize = maxFileSize; - fUseFilter = useFilter; - fWriteMode = mode; - fUseLoop = useLoop; - fUseFifo = kFALSE; - fUseEb = kFALSE; - fStopLoop = kFALSE; - fNumEvents = 0; - fDebugLevel = 0; - fgDeleteFiles = delFiles; - - if (fFd == -1) { -#ifdef USE_EB - if (!ebRegister()) { - Error("AliMDC", "cannot register with the event builder (%s)", - ebGetLastError()); - return; - } - fUseEb = kTRUE; -#else - if ((mkfifo(fgkFifo, 0644) < 0) && (errno != EEXIST)) { - Error("AliMDC", "cannot create fifo %s", fgkFifo); - return; + // Create MDC processor object. + // compress is the file compression mode. + // If deleteFiles is kTRUE the raw data files will be deleted after they + // were closed. + // If the filterMode if kFilterOff no filter algorithm will be, if it is + // kFilterTransparent the algorthims will be run but no events will be + // rejected, if it is kFilterOn the filters will be run and the event will + // be rejected if all filters return kFALSE. + // If maxSizeTagDB is greater than 0 it determines the maximal size of the + // tag DB and then fileNameTagDB is the directory name for the tag DB. + // Otherwise fileNameTagDB is the file name of the tag DB. If it is NULL + // no tag DB will be created. + // Optional 'guidFileFolder' specifies the folder in which *.guid files + // will be stored. In case this option is not given, the *.guid files + // will be written to the same folder as the raw-data files. + + // Set the maximum tree size to 19GB + // in order to allow big raw data files + TTree::SetMaxTreeSize(20000000000LL); + + // This line is needed in case of a stand-alone application w/o + // $ROOTSYS/etc/system.rootrc file + gROOT->GetPluginManager()->AddHandler("TVirtualStreamerInfo", + "*", + "TStreamerInfo", + "RIO", + "TStreamerInfo()"); + + if (fFilterMode != kFilterOff) { + fESD = new AliESDEvent(); + } + + // Create the guid files folder if it does not exist + if (!fGuidFileFolder.IsNull()) { + gSystem->ResetErrno(); + gSystem->MakeDirectory(fGuidFileFolder.Data()); + if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) { + SysError("AliMDC", "mkdir %s", fGuidFileFolder.Data()); + } + } + + // install SIGUSR1 handler to allow clean interrupts + gSystem->AddSignalHandler(new AliMDCInterruptHandler(this)); + + // create the high level filters + if (fFilterMode != kFilterOff) { + for (Int_t iFilter = 0; iFilter < kNFilters; iFilter++) { + TClass* filterClass = gROOT->GetClass(fgkFilterName[iFilter]); + if (!filterClass) { + Warning("AliMDC", "no filter class %s found", fgkFilterName[iFilter]); + continue; } - if ((chmod(fgkFifo, 0666) == -1) && (errno != EPERM)) { - Error("AliMDC", "cannot change permission of fifo %s", fgkFifo); - return; + AliFilter* filter = (AliFilter*) filterClass->New(); + if (!filter) { + Warning("AliMDC", "creation of filter %s failed", fgkFilterName[iFilter]); + continue; } - if ((fFd = open(fgkFifo, O_RDONLY)) == -1) { - Error("AliMDC", "cannot open input file %s", fgkFifo); - return; - } - fUseFifo = kTRUE; -#endif - fUseLoop = kFALSE; - } - - printf(": input = %s, rawdb size = %f, filter = %s, " - "looping = %s, compression = %d, delete files = %s", - fUseFifo ? "fifo" : (fUseEb ? "eb" : "file"), fMaxFileSize, - fUseFilter ? "on" : "off", fUseLoop ? "yes" : "no", fCompress, - fgDeleteFiles ? "yes" : "no"); - if (fWriteMode == kRFIO) - printf(", use RFIO\n"); - else if (fWriteMode == kROOTD) - printf(", use rootd\n"); - else if (fWriteMode == kCASTOR) - printf(", use CASTOR/rootd\n"); - else if (fWriteMode == kDEVNULL) - printf(", write raw data to /dev/null\n"); - else - printf("\n"); - - // install SIGUSR1 handler to allow clean interrupts - gSystem->AddSignalHandler(new AliMDCInterruptHandler(this)); - - fgInstance = this; + fFilters.Add(filter); + } + } } //______________________________________________________________________________ -AliMDC::AliMDC(const AliMDC& mdc): TObject(mdc) +AliMDC::~AliMDC() { -// copy constructor - - Fatal("AliMDC", "copy constructor not implemented"); +// destructor + + fFilters.Delete(); + if(fTagDB) delete fTagDB; + delete fRawDB; + delete fStats; + delete fESD; + delete fEvent; + delete fEventTag; } - + //______________________________________________________________________________ -AliMDC& AliMDC::operator = (const AliMDC& /*mdc*/) +Int_t AliMDC::Open(EWriteMode mode, const char* fileName, + Double_t maxFileSize, + const char* fs1, const char* fs2) { -// assignment operator +// open a new raw DB file + + if (mode == kRFIO) + fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, fileName); + else if (mode == kROOTD) + fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, fileName); + else if (mode == kCASTOR) + fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, fileName); + else if (mode == kDEVNULL) + fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, fileName); + else + fRawDB = new AliRawDB(fEvent, fESD, fCompress, fileName); + fRawDB->SetDeleteFiles(fDeleteFiles); + + if (fRawDB->IsZombie()) { + delete fRawDB; + fRawDB = NULL; + return -1; + } + + if (fileName == NULL) { + fRawDB->SetMaxSize(maxFileSize); + fRawDB->SetFS(fs1, fs2); + if (!fRawDB->Create()) { + delete fRawDB; + fRawDB = NULL; + return -1; + } + } + + if (!fRawDB->WriteGuidFile(fGuidFileFolder)) { + delete fRawDB; + fRawDB = NULL; + return -2; + } - Fatal("operator =", "assignment operator not implemented"); - return *this; + Info("Open", "Filling raw DB %s\n", fRawDB->GetDBName()); + + // Create AliStats object + fStats = new AliStats(fRawDB->GetDBName(), fCompress, + fFilterMode != kFilterOff); + return 0; } //______________________________________________________________________________ -Int_t AliMDC::Run() +Int_t AliMDC::ProcessEvent(void* event, Bool_t isIovecArray) { - // Run the MDC processor. Read from the input stream and only return - // when the input gave and EOF or a fatal error occured. On success 0 - // is returned, 1 in case of a fatality. - - TStopwatch timer; - Int_t status; - - // Make sure needed directories exist - const char *dirs[4]; - dirs[0] = fgkRawDBFS[0]; - dirs[1] = fgkRawDBFS[1]; - dirs[2] = fgkTagDBFS; - dirs[3] = fgkRunDBFS; - for (int idir = 0; idir < 4; idir++) { - gSystem->ResetErrno(); - gSystem->MakeDirectory(dirs[idir]); - if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) { - SysError("Run", "mkdir %s", dirs[idir]); - return 1; +// Convert the DATE event to an AliRawEvent object and store it in the raw DB, +// optionally also run the filter. +// event is either a pointer to the streamlined event +// or, if isIovecArray is kTRUE, a pointer to an array of iovecs with one +// iovec per subevent (used by the event builder). +// The return value is the number of written bytes or an error code + const Long64_t kFileSizeErrorLevel = 19000000000LL; + + Long64_t currentFileSize = GetTotalSize(); + AliDebug(1,Form("current file size is %lld bytes",currentFileSize)); + if(currentFileSize > kFileSizeErrorLevel) { + Error("ProcessEvent", "file size (%lu) exceeds the limit " + , currentFileSize); + return kErrFileSize; + } + + Int_t status; + char* data = (char*) event; + if (isIovecArray) data = (char*) ((iovec*) event)[0].iov_base; + + // Shortcut for easy header access + AliRawEventHeaderBase *header = fEvent->GetHeader(data); + + // Read event header + if ((status = header->ReadHeader(data)) != (Int_t)header->GetHeadSize()) { + return kErrHeader; + } + + if (AliDebugLevel() > 2) ToAliDebug(3, header->Dump();); + + // Check event type and skip "Start of Run", "End of Run", + // "Start of Run Files" and "End of Run Files" + Int_t size = header->GetEventSize() - header->GetHeadSize(); + + AliDebug(1, Form("Processing %s (%d bytes)", header->GetTypeName(), size)); + + // Amount of data left to read for this event + Int_t toRead = size; + + // StartOfRun, EndOfRun etc. events have no payload + // Nevertheless, store the event headers in the tree + if (toRead > 0) { + + // If there is less data for this event than the next sub-event + // header, something is wrong. Skip to next event... + if (toRead < (Int_t)header->GetHeadSize()) { + Error("ProcessEvent", "header size (%d) exceeds number of bytes " + "to read (%d)", header->GetHeadSize(), toRead); + if (AliDebugLevel() > 0) ToAliDebug(1, header->Dump();); + return kErrHeaderSize; + } + + // Loop over all sub-events... (LDCs) + Int_t nsub = 1; + while (toRead > 0) { + if (isIovecArray) data = (char*) ((iovec*) event)[nsub].iov_base; + + AliDebug(1, Form("reading LDC %d", nsub)); + + AliRawEvent *subEvent = fEvent->NextSubEvent(); + + // Read sub-event header + AliRawEventHeaderBase *subHeader = subEvent->GetHeader(data); + if ((status = subHeader->ReadHeader(data)) != (Int_t)subHeader->GetHeadSize()) { + return kErrSubHeader; } - } - // Used for statistics - timer.Start(); - Double_t told = 0, tnew = 0; - Float_t chunkSize = fMaxFileSize/100, nextChunk = chunkSize; - - // Event object used to store event data. - AliRawEvent *event = new AliRawEvent; - - // Create new raw DB. - AliRawDB *rawdb; - if (fWriteMode == kRFIO) - rawdb = new AliRawRFIODB(event, fMaxFileSize, fCompress); - else if (fWriteMode == kROOTD) - rawdb = new AliRawRootdDB(event, fMaxFileSize, fCompress); - else if (fWriteMode == kCASTOR) - rawdb = new AliRawCastorDB(event, fMaxFileSize, fCompress); - else if (fWriteMode == kDEVNULL) - rawdb = new AliRawNullDB(event, fMaxFileSize, fCompress); - else - rawdb = new AliRawDB(event, fMaxFileSize, fCompress); - - if (rawdb->IsZombie()) return 1; - printf("Filling raw DB %s\n", rawdb->GetDBName()); - - // Create new tag DB. - AliTagDB *tagdb = 0; -#if 0 - // no tagdb for the time being to get maximum speed - if (fWriteMode == fgkDEVNULL) - tagdb = new AliTagNullDB(event->GetHeader(), fgkMaxTagFileSize); - else - tagdb = new AliTagDB(event->GetHeader(), fgkMaxTagFileSize); - if (tagdb->IsZombie()) - tagdb = 0; - else - printf("Filling tag DB %s\n", tagdb->GetDBName()); -#endif + if (AliDebugLevel() > 2) ToAliDebug(3, subHeader->Dump();); - // Create AliStats object - AliStats *stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter); + toRead -= subHeader->GetHeadSize(); - // Shortcut for easy header access - AliRawEventHeader &header = *event->GetHeader(); + Int_t rawSize = subHeader->GetEventSize() - subHeader->GetHeadSize(); - // Process input stream -#ifdef USE_EB - Int_t eorFlag = 0; - while (!(eorFlag = ebEor())) { - struct iovec *ebvec; - if ((ebvec = ebGetNextEvent()) == (void *)-1) { - Error("Run", "error getting next event (%s)", ebGetLastError()); - break; + // Make sure raw data less than left over bytes for current event + if (rawSize > toRead) { + Warning("ProcessEvent", "raw data size (%d) exceeds number of " + "bytes to read (%d)\n", rawSize, toRead); + if (AliDebugLevel() > 0) ToAliDebug(1, subHeader->Dump();); + return kErrDataSize; } - if (ebvec == 0) { - // no event, sleep for 1 second and try again - gSystem->Sleep(1000); - continue; - } - char *ebdata = (char *) ebvec[0].iov_base; -#else - while (1) { - char *ebdata = 0; -#endif - // Read event header - if ((status = ReadHeader(header, ebdata)) != header.HeaderSize()) { - if (status == 0) { - if (fUseLoop) { -#ifndef USE_EB - ::lseek(fFd, 0, SEEK_SET); -#endif - continue; - } - printf(": EOF, processed %d events\n", fNumEvents); - break; - } - return 1; + // Read Equipment Headers (in case of physics or calibration event) + if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent || + header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent || + header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent) { + while (rawSize > 0) { + AliRawEquipment &equipment = *subEvent->NextEquipment(); + AliRawEquipmentHeader &equipmentHeader = + *equipment.GetEquipmentHeader(); + Int_t equipHeaderSize = equipmentHeader.HeaderSize(); + if ((status = ReadEquipmentHeader(equipmentHeader, header->DataIsSwapped(), + data)) != equipHeaderSize) { + return kErrEquipmentHeader; + } + + if (AliDebugLevel() > 2) ToAliDebug(3, equipmentHeader.Dump();); + + toRead -= equipHeaderSize; + rawSize -= equipHeaderSize; + + // Read equipment raw data + AliRawData &subRaw = *equipment.GetRawData(); + + Int_t eqSize = equipmentHeader.GetEquipmentSize() - equipHeaderSize; + if ((status = ReadRawData(subRaw, eqSize, data)) != eqSize) { + return kErrEquipment; + } + toRead -= eqSize; + rawSize -= eqSize; + + } + + } else { // Read only raw data but no equipment header + if (rawSize) { + AliRawEquipment &equipment = *subEvent->NextEquipment(); + AliRawData &subRaw = *equipment.GetRawData(); + if ((status = ReadRawData(subRaw, rawSize, data)) != rawSize) { + return kErrEquipment; + } + toRead -= rawSize; + } } - ALIDEBUG(3) - header.Dump(); - // If we were in looping mode stop directly after a SIGUSR1 signal - if (StopLoop()) { - Info("Run", "Stopping loop, processed %d events", fNumEvents); - break; + nsub++; + } + } + + // High Level Event Filter + if (fFilterMode != kFilterOff) { + if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent || + header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent || + header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent || + header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent) { + Bool_t result = kFALSE; + for (Int_t iFilter = 0; iFilter < fFilters.GetEntriesFast(); iFilter++) { + AliFilter* filter = (AliFilter*) fFilters[iFilter]; + if (!filter) continue; + if (filter->Filter(fEvent, fESD)) result = kTRUE; } - - // Check if event has any hard track flagged - Bool_t callFilter = kFALSE; - // This needs to be re-engineered for the next ADC... - //if (fUseFilter && TEST_USER_ATTRIBUTE(header.GetTypeAttribute(), 0)) - // callFilter = kTRUE; - - // Check event type and skip "Start of Run", "End of Run", - // "Start of Run Files" and "End of Run Files" - switch (header.GetType()) { - case AliRawEventHeader::kStartOfRun: - case AliRawEventHeader::kEndOfRun: - case AliRawEventHeader::kStartOfRunFiles: - case AliRawEventHeader::kEndOfRunFiles: - { - Int_t skip = header.GetEventSize() - header.HeaderSize(); -#ifndef USE_EB - ::lseek(fFd, skip, SEEK_CUR); -#endif - ALIDEBUG(1) - Info("Run", "Skipping %s (%d bytes)", header.GetTypeName(), skip); - continue; - } - default: - ALIDEBUG(1) { - Int_t s = header.GetEventSize() - header.HeaderSize(); - Info("Run", "Processing %s (%d bytes)", header.GetTypeName(), s); - } + if ((fFilterMode == kFilterOn) && !result) return kFilterReject; + } + } + + // Set stat info for first event of this file + if (fRawDB->GetEvents() == 0) + fStats->SetFirstId(header->Get("RunNb"), header->GetP("Id")[0]); + + // Store raw event in tree + Int_t nBytes = fRawDB->Fill(); + + // Fill the event tag object + fEventTag->SetHeader(fEvent->GetHeader()); + fEventTag->SetGUID(fRawDB->GetDB()->GetUUID().AsString()); + fEventTag->SetEventNumber(fRawDB->GetEvents()-1); + + // Create Tag DB here only after the raw data header + // version was already identified + if (!fIsTagDBCreated) { + if (!fFileNameTagDB.IsNull()) { + if (fMaxSizeTagDB > 0) { + fTagDB = new AliTagDB(fEventTag, NULL); + fTagDB->SetMaxSize(fMaxSizeTagDB); + fTagDB->SetFS(fFileNameTagDB.Data()); + if (!fTagDB->Create()) return kErrTagFile; + } else { + fTagDB = new AliTagDB(fEventTag, fFileNameTagDB.Data()); + if (fTagDB->IsZombie()) return kErrTagFile; } + } + fIsTagDBCreated = kTRUE; + } + + // Store event tag in tree + if (fTagDB) fTagDB->Fill(); + + // Make top event object ready for next event data + fEvent->Reset(); + // Clean up HLT ESD for the next event + if (fESD) fESD->Reset(); + + if(nBytes >= 0) + return nBytes; + else + return kErrWriting; +} - // Amount of data left to read for this event - Int_t toRead = header.GetEventSize() - header.HeaderSize(); - - // If there is less data for this event than the next sub-event - // header, something is wrong. Skip to next event... - if (toRead < header.HeaderSize()) { - ALIDEBUG(1) { - Warning("Run", - "header size (%d) exceeds number of bytes to read (%d)\n", - header.HeaderSize(), toRead); - header.Dump(); - } - if ((status = DumpEvent(toRead)) != toRead) { - if (status == 0) - break; - return 1; - } - Error("Run", "discarding event %d (too little data for header)", fNumEvents); - continue; - } +//______________________________________________________________________________ +Long64_t AliMDC::GetTotalSize() +{ +// return the total current raw DB file size - // Loop over all sub-events... (LDCs) - Int_t nsub = 1; - while (toRead > 0) { -#ifdef USE_EB - ebdata = (char *)ebvec[nsub].iov_base; -#endif + if (!fRawDB) return -1; + + return fRawDB->GetTotalSize(); +} - ALIDEBUG(1) - Info("Run", "reading LDC %d", nsub); +//______________________________________________________________________________ +Long64_t AliMDC::Close() +{ +// close the current raw DB file - AliRawEvent *subEvent = event->NextSubEvent(); + if (!fRawDB) return -1; - // Read sub-event header - AliRawEventHeader &subHeader = *subEvent->GetHeader(); - if ((status = ReadHeader(subHeader, ebdata)) != subHeader.HeaderSize()) { - if (status == 0) { - Error("Run", "unexpected EOF reading sub-event header"); - break; - } - return 1; - } + fRawDB->WriteStats(fStats); + Long64_t filesize = fRawDB->Close(); + delete fRawDB; + fRawDB = NULL; + delete fStats; + fStats = NULL; + return filesize; +} - ALIDEBUG(3) - subHeader.Dump(); +//______________________________________________________________________________ +Long64_t AliMDC::AutoSave() +{ + // Auto-save the raw-data + // and esd (if any) trees - toRead -= subHeader.HeaderSize(); + if (!fRawDB) return -1; -#ifdef USE_EB - ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize(); -#endif + return fRawDB->AutoSave(); +} - Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize(); - - // Read Equipment Header (in case of physics or calibration event) - if (header.GetType() == AliRawEventHeader::kPhysicsEvent || - header.GetType() == AliRawEventHeader::kCalibrationEvent) { - AliRawEquipmentHeader &equipment = *subEvent->GetEquipmentHeader(); - Int_t equipHeaderSize = equipment.HeaderSize(); - if ((status = ReadEquipmentHeader(equipment, header.DataIsSwapped(), - ebdata)) != equipHeaderSize) { - if (status == 0) { - Error("Run", "unexpected EOF reading equipment-header"); - break; - } - return 1; - } - toRead -= equipHeaderSize; - rawSize -= equipHeaderSize; +//______________________________________________________________________________ +Int_t AliMDC::Run(const char* inputFile, Bool_t loop, + EWriteMode mode, Double_t maxFileSize, + const char* fs1, const char* fs2) +{ + // Run the MDC processor. Read from the input stream and only return + // when the input gave and EOF or a fatal error occured. On success 0 + // is returned, 1 in case of a fatality. + // inputFile is the name of the DATE input file; if NULL the input will + // be taken from the event builder. + // If loop is set the same input file will be reused in an infinite loop. + // mode specifies the type of the raw DB. + // maxFileSize is the maximal size of the raw DB. + // fs1 and fs2 are the file system locations of the raw DB. + + Info("Run", "input = %s, rawdb size = %f, filter = %s, " + "looping = %s, compression = %d, delete files = %s", + inputFile ? inputFile : "event builder", maxFileSize, + fFilterMode == kFilterOff ? "off" : + (fFilterMode == kFilterOn ? "on" : "transparent"), + loop ? "yes" : "no", fCompress, fDeleteFiles ? "yes" : "no"); + + // Open the input file + Int_t fd = -1; + if (inputFile) { + if ((fd = open(inputFile, O_RDONLY)) == -1) { + Error("Run", "cannot open input file %s", inputFile); + return 1; + } + } + + // Used for statistics + TStopwatch timer; + timer.Start(); + Double_t told = 0, tnew = 0; + Float_t chunkSize = maxFileSize/100, nextChunk = chunkSize; + + // Create new raw DB. + if (fRawDB) Close(); + + if (Open(mode,NULL,maxFileSize,fs1,fs2) < 0) return 1; + + // Process input stream #ifdef USE_EB - ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize() + - equipHeaderSize; -#endif - } - - // Make sure raw data less than left over bytes for current event - if (rawSize > toRead) { - ALIDEBUG(1) { - Warning("Run", "raw data size (%d) exceeds number of bytes " - "to read (%d)\n", rawSize, toRead); - subHeader.Dump(); - } - if ((status = DumpEvent(toRead)) != toRead) { - if (status == 0) - break; - return 1; - } - Error("Run", "discarding event %d (too much data)", fNumEvents); - continue; - } - - // Read sub-event raw data - AliRawData &subRaw = *subEvent->GetRawData(); - if ((status = ReadRawData(subRaw, rawSize, ebdata)) != rawSize) { - if (status == 0) { - Error("Run", "unexpected EOF reading sub-event raw data"); - break; - } - return 1; - } - - if (callFilter) { -#ifdef ALI_DATE - if (TEST_USER_ATTRIBUTE(subHeader.GetTypeAttribute(), 0)) - Filter(subRaw); - else { - // set size of all sectors without hard track flag to 0 - subRaw.SetSize(0); - } + Int_t eorFlag = 0; #endif - } + char* event = NULL; + UInt_t eventSize = 0; + Int_t numEvents = 0; - toRead -= rawSize; - nsub++; - } + AliRawEventHeaderBase header; + + while (kTRUE) { - // Set stat info for first event of this file - if (rawdb->GetEvents() == 0) - stats->SetFirstId(header.GetRunNumber(), header.GetEventInRun()); + // If we were in looping mode stop directly after a SIGUSR1 signal + if (fStop) { + Info("Run", "Stopping loop, processed %d events", numEvents); + break; + } - // Store raw event in tree - rawdb->Fill(); + if (!inputFile) { // get data from event builder +#ifdef USE_EB + if ((eorFlag = ebEor())) break; + if ((event = (char*)ebGetNextEvent()) == (char*)-1) { + Error("Run", "error getting next event (%s)", ebGetLastError()); + break; + } + if (event == 0) { + // no event, sleep for 1 second and try again + gSystem->Sleep(1000); + continue; + } +#else + Error("Run", "AliMDC was compiled without event builder support"); + delete fRawDB; + fRawDB = NULL; + delete fStats; + fStats = NULL; + return 1; +#endif - // Store header in tree - if (tagdb) tagdb->Fill(); + } else { // get data from a file + { + Int_t nrecv; + if ((nrecv = Read(fd, header.HeaderBaseBegin(), header.HeaderBaseSize())) != + header.HeaderBaseSize()) { + if (nrecv == 0) { // eof + if (loop) { + ::lseek(fd, 0, SEEK_SET); + continue; + } else { + break; + } + } else { + Error("Run", "error reading base header"); + Close(); + delete[] event; + return 1; + } + } + } + char *data = (char *)header.HeaderBaseBegin(); + AliRawEventHeaderBase *hdr = AliRawEventHeaderBase::Create(data); + Int_t nrecv; + if ((nrecv = Read(fd, hdr->HeaderBegin(), hdr->HeaderSize())) != + hdr->HeaderSize()) { + if (nrecv == 0) { // eof + if (loop) { + ::lseek(fd, 0, SEEK_SET); + delete hdr; + continue; + } else { + delete hdr; + break; + } + } else { + Error("Run", "error reading header"); + Close(); + delete[] event; + delete hdr; + return 1; + } + } + if (eventSize < hdr->GetEventSize()) { + delete[] event; + eventSize = 2 * hdr->GetEventSize(); + event = new char[eventSize]; + } + memcpy(event, header.HeaderBaseBegin(), header.HeaderBaseSize()); + memcpy(event+hdr->HeaderBaseSize(), hdr->HeaderBegin(), hdr->HeaderSize()); + if (hdr->GetExtendedDataSize() != 0) + memcpy(event+hdr->HeaderBaseSize()+hdr->HeaderSize(), + hdr->GetExtendedData(), hdr->GetExtendedDataSize()); + Int_t size = hdr->GetEventSize() - hdr->GetHeadSize(); + if (Read(fd, event + hdr->GetHeadSize(), size) != size) { + Error("Run", "error reading data"); + Close(); + delete[] event; + delete hdr; + return 1; + } + delete hdr; + } - fNumEvents++; + Int_t result = ProcessEvent(event, !inputFile); + if(result < -1) + Error("Run", "error writing data. Error code: %d",result); - if (!(fNumEvents%10)) - printf("Processed event %d (%d)\n", fNumEvents, rawdb->GetEvents()); + if (result >= 0) { + numEvents++; + if (!(numEvents%10)) + printf("Processed event %d (%d)\n", numEvents, fRawDB->GetEvents()); + } + if (result > 0) { // Filling time statistics - if (rawdb->GetBytesWritten() > nextChunk) { - tnew = timer.RealTime(); - stats->Fill(tnew-told); - told = tnew; - timer.Continue(); - nextChunk += chunkSize; + if (fRawDB->GetBytesWritten() > nextChunk) { + tnew = timer.RealTime(); + fStats->Fill(tnew-told); + told = tnew; + timer.Continue(); + nextChunk += chunkSize; } // Check size of raw db. If bigger than maxFileSize, close file // and continue with new file. - if (rawdb->FileFull()) { - - printf("Written raw DB at a rate of %.1f MB/s\n", - rawdb->GetBytesWritten() / timer.RealTime() / 1000000.); - - // Write stats object to raw db, run db, MySQL and AliEn - stats->WriteToDB(rawdb); - delete stats; - - if (!rawdb->NextFile()) { - Error("Run", "error opening next raw data file"); - return 1; - } - - printf("Filling raw DB %s\n", rawdb->GetDBName()); - stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter); - - timer.Start(); - told = 0, tnew = 0; - nextChunk = chunkSize; + if (fRawDB->GetBytesWritten() > maxFileSize) { + + printf("Written raw DB at a rate of %.1f MB/s\n", + fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.); + + // Write stats object to raw db, run db, MySQL and AliEn + fRawDB->WriteStats(fStats); + delete fStats; + fStats = NULL; + + if (!fRawDB->NextFile()) { + Error("Run", "error opening next raw data file"); + Close(); + if (inputFile) delete[] event; + return 1; + } + + printf("Filling raw DB %s\n", fRawDB->GetDBName()); + fStats = new AliStats(fRawDB->GetDBName(), fCompress, + fFilterMode != kFilterOff); + + timer.Start(); + told = 0, tnew = 0; + nextChunk = chunkSize; } // Check size of tag db - if (tagdb && tagdb->FileFull()) { - if (!tagdb->NextFile()) - tagdb = 0; - else - printf("Filling tag DB %s\n", tagdb->GetDBName()); + if (fTagDB && fTagDB->FileFull()) { + if (!fTagDB->NextFile()) { + delete fTagDB; + fTagDB = 0; + } else { + printf("Filling tag DB %s\n", fTagDB->GetDBName()); + } } + } - // Make top event object ready for next event data - //printf("Event %d has %d sub-events\n", fNumEvents, event->GetNSubEvents()); - event->Reset(); + // Make top event object ready for next event data + //printf("Event %d has %d sub-events\n", numEvents, fEvent->GetNSubEvents()); + // fEvent->Reset(); + // Clean up HLT ESD for the next event + // if (fESD) fESD->Reset(); + if (!inputFile) { #ifdef USE_EB - if (!ebReleaseEvent(ebvec)) { - Error("Run", "problem releasing event (%s)", ebGetLastError()); - break; + if (!ebReleaseEvent((iovec*)event)) { + Error("Run", "problem releasing event (%s)", ebGetLastError()); + break; } #endif - } - - printf("Written raw DB at a rate of %.1f MB/s\n", - rawdb->GetBytesWritten() / timer.RealTime() / 1000000.); - - // Write stats to raw db and run db and delete stats object - stats->WriteToDB(rawdb); - delete stats; + } + } - // Close the raw DB - delete rawdb; + printf("Written raw DB at a rate of %.1f MB/s\n", + fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.); - // Close the tag DB - delete tagdb; - - // Close input source - close(fFd); - -#if 0 - // Cleanup fifo - if (fUseFifo && ::unlink(fgkFifo) == -1) { - SysError("Run", "unlink"); - return 1; - } -#endif + // Write stats to raw db and run db and delete stats object + Close(); + if (!inputFile) { #ifdef USE_EB - // Print eor flag - if (eorFlag) { + // Print eor flag + if (eorFlag) { Info("Run", "event builder reported end of run (%d)", eorFlag); - } + } #endif + } else { + // Close input source + close(fd); + delete [] event; + } - return 0; + return 0; } //______________________________________________________________________________ -Int_t AliMDC::Read(void *buffer, Int_t length) +Int_t AliMDC::Read(Int_t fd, void *buffer, Int_t length) { // Read exactly length bytes into buffer. Returns number of bytes // received, returns -1 in case of error and 0 for EOF. errno = 0; - if (fFd < 0) return -1; + if (fd < 0) return -1; Int_t n, nrecv = 0; char *buf = (char *)buffer; for (n = 0; n < length; n += nrecv) { - if ((nrecv = read(fFd, buf+n, length-n)) <= 0) { + if ((nrecv = read(fd, buf+n, length-n)) <= 0) { if (nrecv == 0) break; // EOF if (errno != EINTR) @@ -579,172 +724,50 @@ Int_t AliMDC::Read(void *buffer, Int_t length) return n; } -//______________________________________________________________________________ -Int_t AliMDC::ReadHeader(AliRawEventHeader &header, void *eb) -{ - // Read header info from DATE data stream. Returns bytes read (i.e. - // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - memcpy(header.HeaderBegin(), eb, header.HeaderSize()); - nrecv = header.HeaderSize(); - } else { - // read from fifo or file - if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) != - header.HeaderSize()) { - if (nrecv == 0) - return 0; - return -1; - } - } - - // Swap header data if needed - if (header.IsSwapped()) - header.Swap(); - - // Is header valid... - if (!header.IsValid()) { - Error("ReadHeader", "invalid header format"); - // try recovery... how? - return -1; - } - if (header.GetEventSize() < (UInt_t)header.HeaderSize()) { - Error("ReadHeader", "invalid header size"); - // try recovery... how? - return -1; - } - - return nrecv; -} - //______________________________________________________________________________ Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header, - Bool_t isSwapped, void *eb) + Bool_t isSwapped, char*& data) { - // Read equipment header info from DATE data stream. Returns bytes read - // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and - // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped - // and we will swap the header to host format. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - memcpy(header.HeaderBegin(), eb, header.HeaderSize()); - nrecv = header.HeaderSize(); - } else { - // read from fifo or file - if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) != - header.HeaderSize()) { - if (nrecv == 0) - return 0; - return -1; - } - } - - // Swap equipment header data if needed - if (isSwapped) - header.Swap(); - - if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) { - Error("ReadEquipmentHeader", "invalid equipment header size"); - // try recovery... how? - return -1; - } - - return nrecv; -} + // Read equipment header info from DATE data stream. Returns bytes read + // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and + // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped + // and we will swap the header to host format. -//______________________________________________________________________________ -Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, void *eb) -{ - // Read raw data from DATE data stream. Returns bytes read (i.e. - // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF. - - Int_t nrecv; - - if (eb) { - // read from event builder memory area - raw.SetBuffer(eb, size); - nrecv = size; - } else { - // read from fifo or file - raw.SetSize(size); - if ((nrecv = Read(raw.GetBuffer(), size)) != size) { - if (nrecv == 0) { - Error("ReadRawData", "unexpected EOF"); - return 0; - } - return -1; - } - } + memcpy(header.HeaderBegin(), data, header.HeaderSize()); + data += header.HeaderSize(); - return nrecv; -} + // Swap equipment header data if needed + if (isSwapped) + header.Swap(); -//______________________________________________________________________________ -Int_t AliMDC::DumpEvent(Int_t toRead) -{ - // This case should not happen, but if it does try to handle it - // gracefully by reading the rest of the event and discarding it. - // Returns bytes read, -1 in case of fatal error and 0 for EOF. - - Error("DumpEvent", "dumping %d bytes of event %d", toRead, fNumEvents); - - Int_t nrecv; - char *tbuf = new char[toRead]; - if ((nrecv = Read(tbuf, toRead)) != toRead) { - if (nrecv == 0) { - Error("DumpEvent", "unexpected EOF"); - return 0; - } - return -1; - } - delete [] tbuf; + if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) { + Error("ReadEquipmentHeader", "invalid equipment header size"); + // try recovery... how? + return -1; + } - return nrecv; + return header.HeaderSize(); } //______________________________________________________________________________ -Int_t AliMDC::Filter(AliRawData &raw) +Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, char*& data) { - // Call 3rd level filter for this raw data segment. - -#ifdef USE_HLT - - // Add HLT code here + // Read raw data from DATE data stream. Returns bytes read (i.e. + // size), -1 in case of error and 0 for EOF. -#else - - raw.GetSize(); - printf("Filter called for event %d\n", fNumEvents); - -#endif + raw.SetBuffer(data, size); + data += size; - return 0; + return size; } //______________________________________________________________________________ -AliMDC::AliMDCInterruptHandler::AliMDCInterruptHandler(const - AliMDCInterruptHandler& - handler): - TSignalHandler(handler) +void AliMDC::Stop() { -// copy constructor - - Fatal("AliMDCInterruptHandler", "copy constructor not implemented"); + // Stop the event loop + + fStop = kTRUE; + if (fRawDB) fRawDB->Stop(); } -//______________________________________________________________________________ -AliMDC::AliMDCInterruptHandler& - AliMDC::AliMDCInterruptHandler::operator = (const AliMDCInterruptHandler& - /*handler*/) -{ -// assignment operator - Fatal("operator =", "assignment operator not implemented"); - return *this; -}