1 // @(#)alimdc:$Name$:$Id$
2 // Author: Fons Rademakers 26/11/99
3 // Updated: Dario Favretto 15/04/2003
5 /**************************************************************************
6 * Copyright(c) 1998-2003, ALICE Experiment at CERN, All rights reserved. *
8 * Author: The ALICE Off-line Project. *
9 * Contributors are mentioned in the code where appropriate. *
11 * Permission to use, copy, modify and distribute this software and its *
12 * documentation strictly for non-commercial purposes is hereby granted *
13 * without fee, provided that the above copyright notice appears in all *
14 * copies and that both the copyright notice and this permission notice *
15 * appear in the supporting documentation. The authors make no claims *
16 * about the suitability of this software for any purpose. It is *
17 * provided "as is" without express or implied warranty. *
18 **************************************************************************/
22 //////////////////////////////////////////////////////////////////////////
26 // Set of classes defining the ALICE RAW event format. The AliRawEvent //
27 // class defines a RAW event. It consists of an AliEventHeader object //
28 // an AliEquipmentHeader object, an AliRawData object and an array of //
29 // sub-events, themselves also being AliRawEvents. The number of //
30 // sub-events depends on the number of DATE LDC's. //
31 // The AliRawEvent objects are written to a ROOT file using different //
32 // technologies, i.e. to local disk via AliRawDB or via rfiod using //
33 // AliRawRFIODB or via rootd using AliRawRootdDB or to CASTOR via //
34 // rootd using AliRawCastorDB (and for performance testing there is //
35 // also AliRawNullDB). //
36 // The AliRunDB class provides the interface to the run and file //
37 // catalogues (AliEn or plain MySQL). //
38 // The AliStats class provides statics information that is added as //
39 // a single keyed object to each raw file. //
40 // The AliTagDB provides an interface to a TAG database. //
41 // The AliMDC class is usid by the "alimdc" stand-alone program //
42 // that reads data directly from DATE. //
44 //////////////////////////////////////////////////////////////////////////
46 #include <sys/types.h>
53 #include <TStopwatch.h>
57 #include "libDateEb.h"
63 #include "AliRawEvent.h"
64 #include "AliRawEventHeaderBase.h"
65 #include "AliRawEquipment.h"
66 #include "AliRawEquipmentHeader.h"
67 #include "AliRawData.h"
70 #include "AliRawRFIODB.h"
71 #include "AliRawCastorDB.h"
72 #include "AliRawRootdDB.h"
73 #include "AliRawNullDB.h"
76 #include "AliFilter.h"
84 const char* const AliMDC::fgkFilterName[kNFilters] = {"AliHoughFilter"};
86 //______________________________________________________________________________
87 AliMDC::AliMDC(Int_t compress, Bool_t deleteFiles, EFilterMode filterMode,
88 const char* localRunDB, Bool_t rdbmsRunDB,
89 const char* alienHostRunDB, const char* alienDirRunDB,
90 Double_t maxSizeTagDB, const char* fileNameTagDB) :
91 fEvent(new AliRawEvent),
95 fRunDB(new AliRunDB(localRunDB, rdbmsRunDB, alienHostRunDB, alienDirRunDB)),
98 fDeleteFiles(deleteFiles),
99 fFilterMode(filterMode),
102 fIsTagDBCreated(kFALSE),
103 fMaxSizeTagDB(maxSizeTagDB),
104 fFileNameTagDB(fileNameTagDB)
106 // Create MDC processor object.
107 // compress is the file compression mode.
108 // If deleteFiles is kTRUE the raw data files will be deleted after they
110 // If the filterMode if kFilterOff no filter algorithm will be, if it is
111 // kFilterTransparent the algorthims will be run but no events will be
112 // rejected, if it is kFilterOn the filters will be run and the event will
113 // be rejected if all filters return kFALSE.
114 // localRunDB is the file name of the local run DB; if NULL no local run DB
116 // The filling of a MySQL run DB can be switch on or off with rdbmsRunDB.
117 // The host and directory name of the alien run DB can be specified by
118 // alienHostRunDB and alienDirRunDB; if NULL no alien DB will be filled.
119 // If maxSizeTagDB is greater than 0 it determines the maximal size of the
120 // tag DB and then fileNameTagDB is the directory name for the tag DB.
121 // Otherwise fileNameTagDB is the file name of the tag DB. If it is NULL
122 // no tag DB will be created.
125 if (fFilterMode != kFilterOff) {
129 // Tag DB is now created at the point where the header version is
131 // if (fileNameTagDB) {
132 // if (maxSizeTagDB > 0) {
133 // fTagDB = new AliTagDB(fEvent->GetHeader(), NULL);
134 // fTagDB->SetMaxSize(maxSizeTagDB);
135 // fTagDB->SetFS(fileNameTagDB);
138 // fTagDB = new AliTagDB(fEvent->GetHeader(), fileNameTagDB);
142 // install SIGUSR1 handler to allow clean interrupts
143 gSystem->AddSignalHandler(new AliMDCInterruptHandler(this));
145 // create the high level filters
146 if (fFilterMode != kFilterOff) {
147 for (Int_t iFilter = 0; iFilter < kNFilters; iFilter++) {
148 TClass* filterClass = gROOT->GetClass(fgkFilterName[iFilter]);
150 Warning("AliMDC", "no filter class %s found", fgkFilterName[iFilter]);
153 AliFilter* filter = (AliFilter*) filterClass->New();
155 Warning("AliMDC", "creation of filter %s failed", fgkFilterName[iFilter]);
158 fFilters.Add(filter);
163 //______________________________________________________________________________
169 if(fTagDB) delete fTagDB;
177 //______________________________________________________________________________
178 Int_t AliMDC::Open(EWriteMode mode, const char* fileName)
180 // open a new raw DB file
183 fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, fileName);
184 else if (mode == kROOTD)
185 fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, fileName);
186 else if (mode == kCASTOR)
187 fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, fileName);
188 else if (mode == kDEVNULL)
189 fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, fileName);
191 fRawDB = new AliRawDB(fEvent, fESD, fCompress, fileName);
192 fRawDB->SetDeleteFiles(fDeleteFiles);
194 if (fRawDB->IsZombie()) {
199 Info("Open", "Filling raw DB %s\n", fRawDB->GetDBName());
201 // Create AliStats object
202 fStats = new AliStats(fRawDB->GetDBName(), fCompress,
203 fFilterMode != kFilterOff);
207 //______________________________________________________________________________
208 Int_t AliMDC::ProcessEvent(void* event, Bool_t isIovecArray)
210 // Convert the DATE event to an AliRawEvent object and store it in the raw DB,
211 // optionally also run the filter.
212 // event is either a pointer to the streamlined event
213 // or, if isIovecArray is kTRUE, a pointer to an array of iovecs with one
214 // iovec per subevent (used by the event builder).
215 // The return value is the number of written bytes or an error code
216 const UInt_t kFileSizeErrorLevel = 1900000000;
218 UInt_t currentFileSize = GetTotalSize();
219 if(currentFileSize > kFileSizeErrorLevel) {
220 Error("ProcessEvent", "file size (%u) exceeds the limit "
226 char* data = (char*) event;
227 if (isIovecArray) data = (char*) ((iovec*) event)[0].iov_base;
229 // Shortcut for easy header access
230 AliRawEventHeaderBase *header = fEvent->GetHeader(data);
233 if ((status = header->ReadHeader(data)) != (Int_t)header->GetHeadSize()) {
237 if (AliDebugLevel() > 2) ToAliDebug(3, header->Dump(););
239 // Check event type and skip "Start of Run", "End of Run",
240 // "Start of Run Files" and "End of Run Files"
241 Int_t size = header->GetEventSize() - header->GetHeadSize();
243 AliDebug(1, Form("Processing %s (%d bytes)", header->GetTypeName(), size));
245 // Amount of data left to read for this event
248 // StartOfRun, EndOfRun etc. events have no payload
249 // Nevertheless, store the event headers in the tree
252 // If there is less data for this event than the next sub-event
253 // header, something is wrong. Skip to next event...
254 if (toRead < (Int_t)header->GetHeadSize()) {
255 Error("ProcessEvent", "header size (%d) exceeds number of bytes "
256 "to read (%d)", header->GetHeadSize(), toRead);
257 if (AliDebugLevel() > 0) ToAliDebug(1, header->Dump(););
258 return kErrHeaderSize;
261 // Loop over all sub-events... (LDCs)
264 if (isIovecArray) data = (char*) ((iovec*) event)[nsub].iov_base;
266 AliDebug(1, Form("reading LDC %d", nsub));
268 AliRawEvent *subEvent = fEvent->NextSubEvent();
270 // Read sub-event header
271 AliRawEventHeaderBase *subHeader = subEvent->GetHeader(data);
272 if ((status = subHeader->ReadHeader(data)) != (Int_t)subHeader->GetHeadSize()) {
273 return kErrSubHeader;
276 if (AliDebugLevel() > 2) ToAliDebug(3, subHeader->Dump(););
278 toRead -= subHeader->GetHeadSize();
280 Int_t rawSize = subHeader->GetEventSize() - subHeader->GetHeadSize();
282 // Make sure raw data less than left over bytes for current event
283 if (rawSize > toRead) {
284 Warning("ProcessEvent", "raw data size (%d) exceeds number of "
285 "bytes to read (%d)\n", rawSize, toRead);
286 if (AliDebugLevel() > 0) ToAliDebug(1, subHeader->Dump(););
290 // Read Equipment Headers (in case of physics or calibration event)
291 if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent ||
292 header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent ||
293 header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent ||
294 header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent) {
295 while (rawSize > 0) {
296 AliRawEquipment &equipment = *subEvent->NextEquipment();
297 AliRawEquipmentHeader &equipmentHeader =
298 *equipment.GetEquipmentHeader();
299 Int_t equipHeaderSize = equipmentHeader.HeaderSize();
300 if ((status = ReadEquipmentHeader(equipmentHeader, header->DataIsSwapped(),
301 data)) != equipHeaderSize) {
302 return kErrEquipmentHeader;
305 if (AliDebugLevel() > 2) ToAliDebug(3, equipmentHeader.Dump(););
307 toRead -= equipHeaderSize;
308 rawSize -= equipHeaderSize;
310 // Read equipment raw data
311 AliRawData &subRaw = *equipment.GetRawData();
313 Int_t eqSize = equipmentHeader.GetEquipmentSize() - equipHeaderSize;
314 if ((status = ReadRawData(subRaw, eqSize, data)) != eqSize) {
315 return kErrEquipment;
322 } else { // Read only raw data but no equipment header
324 AliRawEquipment &equipment = *subEvent->NextEquipment();
325 AliRawData &subRaw = *equipment.GetRawData();
326 if ((status = ReadRawData(subRaw, rawSize, data)) != rawSize) {
327 return kErrEquipment;
337 // High Level Event Filter
338 if (fFilterMode != kFilterOff) {
339 if (header->Get("Type") == AliRawEventHeaderBase::kPhysicsEvent ||
340 header->Get("Type") == AliRawEventHeaderBase::kCalibrationEvent ||
341 header->Get("Type") == AliRawEventHeaderBase::kSystemSoftwareTriggerEvent ||
342 header->Get("Type") == AliRawEventHeaderBase::kDetectorSoftwareTriggerEvent) {
343 Bool_t result = kFALSE;
344 for (Int_t iFilter = 0; iFilter < fFilters.GetEntriesFast(); iFilter++) {
345 AliFilter* filter = (AliFilter*) fFilters[iFilter];
346 if (!filter) continue;
347 if (filter->Filter(fEvent, fESD)) result = kTRUE;
349 if ((fFilterMode == kFilterOn) && !result) return kFilterReject;
353 // Set stat info for first event of this file
354 if (fRawDB->GetEvents() == 0)
355 fStats->SetFirstId(header->Get("RunNb"), header->GetP("Id")[0]);
357 // Store raw event in tree
358 Int_t nBytes = fRawDB->Fill();
360 // Create Tag DB here only after the raw data header
361 // version was already identified
362 if (!fIsTagDBCreated) {
363 if (fFileNameTagDB) {
364 if (fMaxSizeTagDB > 0) {
365 fTagDB = new AliTagDB(fEvent->GetHeader(), NULL);
366 fTagDB->SetMaxSize(fMaxSizeTagDB);
367 fTagDB->SetFS(fFileNameTagDB);
370 fTagDB = new AliTagDB(fEvent->GetHeader(), fFileNameTagDB);
373 fIsTagDBCreated = kTRUE;
376 // Store header in tree
377 if (fTagDB) fTagDB->Fill();
379 // Make top event object ready for next event data
381 // Clean up HLT ESD for the next event
382 if (fESD) fESD->Reset();
390 //______________________________________________________________________________
391 Int_t AliMDC::GetTotalSize()
393 // return the total current raw DB file size
395 if (!fRawDB) return -1;
397 return fRawDB->GetTotalSize();
400 //______________________________________________________________________________
401 Int_t AliMDC::Close()
403 // close the current raw DB file
405 if (!fRawDB) return -1;
407 fRawDB->WriteStats(fStats);
408 fRunDB->Update(fStats);
409 Int_t filesize = fRawDB->Close();
417 //______________________________________________________________________________
418 Int_t AliMDC::Run(const char* inputFile, Bool_t loop,
419 EWriteMode mode, Double_t maxFileSize,
420 const char* fs1, const char* fs2)
422 // Run the MDC processor. Read from the input stream and only return
423 // when the input gave and EOF or a fatal error occured. On success 0
424 // is returned, 1 in case of a fatality.
425 // inputFile is the name of the DATE input file; if NULL the input will
426 // be taken from the event builder.
427 // If loop is set the same input file will be reused in an infinite loop.
428 // mode specifies the type of the raw DB.
429 // maxFileSize is the maximal size of the raw DB.
430 // fs1 and fs2 are the file system locations of the raw DB.
432 Info("Run", "input = %s, rawdb size = %f, filter = %s, "
433 "looping = %s, compression = %d, delete files = %s",
434 inputFile ? inputFile : "event builder", maxFileSize,
435 fFilterMode == kFilterOff ? "off" :
436 (fFilterMode == kFilterOn ? "on" : "transparent"),
437 loop ? "yes" : "no", fCompress, fDeleteFiles ? "yes" : "no");
439 // Open the input file
442 if ((fd = open(inputFile, O_RDONLY)) == -1) {
443 Error("Run", "cannot open input file %s", inputFile);
448 // Used for statistics
451 Double_t told = 0, tnew = 0;
452 Float_t chunkSize = maxFileSize/100, nextChunk = chunkSize;
454 // Create new raw DB.
457 fRawDB = new AliRawRFIODB(fEvent, fESD, fCompress, NULL);
458 } else if (mode == kROOTD) {
459 fRawDB = new AliRawRootdDB(fEvent, fESD, fCompress, NULL);
460 } else if (mode == kCASTOR) {
461 fRawDB = new AliRawCastorDB(fEvent, fESD, fCompress, NULL);
462 } else if (mode == kDEVNULL) {
463 fRawDB = new AliRawNullDB(fEvent, fESD, fCompress, NULL);
465 fRawDB = new AliRawDB(fEvent, fESD, fCompress, NULL);
467 fRawDB->SetMaxSize(maxFileSize);
468 fRawDB->SetFS(fs1, fs2);
469 fRawDB->SetDeleteFiles(fDeleteFiles);
472 if (fRawDB->IsZombie()) {
477 printf("Filling raw DB %s\n", fRawDB->GetDBName());
479 // Create AliStats object
480 fStats = new AliStats(fRawDB->GetDBName(), fCompress,
481 fFilterMode != kFilterOff);
483 // Process input stream
488 UInt_t eventSize = 0;
491 AliRawEventHeaderBase header;
495 // If we were in looping mode stop directly after a SIGUSR1 signal
497 Info("Run", "Stopping loop, processed %d events", numEvents);
501 if (!inputFile) { // get data from event builder
503 if ((eorFlag = ebEor())) break;
504 if ((event = (char*)ebGetNextEvent()) == (char*)-1) {
505 Error("Run", "error getting next event (%s)", ebGetLastError());
509 // no event, sleep for 1 second and try again
510 gSystem->Sleep(1000);
514 Error("Run", "AliMDC was compiled without event builder support");
522 } else { // get data from a file
525 if ((nrecv = Read(fd, header.HeaderBaseBegin(), header.HeaderBaseSize())) !=
526 header.HeaderBaseSize()) {
527 if (nrecv == 0) { // eof
529 ::lseek(fd, 0, SEEK_SET);
535 Error("Run", "error reading base header");
542 char *data = (char *)header.HeaderBaseBegin();
543 AliRawEventHeaderBase *hdr = AliRawEventHeaderBase::Create(data);
545 if ((nrecv = Read(fd, hdr->HeaderBegin(), hdr->HeaderSize())) !=
547 if (nrecv == 0) { // eof
549 ::lseek(fd, 0, SEEK_SET);
557 Error("Run", "error reading header");
564 if (eventSize < hdr->GetEventSize()) {
566 eventSize = 2 * hdr->GetEventSize();
567 event = new char[eventSize];
569 memcpy(event, hdr->HeaderBaseBegin(), hdr->HeaderBaseSize());
570 memcpy(event+hdr->HeaderBaseSize(), hdr->HeaderBegin(), hdr->HeaderSize());
571 if (hdr->GetExtendedDataSize() != 0)
572 memcpy(event+hdr->HeaderBaseSize()+hdr->HeaderSize(),
573 hdr->GetExtendedData(), hdr->GetExtendedDataSize());
574 Int_t size = hdr->GetEventSize() - hdr->GetHeadSize();
575 if (Read(fd, event + hdr->GetHeadSize(), size) != size) {
576 Error("Run", "error reading data");
585 Int_t result = ProcessEvent(event, !inputFile);
587 Error("Run", "error writing data. Error code: %d",result);
592 printf("Processed event %d (%d)\n", numEvents, fRawDB->GetEvents());
596 // Filling time statistics
597 if (fRawDB->GetBytesWritten() > nextChunk) {
598 tnew = timer.RealTime();
599 fStats->Fill(tnew-told);
602 nextChunk += chunkSize;
605 // Check size of raw db. If bigger than maxFileSize, close file
606 // and continue with new file.
607 if (fRawDB->GetBytesWritten() > maxFileSize) {
609 printf("Written raw DB at a rate of %.1f MB/s\n",
610 fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.);
612 // Write stats object to raw db, run db, MySQL and AliEn
613 fRawDB->WriteStats(fStats);
614 if (fRunDB) fRunDB->Update(fStats);
618 if (!fRawDB->NextFile()) {
619 Error("Run", "error opening next raw data file");
621 if (inputFile) delete[] event;
625 printf("Filling raw DB %s\n", fRawDB->GetDBName());
626 fStats = new AliStats(fRawDB->GetDBName(), fCompress,
627 fFilterMode != kFilterOff);
631 nextChunk = chunkSize;
634 // Check size of tag db
635 if (fTagDB && fTagDB->FileFull()) {
636 if (!fTagDB->NextFile()) {
640 printf("Filling tag DB %s\n", fTagDB->GetDBName());
645 // Make top event object ready for next event data
646 //printf("Event %d has %d sub-events\n", numEvents, fEvent->GetNSubEvents());
648 // Clean up HLT ESD for the next event
649 // if (fESD) fESD->Reset();
653 if (!ebReleaseEvent((iovec*)event)) {
654 Error("Run", "problem releasing event (%s)", ebGetLastError());
661 printf("Written raw DB at a rate of %.1f MB/s\n",
662 fRawDB->GetBytesWritten() / timer.RealTime() / 1000000.);
664 // Write stats to raw db and run db and delete stats object
671 Info("Run", "event builder reported end of run (%d)", eorFlag);
675 // Close input source
683 //______________________________________________________________________________
684 Int_t AliMDC::Read(Int_t fd, void *buffer, Int_t length)
686 // Read exactly length bytes into buffer. Returns number of bytes
687 // received, returns -1 in case of error and 0 for EOF.
691 if (fd < 0) return -1;
694 char *buf = (char *)buffer;
696 for (n = 0; n < length; n += nrecv) {
697 if ((nrecv = read(fd, buf+n, length-n)) <= 0) {
701 SysError("Read", "read");
708 //______________________________________________________________________________
709 Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header,
710 Bool_t isSwapped, char*& data)
712 // Read equipment header info from DATE data stream. Returns bytes read
713 // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and
714 // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped
715 // and we will swap the header to host format.
717 memcpy(header.HeaderBegin(), data, header.HeaderSize());
718 data += header.HeaderSize();
720 // Swap equipment header data if needed
724 if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) {
725 Error("ReadEquipmentHeader", "invalid equipment header size");
726 // try recovery... how?
730 return header.HeaderSize();
733 //______________________________________________________________________________
734 Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, char*& data)
736 // Read raw data from DATE data stream. Returns bytes read (i.e.
737 // size), -1 in case of error and 0 for EOF.
739 raw.SetBuffer(data, size);
745 //______________________________________________________________________________
748 // Stop the event loop
751 if (fRawDB) fRawDB->Stop();