]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - RAW/AliMDC.cxx
Method to get an approximate output file size is added. AliMDC ProcessEvent method...
[u/mrichter/AliRoot.git] / RAW / AliMDC.cxx
index 2a57a83e2d04d0784c439a2b7fc79196c66ca473..ed929a6c4456470c38156101e5215208151e780f 100644 (file)
 #include <errno.h>
 
 #include <TSystem.h>
-#include <TError.h>
+#include <TROOT.h>
 #include <TStopwatch.h>
 
-#ifdef ALI_DATE
-#include "event.h"
-#endif
+#include <sys/uio.h>
 #ifdef USE_EB
 #include "libDateEb.h"
 #endif
 
-#ifdef USE_HLT
-#include <AliL3StandardIncludes.h>
-#include "AliL3Logging.h"
-#include <AliL3Transform.h>
-#include "AliRawReaderRoot.h"
-#include <AliL3Hough.h>
+#include <AliLog.h>
 #include <AliESD.h>
-#endif
 
 #include "AliRawEvent.h"
 #include "AliRawEventHeader.h"
@@ -80,6 +72,8 @@
 #include "AliRawRootdDB.h"
 #include "AliRawNullDB.h"
 #include "AliTagDB.h"
+#include "AliRunDB.h"
+#include "AliFilter.h"
 
 #include "AliMDC.h"
 
 ClassImp(AliMDC)
 
 
-#define ALIDEBUG(level) \
-   if (AliMDC::Instance() && (AliMDC::Instance()->GetDebugLevel() >= (level)))
-
-
-// Fixed file system locations for the different DB's
-#ifdef USE_RDM
-const char* const AliMDC::fgkFifo       = "/tmp/alimdc.fifo";
-const char* const AliMDC::fgkRawDBFS[2] = { "/tmp/mdc1", "/tmp/mdc2" };
-const char* const AliMDC::fgkTagDBFS    = "/tmp/mdc1/tags";
-const char* const AliMDC::fgkRunDBFS    = "/tmp/mdc1/meta";
-const char* const AliMDC::fgkRFIOFS     = "rfio:/castor/cern.ch/user/r/rdm";
-const char* const AliMDC::fgkCastorFS   = "castor:/castor/cern.ch/user/r/rdm";
-const char* const AliMDC::fgkRootdFS    = "root://localhost//tmp/mdc1";
-const char* const AliMDC::fgkAlienHost  = "alien://aliens7.cern.ch:15000/?direct";
-const char* const AliMDC::fgkAlienDir   = "/alice_mdc/DC";
-#else
-const char* const AliMDC::fgkFifo       = "/tmp/alimdc.fifo";
-const char* const AliMDC::fgkRawDBFS[2] = { "/data1/mdc", "/data2/mdc" };
-const char* const AliMDC::fgkTagDBFS    = "/data1/mdc/tags";
-const char* const AliMDC::fgkRunDBFS    = "/data1/mdc/meta";
-const char* const AliMDC::fgkRFIOFS     = "rfio:/castor/cern.ch/lcg/dc5";
-const char* const AliMDC::fgkCastorFS   = "castor:/castor/cern.ch/lcg/dc5";
-const char* const AliMDC::fgkRootdFS    = "root://localhost//tmp/mdc1";
-const char* const AliMDC::fgkAlienHost  = "alien://aliens7.cern.ch:15000/?direct";
-const char* const AliMDC::fgkAlienDir   = "/alice_mdc/DC";
-#endif
+// Filter names
+const char* const AliMDC::fgkFilterName[kNFilters] = {"AliHoughFilter"};
 
-// Maximum size of tag db files
-const Double_t AliMDC::fgkMaxTagFileSize = 2.5e8;    // 250MB
+//______________________________________________________________________________
+AliMDC::AliMDC(Int_t compress, Bool_t deleteFiles, EFilterMode filterMode, 
+              const char* localRunDB, Bool_t rdbmsRunDB,
+              const char* alienHostRunDB, const char* alienDirRunDB,
+              Double_t maxSizeTagDB, const char* fileNameTagDB) :
+  fEvent(new AliRawEvent),
+  fESD(NULL),
+  fStats(NULL),
+  fRawDB(NULL),
+  fRunDB(new AliRunDB(localRunDB, rdbmsRunDB, alienHostRunDB, alienDirRunDB)),
+  fTagDB(NULL),
+  fCompress(compress),
+  fDeleteFiles(deleteFiles),
+  fFilterMode(filterMode),
+  fFilters(),
+  fStop(kFALSE)
+{
+  // Create MDC processor object.
+  // compress is the file compression mode.
+  // If deleteFiles is kTRUE the raw data files will be deleted after they
+  // were closed.
+  // If the filterMode if kFilterOff no filter algorithm will be, if it is
+  // kFilterTransparent the algorthims will be run but no events will be
+  // rejected, if it is kFilterOn the filters will be run and the event will
+  // be rejected if all filters return kFALSE.
+  // localRunDB is the file name of the local run DB; if NULL no local run DB
+  // will be created.
+  // The filling of a MySQL run DB can be switch on or off with rdbmsRunDB.
+  // The host and directory name of the alien run DB can be specified by
+  // alienHostRunDB and alienDirRunDB; if NULL no alien DB will be filled.
+  // If maxSizeTagDB is greater than 0 it determines the maximal size of the
+  // tag DB and then fileNameTagDB is the directory name for the tag DB.
+  // Otherwise fileNameTagDB is the file name of the tag DB. If it is NULL
+  // no tag DB will be created.
+
+  if (fFilterMode != kFilterOff) {
+    fESD = new AliESD;
+  }
 
-Bool_t AliMDC::fgDeleteFiles = kFALSE;
-AliMDC* AliMDC::fgInstance = NULL;
+  if (fileNameTagDB) {
+    if (maxSizeTagDB > 0) {
+      fTagDB = new AliTagDB(fEvent->GetHeader(), NULL);
+      fTagDB->SetMaxSize(maxSizeTagDB);
+      fTagDB->SetFS(fileNameTagDB);
+      fTagDB->Create();
+    } else {
+      fTagDB = new AliTagDB(fEvent->GetHeader(), fileNameTagDB);
+    }
+  }
 
+  // install SIGUSR1 handler to allow clean interrupts
+  gSystem->AddSignalHandler(new AliMDCInterruptHandler(this));
 
-//______________________________________________________________________________
-AliMDC::AliMDC(Int_t fd, Int_t compress, Double_t maxFileSize, Bool_t useFilter,
-               EWriteMode mode, Bool_t useLoop, Bool_t delFiles)
-{
-   // Create MDC processor object.
-
-   fFd           = fd;
-   fCompress     = compress;
-   fMaxFileSize  = maxFileSize;
-   fUseFilter    = useFilter;
-   fWriteMode    = mode;
-   fUseLoop      = useLoop;
-   fUseFifo      = kFALSE;
-   fUseEb        = kFALSE;
-   fStopLoop     = kFALSE;
-   fNumEvents    = 0;
-   fDebugLevel   = 0;
-   fgDeleteFiles = delFiles;
-
-   if (fFd == -1) {
-#ifdef USE_EB
-     if (!ebRegister()) {
-        Error("AliMDC", "cannot register with the event builder (%s)",
-              ebGetLastError());
-        return;
-     }
-     fUseEb = kTRUE;
-#else
-     if ((mkfifo(fgkFifo, 0644) < 0) && (errno != EEXIST)) {
-         Error("AliMDC", "cannot create fifo %s", fgkFifo);
-         return;
-      }
-      if ((chmod(fgkFifo, 0666) == -1) && (errno != EPERM)) {
-         Error("AliMDC", "cannot change permission of fifo %s", fgkFifo);
-         return;
+  // create the high level filters
+  if (fFilterMode != kFilterOff) {
+    for (Int_t iFilter = 0; iFilter < kNFilters; iFilter++) {
+      TClass* filterClass = gROOT->GetClass(fgkFilterName[iFilter]);
+      if (!filterClass) {
+       Warning("AliMDC", "no filter class %s found", fgkFilterName[iFilter]);
+       continue;
       }
-      if ((fFd = open(fgkFifo, O_RDONLY)) == -1) {
-         Error("AliMDC", "cannot open input file %s", fgkFifo);
-         return;
+      AliFilter* filter = (AliFilter*) filterClass->New();
+      if (!filter) {
+       Warning("AliMDC", "creation of filter %s failed", fgkFilterName[iFilter]);
+       continue;
       }
-      fUseFifo = kTRUE;
-#endif
-      fUseLoop = kFALSE;
-   }
-
-   printf("<AliMDC::AliMDC>: input = %s, rawdb size = %f, filter = %s, "
-          "looping = %s, compression = %d, delete files = %s",
-          fUseFifo ? "fifo" : (fUseEb ? "eb" : "file"), fMaxFileSize,
-          fUseFilter ? "on" : "off", fUseLoop ? "yes" : "no", fCompress,
-          fgDeleteFiles ? "yes" : "no");
-   if (fWriteMode == kRFIO)
-      printf(", use RFIO\n");
-   else if (fWriteMode == kROOTD)
-      printf(", use rootd\n");
-   else if (fWriteMode == kCASTOR)
-      printf(", use CASTOR/rootd\n");
-   else if (fWriteMode == kDEVNULL)
-      printf(", write raw data to /dev/null\n");
-   else
-      printf("\n");
-
-   // install SIGUSR1 handler to allow clean interrupts
-   gSystem->AddSignalHandler(new AliMDCInterruptHandler(this));
-
-   fgInstance = this;
+      fFilters.Add(filter);
+    }
+  }
 }
 
+//______________________________________________________________________________
+AliMDC::~AliMDC()
+{
+// destructor
+
+  fFilters.Delete();
+  delete fTagDB;
+  delete fRunDB;
+  delete fRawDB;
+  delete fStats;
+  delete fESD;
+  delete fEvent;
+}
 //______________________________________________________________________________
 AliMDC::AliMDC(const AliMDC& mdc): TObject(mdc)
 {
@@ -205,425 +187,477 @@ AliMDC& AliMDC::operator = (const AliMDC& /*mdc*/)
   return *this;
 }
 
+
 //______________________________________________________________________________
-Int_t AliMDC::Run()
+Int_t AliMDC::Open(EWriteMode mode, const char* fileName)
 {
-   // Run the MDC processor. Read from the input stream and only return
-   // when the input gave and EOF or a fatal error occured. On success 0
-   // is returned, 1 in case of a fatality.
-
-   TStopwatch timer;
-   Int_t status;
-
-   // Make sure needed directories exist
-   const char *dirs[4];
-   dirs[0] = fgkRawDBFS[0];
-   dirs[1] = fgkRawDBFS[1];
-   dirs[2] = fgkTagDBFS;
-   dirs[3] = fgkRunDBFS;
-   for (int idir = 0; idir < 4; idir++) {
-      gSystem->ResetErrno();
-      gSystem->MakeDirectory(dirs[idir]);
-      if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) {
-         SysError("Run", "mkdir %s", dirs[idir]);
-         return 1;
-      }
-   }
+// open a new raw DB file
+
+  if (mode == kRFIO)
+    fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, fileName);
+  else if (mode == kROOTD)
+    fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, fileName);
+  else if (mode == kCASTOR)
+    fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, fileName);
+  else if (mode == kDEVNULL)
+    fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, fileName);
+  else
+    fRawDB = new AliRawDB(fEvent, fESD, fCompress, fileName);
+  fRawDB->SetDeleteFiles(fDeleteFiles);
+
+  if (fRawDB->IsZombie()) {
+    delete fRawDB;
+    fRawDB = NULL;
+    return 1;
+  }
+  Info("Open", "Filling raw DB %s\n", fRawDB->GetDBName());
 
-   // Used for statistics
-   timer.Start();
-   Double_t told = 0, tnew = 0;
-   Float_t  chunkSize = fMaxFileSize/100, nextChunk = chunkSize;
-
-   // Event object used to store event data.
-   AliRawEvent *event = new AliRawEvent;
-#ifdef USE_HLT
-   //Init HLT
-#ifndef use_logging
-   AliL3Log::fgLevel=AliL3Log::kError;
-   ALIDEBUG(1)
-     AliL3Log::fgLevel=AliL3Log::kWarning;
-   ALIDEBUG(2)
-     AliL3Log::fgLevel=AliL3Log::kWarning;
-   ALIDEBUG(3)
-     AliL3Log::fgLevel=AliL3Log::kNone;
-#endif
-   if (!AliL3Transform::Init("./", kFALSE)) {
-     Error("Run","HLT initialization failed!");
-     return 1;
-   }
+  // Create AliStats object
+  fStats = new AliStats(fRawDB->GetDBName(), fCompress, 
+                       fFilterMode != kFilterOff);
+  return 0;
+}
 
-   AliESD *esd = new AliESD;
-#endif
+//______________________________________________________________________________
+Int_t AliMDC::ProcessEvent(void* event, Bool_t isIovecArray)
+{
+// Convert the DATE event to an AliRawEvent object and store it in the raw DB,
+// optionally also run the filter.
+// event is either a pointer to the streamlined event 
+// or, if isIovecArray is kTRUE, a pointer to an array of iovecs with one
+// iovec per subevent (used by the event builder).
+// The return value is the number of written bytes or an error code
+  const UInt_t kFileSizeWarningLevel = 1800000000;
+  const UInt_t kFileSizeErrorLevel   = 1900000000;
+
+  UInt_t currentFileSize = GetTotalSize();
+  if(currentFileSize > kFileSizeErrorLevel) {
+    Error("ProcessEvent", "file size (%u) exceeds the limit "
+         , currentFileSize);
+    return kErrFileSize;
+  }
+  if(currentFileSize > kFileSizeWarningLevel)
+    Warning("ProcessEvent", "file size (%u) is close to the limit "
+           , currentFileSize);
 
-   // Create new raw DB.
-   AliRawDB *rawdb;
-   if (fWriteMode == kRFIO)
-      rawdb = new AliRawRFIODB(event, esd, fMaxFileSize, fCompress);
-   else if (fWriteMode == kROOTD)
-      rawdb = new AliRawRootdDB(event, esd, fMaxFileSize, fCompress);
-   else if (fWriteMode == kCASTOR)
-      rawdb = new AliRawCastorDB(event, esd, fMaxFileSize, fCompress);
-   else if (fWriteMode == kDEVNULL)
-      rawdb = new AliRawNullDB(event, esd, fMaxFileSize, fCompress);
-   else
-      rawdb = new AliRawDB(event, esd, fMaxFileSize, fCompress);
-
-   if (rawdb->IsZombie()) return 1;
-   printf("Filling raw DB %s\n", rawdb->GetDBName());
-
-   // Create new tag DB.
-   AliTagDB *tagdb = 0;
-#if 0
-   // no tagdb for the time being to get maximum speed
-   if (fWriteMode == fgkDEVNULL)
-      tagdb = new AliTagNullDB(event->GetHeader(), fgkMaxTagFileSize);
-   else
-      tagdb = new AliTagDB(event->GetHeader(), fgkMaxTagFileSize);
-   if (tagdb->IsZombie())
-      tagdb = 0;
-   else
-      printf("Filling tag DB %s\n", tagdb->GetDBName());
-#endif
+  Int_t status;
+  char* data = (char*) event;
+  if (isIovecArray) data = (char*) ((iovec*) event)[0].iov_base;
 
-   // Create AliStats object
-   AliStats *stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
+  // Shortcut for easy header access
+  AliRawEventHeader &header = *fEvent->GetHeader();
 
-   // Shortcut for easy header access
-   AliRawEventHeader &header = *event->GetHeader();
+  // Read event header
+  if ((status = ReadHeader(header, data)) != header.HeaderSize()) {
+    return kErrHeader;
+  }
 
-   // Process input stream
-#ifdef USE_EB
-   Int_t eorFlag = 0;
-   while (!(eorFlag = ebEor())) {
-      struct iovec *ebvec;
-      if ((ebvec = ebGetNextEvent()) == (void *)-1) {
-         Error("Run", "error getting next event (%s)", ebGetLastError());
-         break;
-      }
-      if (ebvec == 0) {
-         // no event, sleep for 1 second and try again
-         gSystem->Sleep(1000);
-         continue;
-      }
-      char *ebdata = (char *) ebvec[0].iov_base;
-#else
-   while (1) {
-      char *ebdata = 0;
-#endif
+  if (AliDebugLevel() > 2) ToAliDebug(3, header.Dump(););
+
+  // Check event type and skip "Start of Run", "End of Run",
+  // "Start of Run Files" and "End of Run Files"
+  Int_t size = header.GetEventSize() - header.HeaderSize();
+  switch (header.GetType()) {
+  case AliRawEventHeader::kStartOfRun:
+  case AliRawEventHeader::kEndOfRun:
+  case AliRawEventHeader::kStartOfRunFiles:
+  case AliRawEventHeader::kEndOfRunFiles:
+    {
+      AliDebug(1, Form("Skipping %s (%d bytes)", header.GetTypeName(), size));
+      return kErrStartEndRun;
+    }
+  default:
+    {
+      AliDebug(1, Form("Processing %s (%d bytes)", header.GetTypeName(), size));
+    }
+  }
 
-      // Read event header
-      if ((status = ReadHeader(header, ebdata)) != header.HeaderSize()) {
-         if (status == 0) {
-            if (fUseLoop) {
-#ifndef USE_EB
-               ::lseek(fFd, 0, SEEK_SET);
-#endif
-               continue;
-            }
-            printf("<AliMDC::Run>: EOF, processed %d events\n", fNumEvents);
-            break;
-         }
-         return 1;
-      }
-      ALIDEBUG(3)
-         header.Dump();
+  // Amount of data left to read for this event
+  Int_t toRead = size;
+
+  // If there is less data for this event than the next sub-event
+  // header, something is wrong. Skip to next event...
+  if (toRead < header.HeaderSize()) {
+    Error("ProcessEvent", "header size (%d) exceeds number of bytes "
+         "to read (%d)", header.HeaderSize(), toRead);
+    if (AliDebugLevel() > 0) ToAliDebug(1, header.Dump(););
+    return kErrHeaderSize;
+  }
+  
+  // Loop over all sub-events... (LDCs)
+  Int_t nsub = 1;
+  while (toRead > 0) {
+    if (isIovecArray) data = (char*) ((iovec*) event)[nsub].iov_base;
+
+    AliDebug(1, Form("reading LDC %d", nsub));
+
+    AliRawEvent *subEvent = fEvent->NextSubEvent();
+
+    // Read sub-event header
+    AliRawEventHeader &subHeader = *subEvent->GetHeader();
+    if ((status = ReadHeader(subHeader, data)) != subHeader.HeaderSize()) {
+      return kErrSubHeader;
+    }
+
+    if (AliDebugLevel() > 2) ToAliDebug(3, subHeader.Dump(););
+
+    toRead -= subHeader.HeaderSize();
+
+    Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize();
+
+    // Make sure raw data less than left over bytes for current event
+    if (rawSize > toRead) {
+      Warning("ProcessEvent", "raw data size (%d) exceeds number of "
+             "bytes to read (%d)\n", rawSize, toRead);
+      if (AliDebugLevel() > 0) ToAliDebug(1, subHeader.Dump(););
+      return kErrDataSize;
+    }
+
+    // Read Equipment Headers (in case of physics or calibration event)
+    if (header.GetType() == AliRawEventHeader::kPhysicsEvent ||
+       header.GetType() == AliRawEventHeader::kCalibrationEvent) {
+      while (rawSize > 0) {
+       AliRawEquipment &equipment = *subEvent->NextEquipment();
+       AliRawEquipmentHeader &equipmentHeader = 
+         *equipment.GetEquipmentHeader();
+       Int_t equipHeaderSize = equipmentHeader.HeaderSize();
+       if ((status = ReadEquipmentHeader(equipmentHeader, header.DataIsSwapped(),
+                                         data)) != equipHeaderSize) {
+         return kErrEquipmentHeader;
+       }
+       toRead  -= equipHeaderSize;
+       rawSize -= equipHeaderSize;
+
+       // Read equipment raw data
+       AliRawData &subRaw = *equipment.GetRawData();
+
+       Int_t eqSize = equipmentHeader.GetEquipmentSize() - equipHeaderSize;
+       if ((status = ReadRawData(subRaw, eqSize, data)) != eqSize) {
+         return kErrEquipment;
+       }
+       toRead  -= eqSize;
+       rawSize -= eqSize;
 
-      // If we were in looping mode stop directly after a SIGUSR1 signal
-      if (StopLoop()) {
-         Info("Run", "Stopping loop, processed %d events", fNumEvents);
-         break;
       }
 
-      // Check if event has any hard track flagged
-      Bool_t callFilter = kFALSE;
-      if (fUseFilter)
-       callFilter = kTRUE;
-
-      // Check event type and skip "Start of Run", "End of Run",
-      // "Start of Run Files" and "End of Run Files"
-      switch (header.GetType()) {
-         case AliRawEventHeader::kStartOfRun:
-         case AliRawEventHeader::kEndOfRun:
-         case AliRawEventHeader::kStartOfRunFiles:
-         case AliRawEventHeader::kEndOfRunFiles:
-            {
-               Int_t skip = header.GetEventSize() - header.HeaderSize();
-#ifndef USE_EB
-               ::lseek(fFd, skip, SEEK_CUR);
-#endif
-               ALIDEBUG(1)
-                  Info("Run", "Skipping %s (%d bytes)", header.GetTypeName(), skip);
-               continue;
-            }
-         default:
-            ALIDEBUG(1) {
-               Int_t s = header.GetEventSize() - header.HeaderSize();
-               Info("Run", "Processing %s (%d bytes)", header.GetTypeName(), s);
-            }
+    } else {  // Read only raw data but no equipment header
+      AliRawEquipment &equipment = *subEvent->NextEquipment();
+      AliRawData &subRaw = *equipment.GetRawData();
+      if ((status = ReadRawData(subRaw, rawSize, data)) != rawSize) {
+       return kErrEquipment;
       }
+      toRead  -= rawSize;
+
+    }
 
-      // Amount of data left to read for this event
-      Int_t toRead = header.GetEventSize() - header.HeaderSize();
-
-      // If there is less data for this event than the next sub-event
-      // header, something is wrong. Skip to next event...
-      if (toRead < header.HeaderSize()) {
-        ALIDEBUG(1) {
-            Warning("Run",
-                   "header size (%d) exceeds number of bytes to read (%d)\n",
-                   header.HeaderSize(), toRead);
-           header.Dump();
-         }
-         if ((status = DumpEvent(toRead)) != toRead) {
-            if (status == 0)
-               break;
-            return 1;
-         }
-         Error("Run", "discarding event %d (too little data for header)", fNumEvents);
-         continue;
+    nsub++;
+  }
+
+  // High Level Event Filter
+  if (fFilterMode != kFilterOff) {
+    if (header.GetType() == AliRawEventHeader::kPhysicsEvent ||
+       header.GetType() == AliRawEventHeader::kCalibrationEvent) {
+      Bool_t result = kFALSE;
+      for (Int_t iFilter = 0; iFilter < fFilters.GetEntriesFast(); iFilter++) {
+       AliFilter* filter = (AliFilter*) fFilters[iFilter];
+       if (!filter) continue;
+       if (filter->Filter(fEvent, fESD)) result = kTRUE;
       }
+      if ((fFilterMode == kFilterOn) && !result) return kFilterReject;
+    }
+  }
 
-      // Loop over all sub-events... (LDCs)
-      Int_t nsub = 1;
-      while (toRead > 0) {
-#ifdef USE_EB
-         ebdata = (char *)ebvec[nsub].iov_base;
-#endif
+  // Set stat info for first event of this file
+  if (fRawDB->GetEvents() == 0)
+    fStats->SetFirstId(header.GetRunNumber(), header.GetEventInRun());
 
-         ALIDEBUG(1)
-            Info("Run", "reading LDC %d", nsub);
+  // Store raw event in tree
+  Int_t nBytes = fRawDB->Fill();
 
-         AliRawEvent *subEvent = event->NextSubEvent();
+  // Store header in tree
+  if (fTagDB) fTagDB->Fill();
 
-         // Read sub-event header
-         AliRawEventHeader &subHeader = *subEvent->GetHeader();
-         if ((status = ReadHeader(subHeader, ebdata)) != subHeader.HeaderSize()) {
-            if (status == 0) {
-               Error("Run", "unexpected EOF reading sub-event header");
-               break;
-            }
-            return 1;
-         }
+  // Make top event object ready for next event data
+  fEvent->Reset();
+  // Clean up HLT ESD for the next event
+  if (fESD) fESD->Reset();
 
-         ALIDEBUG(3)
-            subHeader.Dump();
+  return nBytes;
+}
+
+//______________________________________________________________________________
+Int_t AliMDC::GetTotalSize()
+{
+// return the total current raw DB file size
+
+  if (!fRawDB) return -1;
+
+  return fRawDB->GetTotalSize();
+}
+
+//______________________________________________________________________________
+Int_t AliMDC::Close()
+{
+// close the current raw DB file
+
+  if (!fRawDB) return 1;
+
+  fRawDB->WriteStats(fStats);
+  fRunDB->Update(fStats);
+  delete fRawDB;
+  fRawDB = NULL;
+  delete fStats;
+  fStats = NULL;
+  return 0;
+}
+
+//______________________________________________________________________________
+Int_t AliMDC::Run(const char* inputFile, Bool_t loop,
+                 EWriteMode mode, Double_t maxFileSize, 
+                 const char* fs1, const char* fs2)
+{
+  // Run the MDC processor. Read from the input stream and only return
+  // when the input gave and EOF or a fatal error occured. On success 0
+  // is returned, 1 in case of a fatality.
+  // inputFile is the name of the DATE input file; if NULL the input will
+  // be taken from the event builder.
+  // If loop is set the same input file will be reused in an infinite loop.
+  // mode specifies the type of the raw DB.
+  // maxFileSize is the maximal size of the raw DB.
+  // fs1 and fs2 are the file system locations of the raw DB.
+
+  Info("Run", "input = %s, rawdb size = %f, filter = %s, "
+       "looping = %s, compression = %d, delete files = %s",
+       inputFile ? inputFile : "event builder", maxFileSize,
+       fFilterMode == kFilterOff ? "off" : 
+       (fFilterMode == kFilterOn ? "on" : "transparent"), 
+       loop ? "yes" : "no", fCompress, fDeleteFiles ? "yes" : "no");
+
+  // Open the input file
+  Int_t fd = -1;
+  if (inputFile) {
+    if ((fd = open(inputFile, O_RDONLY)) == -1) {
+      Error("Run", "cannot open input file %s", inputFile);
+      return 1;
+    }
+  }
+
+  // Used for statistics
+  TStopwatch timer;
+  timer.Start();
+  Double_t told = 0, tnew = 0;
+  Float_t  chunkSize = maxFileSize/100, nextChunk = chunkSize;
+
+  // Create new raw DB.
+  if (fRawDB) Close();
+  if (mode == kRFIO) {
+    fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, NULL);
+  } else if (mode == kROOTD) {
+    fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, NULL);
+  } else if (mode == kCASTOR) {
+    fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, NULL);
+  } else if (mode == kDEVNULL) {
+    fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, NULL);
+  } else {
+    fRawDB = new AliRawDB(fEvent, fESD, fCompress, NULL);
+  }
+  fRawDB->SetMaxSize(maxFileSize);
+  fRawDB->SetFS(fs1, fs2);
+  fRawDB->SetDeleteFiles(fDeleteFiles);
+  fRawDB->Create();
+
+  if (fRawDB->IsZombie()) {
+    delete fRawDB;
+    fRawDB = NULL;
+    return 1;
+  }
+  printf("Filling raw DB %s\n", fRawDB->GetDBName());
 
-         toRead -= subHeader.HeaderSize();
+  // Create AliStats object
+  fStats = new AliStats(fRawDB->GetDBName(), fCompress, 
+                       fFilterMode != kFilterOff);
 
+  // Process input stream
 #ifdef USE_EB
-         ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize();
+  Int_t eorFlag = 0;
 #endif
+  char* event = NULL;
+  UInt_t eventSize = 0;
+  Int_t numEvents = 0;
 
-         Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize();
-
-         // Make sure raw data less than left over bytes for current event
-         if (rawSize > toRead) {
-            ALIDEBUG(1) {
-               Warning("Run", "raw data size (%d) exceeds number of "
-                      "bytes to read (%d)\n", rawSize, toRead);
-               subHeader.Dump();
-            }
-            if ((status = DumpEvent(toRead)) != toRead) {
-               if (status == 0)
-                  break;
-               return 1;
-            }
-            Error("Run", "discarding event %d (too much data)", fNumEvents);
-            continue;
-         }
-
-         // Read Equipment Headers (in case of physics or calibration event)
-         if (header.GetType() == AliRawEventHeader::kPhysicsEvent ||
-             header.GetType() == AliRawEventHeader::kCalibrationEvent) {
-            while (rawSize > 0) {
-               AliRawEquipment &equipment = *subEvent->NextEquipment();
-               AliRawEquipmentHeader &equipmentHeader = 
-                  *equipment.GetEquipmentHeader();
-               Int_t equipHeaderSize = equipmentHeader.HeaderSize();
-               if ((status = ReadEquipmentHeader(equipmentHeader, header.DataIsSwapped(),
-                                                 ebdata)) != equipHeaderSize) {
-                  if (status == 0) {
-                     Error("Run", "unexpected EOF reading equipment-header");
-                     break;
-                  }
-                  return 1;
-               }
-               toRead  -= equipHeaderSize;
-               rawSize -= equipHeaderSize;
+  while (kTRUE) {
+
+    // If we were in looping mode stop directly after a SIGUSR1 signal
+    if (fStop) {
+      Info("Run", "Stopping loop, processed %d events", numEvents);
+      break;
+    }
+
+    if (!inputFile) {  // get data from event builder
 #ifdef USE_EB
-               ebdata = (char *)(ebvec[nsub].iov_base) +
-                        subHeader.HeaderSize() + equipHeaderSize;
+      if ((eorFlag = ebEor())) break;
+      if ((event = (char*)ebGetNextEvent()) == (char*)-1) {
+       Error("Run", "error getting next event (%s)", ebGetLastError());
+       break;
+      }
+      if (event == 0) {
+       // no event, sleep for 1 second and try again
+       gSystem->Sleep(1000);
+       continue;
+      }
+#else
+      Error("Run", "AliMDC was compiled without event builder support");
+      delete fRawDB;
+      fRawDB = NULL;
+      delete fStats;
+      fStats = NULL;
+      return 1;
 #endif
 
-               // Read equipment raw data
-               AliRawData &subRaw = *equipment.GetRawData();
-
-              Int_t eqSize = equipmentHeader.GetEquipmentSize() -
-                              equipHeaderSize;
-               if ((status = ReadRawData(subRaw, eqSize, ebdata)) != eqSize) {
-                  if (status == 0) {
-                     Error("Run", "unexpected EOF reading sub-event raw data");
-                     break;
-                  }
-                  return 1;
-               }
-               toRead  -= eqSize;
-               rawSize -= eqSize;
-
-           }
-
-         } else {  // Read only raw data but no equipment header
-            AliRawEquipment &equipment = *subEvent->NextEquipment();
-            AliRawData &subRaw = *equipment.GetRawData();
-            if ((status = ReadRawData(subRaw, rawSize, ebdata)) != rawSize) {
-               if (status == 0) {
-                  Error("Run", "unexpected EOF reading sub-event raw data");
-                  break;
-               }
-               return 1;
-            }
-            toRead  -= rawSize;
-
-        }
-
-         nsub++;
+    } else {  // get data from a file
+      AliRawEventHeader header;
+      Int_t nrecv;
+      if ((nrecv = Read(fd, header.HeaderBegin(), header.HeaderSize())) !=
+         header.HeaderSize()) {
+       if (nrecv == 0) {  // eof
+         if (loop) {
+           ::lseek(fd, 0, SEEK_SET);
+           continue;
+         } else {
+           break;
+         }
+       } else {
+         Error("Run", "error reading header");
+         Close();
+         delete[] event;
+         return 1;
+       }
       }
-
-      //HLT
-      if (callFilter) {
-#ifdef ALI_DATE
-       if(header.GetType() == AliRawEventHeader::kPhysicsEvent ||
-          header.GetType() == AliRawEventHeader::kCalibrationEvent)
-         Filter(
-#ifdef USE_HLT
-                event,esd
-#endif
-                );
-#endif
+      if (eventSize < header.GetEventSize()) {
+       delete[] event;
+       eventSize = 2 * header.GetEventSize();
+       event = new char[eventSize];
       }
+      memcpy(event, header.HeaderBegin(), header.HeaderSize());
+      Int_t size = header.GetEventSize() - header.HeaderSize();
+      if (Read(fd, event + header.HeaderSize(), size) != size) {
+       Error("Run", "error reading data");
+       Close();
+       delete[] event;
+       return 1;
+      }
+    }
 
-      // Set stat info for first event of this file
-      if (rawdb->GetEvents() == 0)
-         stats->SetFirstId(header.GetRunNumber(), header.GetEventInRun());
-
-      // Store raw event in tree
-      rawdb->Fill();
-
-      // Store header in tree
-      if (tagdb) tagdb->Fill();
-
-      fNumEvents++;
+    Int_t result = ProcessEvent(event, !inputFile);
 
-      if (!(fNumEvents%10))
-         printf("Processed event %d (%d)\n", fNumEvents, rawdb->GetEvents());
+    if (result >= 0) {
+      numEvents++;
+      if (!(numEvents%10))
+       printf("Processed event %d (%d)\n", numEvents, fRawDB->GetEvents());
+    }
 
+    if (result > 0) {
       // Filling time statistics
-      if (rawdb->GetBytesWritten() > nextChunk) {
-         tnew = timer.RealTime();
-         stats->Fill(tnew-told);
-         told = tnew;
-         timer.Continue();
-         nextChunk += chunkSize;
+      if (fRawDB->GetBytesWritten() > nextChunk) {
+       tnew = timer.RealTime();
+       fStats->Fill(tnew-told);
+       told = tnew;
+       timer.Continue();
+       nextChunk += chunkSize;
       }
 
       // Check size of raw db. If bigger than maxFileSize, close file
       // and continue with new file.
-      if (rawdb->FileFull()) {
-
-         printf("Written raw DB at a rate of %.1f MB/s\n",
-                rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
-
-         // Write stats object to raw db, run db, MySQL and AliEn
-         stats->WriteToDB(rawdb);
-         delete stats;
-
-         if (!rawdb->NextFile()) {
-            Error("Run", "error opening next raw data file");
-            return 1;
-         }
-
-         printf("Filling raw DB %s\n", rawdb->GetDBName());
-         stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
-
-         timer.Start();
-         told = 0, tnew = 0;
-         nextChunk = chunkSize;
+      if (fRawDB->GetBytesWritten() > maxFileSize) {
+
+       printf("Written raw DB at a rate of %.1f MB/s\n",
+              fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.);
+
+       // Write stats object to raw db, run db, MySQL and AliEn
+       fRawDB->WriteStats(fStats);
+       if (fRunDB) fRunDB->Update(fStats);
+       delete fStats;
+       fStats = NULL;
+
+       if (!fRawDB->NextFile()) {
+         Error("Run", "error opening next raw data file");
+         Close();
+         if (inputFile) delete[] event;
+         return 1;
+       }
+
+       printf("Filling raw DB %s\n", fRawDB->GetDBName());
+       fStats = new AliStats(fRawDB->GetDBName(), fCompress, 
+                             fFilterMode != kFilterOff);
+
+       timer.Start();
+       told = 0, tnew = 0;
+       nextChunk = chunkSize;
       }
 
       // Check size of tag db
-      if (tagdb && tagdb->FileFull()) {
-         if (!tagdb->NextFile())
-            tagdb = 0;
-         else
-            printf("Filling tag DB %s\n", tagdb->GetDBName());
+      if (fTagDB && fTagDB->FileFull()) {
+       if (!fTagDB->NextFile()) {
+         delete fTagDB;
+         fTagDB = 0;
+       } else {
+         printf("Filling tag DB %s\n", fTagDB->GetDBName());
+       }
       }
+    }
 
-      // Make top event object ready for next event data
-      //printf("Event %d has %d sub-events\n", fNumEvents, event->GetNSubEvents());
-      event->Reset();
-#ifdef USE_HLT
-      // Clean up HLT ESD for the next event
-      // Probably we could add esd->Reset() method to AliESD?
-      esd->Reset();
-#endif
+    // Make top event object ready for next event data
+    //printf("Event %d has %d sub-events\n", numEvents, fEvent->GetNSubEvents());
+    //    fEvent->Reset();
+    // Clean up HLT ESD for the next event
+    //    if (fESD) fESD->Reset();
+
+    if (!inputFile) {
 #ifdef USE_EB
-      if (!ebReleaseEvent(ebvec)) {
-         Error("Run", "problem releasing event (%s)", ebGetLastError());
-         break;
+      if (!ebReleaseEvent((iovec*)event)) {
+       Error("Run", "problem releasing event (%s)", ebGetLastError());
+       break;
       }
 #endif
-   }
-
-   printf("Written raw DB at a rate of %.1f MB/s\n",
-          rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
-
-   // Write stats to raw db and run db and delete stats object
-   stats->WriteToDB(rawdb);
-   delete stats;
-
-   // Close the raw DB
-   delete rawdb;
-
-   // Close the tag DB
-   delete tagdb;
+    }
+  }
 
-   // Close input source
-   close(fFd);
+  printf("Written raw DB at a rate of %.1f MB/s\n",
+        fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.);
 
-#if 0
-   // Cleanup fifo
-   if (fUseFifo && ::unlink(fgkFifo) == -1) {
-      SysError("Run", "unlink");
-      return 1;
-   }
-#endif
+  // Write stats to raw db and run db and delete stats object
+  Close();
 
+  if (!inputFile) {
 #ifdef USE_EB
-   // Print eor flag
-   if (eorFlag) {
+    // Print eor flag
+    if (eorFlag) {
       Info("Run", "event builder reported end of run (%d)", eorFlag);
-   }
+    }
 #endif
+  } else {
+    // Close input source
+    close(fd);
+  }
 
-   return 0;
+  return 0;
 }
 
 //______________________________________________________________________________
-Int_t AliMDC::Read(void *buffer, Int_t length)
+Int_t AliMDC::Read(Int_t fd, void *buffer, Int_t length)
 {
    // Read exactly length bytes into buffer. Returns number of bytes
    // received, returns -1 in case of error and 0 for EOF.
 
    errno = 0;
 
-   if (fFd < 0) return -1;
+   if (fd < 0) return -1;
 
    Int_t n, nrecv = 0;
    char *buf = (char *)buffer;
 
    for (n = 0; n < length; n += nrecv) {
-      if ((nrecv = read(fFd, buf+n, length-n)) <= 0) {
+      if ((nrecv = read(fd, buf+n, length-n)) <= 0) {
          if (nrecv == 0)
             break;        // EOF
          if (errno != EINTR)
@@ -635,223 +669,80 @@ Int_t AliMDC::Read(void *buffer, Int_t length)
 }
 
 //______________________________________________________________________________
-Int_t AliMDC::ReadHeader(AliRawEventHeader &header, void *eb)
+Int_t AliMDC::ReadHeader(AliRawEventHeader &header, char*& data)
 {
-   // Read header info from DATE data stream. Returns bytes read (i.e.
-   // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
-
-   Int_t nrecv;
-
-   if (eb) {
-      // read from event builder memory area
-      memcpy(header.HeaderBegin(), eb, header.HeaderSize());
-      nrecv = header.HeaderSize();
-   } else {
-      // read from fifo or file
-      if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
-           header.HeaderSize()) {
-         if (nrecv == 0)
-            return 0;
-         return -1;
-      }
-   }
+  // Read header info from DATE data stream. Returns bytes read (i.e.
+  // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
 
-   // Swap header data if needed
-   if (header.IsSwapped())
-      header.Swap();
+  memcpy(header.HeaderBegin(), data, header.HeaderSize());
+  data += header.HeaderSize();
 
-   // Is header valid...
-   if (!header.IsValid()) {
-      Error("ReadHeader", "invalid header format");
-      // try recovery... how?
-      return -1;
-   }
-   if (header.GetEventSize() < (UInt_t)header.HeaderSize()) {
-      Error("ReadHeader", "invalid header size");
-      // try recovery... how?
-      return -1;
-   }
+  // Swap header data if needed
+  if (header.IsSwapped())
+    header.Swap();
 
-   return nrecv;
+  // Is header valid...
+  if (!header.IsValid()) {
+    Error("ReadHeader", "invalid header format");
+    // try recovery... how?
+    return -1;
+  }
+  if (header.GetEventSize() < (UInt_t)header.HeaderSize()) {
+    Error("ReadHeader", "invalid header size");
+    // try recovery... how?
+    return -1;
+  }
+
+  return header.HeaderSize();
 }
 
 //______________________________________________________________________________
 Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header,
-                                  Bool_t isSwapped, void *eb)
+                                  Bool_t isSwapped, char*& data)
 {
-   // Read equipment header info from DATE data stream. Returns bytes read
-   // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and
-   // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped
-   // and we will swap the header to host format.
-
-   Int_t nrecv;
-
-   if (eb) {
-      // read from event builder memory area
-      memcpy(header.HeaderBegin(), eb, header.HeaderSize());
-      nrecv = header.HeaderSize();
-   } else {
-      // read from fifo or file
-      if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
-           header.HeaderSize()) {
-         if (nrecv == 0)
-            return 0;
-         return -1;
-      }
-   }
-
-   // Swap equipment header data if needed
-   if (isSwapped)
-      header.Swap();
-
-   if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) {
-      Error("ReadEquipmentHeader", "invalid equipment header size");
-      // try recovery... how?
-      return -1;
-   }
+  // Read equipment header info from DATE data stream. Returns bytes read
+  // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and
+  // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped
+  // and we will swap the header to host format.
+
+  memcpy(header.HeaderBegin(), data, header.HeaderSize());
+  data += header.HeaderSize();
+
+  // Swap equipment header data if needed
+  if (isSwapped)
+    header.Swap();
+
+  if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) {
+    Error("ReadEquipmentHeader", "invalid equipment header size");
+    // try recovery... how?
+    return -1;
+  }
 
-   return nrecv;
+  return header.HeaderSize();
 }
 
 //______________________________________________________________________________
-Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, void *eb)
+Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, char*& data)
 {
-   // Read raw data from DATE data stream. Returns bytes read (i.e.
-   // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
-
-   Int_t nrecv;
-
-   if (eb) {
-      // read from event builder memory area
-      raw.SetBuffer(eb, size);
-      nrecv = size;
-   } else {
-      // read from fifo or file
-      raw.SetSize(size);
-      if ((nrecv = Read(raw.GetBuffer(), size)) != size) {
-         if (nrecv == 0) {
-            Error("ReadRawData", "unexpected EOF");
-            return 0;
-         }
-         return -1;
-      }
-   }
+  // Read raw data from DATE data stream. Returns bytes read (i.e.
+  // size), -1 in case of error and 0 for EOF.
 
-   return nrecv;
-}
+  raw.SetBuffer(data, size);
+  data += size;
 
-//______________________________________________________________________________
-Int_t AliMDC::DumpEvent(Int_t toRead)
-{
-   // This case should not happen, but if it does try to handle it
-   // gracefully by reading the rest of the event and discarding it.
-   // Returns bytes read, -1 in case of fatal error and 0 for EOF.
-
-   Error("DumpEvent", "dumping %d bytes of event %d", toRead, fNumEvents);
-
-   Int_t nrecv;
-   char *tbuf = new char[toRead];
-   if ((nrecv = Read(tbuf, toRead)) != toRead) {
-      if (nrecv == 0) {
-         Error("DumpEvent", "unexpected EOF");
-         return 0;
-      }
-      return -1;
-   }
-   delete [] tbuf;
-
-   return nrecv;
+  return size;
 }
 
 //______________________________________________________________________________
-Int_t AliMDC::Filter(
-#ifdef USE_HLT
-                    AliRawEvent *event,AliESD *esd
-#endif
-                    )
+void AliMDC::Stop()
 {
-  // Call 3rd level filter for this raw data event.
-
-#ifdef USE_HLT
-
-  // Run the HLT code
-  {
-    TStopwatch timer;
-    timer.Start();
-
-    AliL3Hough *hough1 = new AliL3Hough();
-    
-    hough1->SetThreshold(4);
-    hough1->SetTransformerParams(76,140,0.4,-1);
-    hough1->SetPeakThreshold(70,-1);
-    // Attention Z of the vertex to be taken from the event head!
-    // So far for debug purposes it is fixed by hand...
-    hough1->Init(100,4,event,3.82147);
-    hough1->SetAddHistograms();
-
-    AliL3Hough *hough2 = new AliL3Hough();
-
-    hough2->SetThreshold(4);
-    hough2->SetTransformerParams(76,140,0.4,-1);
-    hough2->SetPeakThreshold(70,-1);
-    hough2->Init(100,4,event,3.82147);
-    hough2->SetAddHistograms();
-
-    Int_t nglobaltracks = 0;
-    /* In case we run HLT code in 2 threads */
-    hough1->StartProcessInThread(0,17);
-    hough2->StartProcessInThread(18,35);
-
-    if(hough1->WaitForThreadFinish())
-      ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! ");
-    if(hough2->WaitForThreadFinish())
-      ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! ");
-
-    /* In case we run HLT code in the main thread
-    for(Int_t slice=0; slice<=17; slice++)
-      {
-       hough1->ReadData(slice,0);
-       hough1->Transform();
-       hough1->AddAllHistogramsRows();
-       hough1->FindTrackCandidatesRow();
-       hough1->AddTracks();
-      }
-    for(Int_t slice=18; slice<=35; slice++)
-      {
-       hough2->ReadData(slice,0);
-       hough2->Transform();
-       hough2->AddAllHistogramsRows();
-       hough2->FindTrackCandidatesRow();
-       hough2->AddTracks();
-      }
-    */
-
-    nglobaltracks += hough1->FillESD(esd);
-    nglobaltracks += hough2->FillESD(esd);
-
-    /* In case we want to debug the ESD
-    gSystem->MakeDirectory("hough1");
-    hough1->WriteTracks("./hough1");
-    gSystem->MakeDirectory("hough2");
-    hough2->WriteTracks("./hough2");
-    */
-
-    delete hough1;
-    delete hough2;
-
-    printf("Filter called for event %d\n", fNumEvents);
-    printf("Filter has found %d TPC tracks in %f seconds\n", nglobaltracks,timer.RealTime());
-  }
-
-#else
-
-  printf("Filter called for event %d\n", fNumEvents);
-
-#endif
-
-  return 0;
+  // Stop the event loop
+  fStop = kTRUE; 
+  if (fRawDB) fRawDB->Stop(); 
 }
 
+
 //______________________________________________________________________________
 AliMDC::AliMDCInterruptHandler::AliMDCInterruptHandler(const 
                                                       AliMDCInterruptHandler&