1 // @(#)alimdc:$Name$:$Id$
2 // Author: Fons Rademakers 26/11/99
3 // Updated: Dario Favretto 15/04/2003
5 /**************************************************************************
6 * Copyright(c) 1998-2003, ALICE Experiment at CERN, All rights reserved. *
8 * Author: The ALICE Off-line Project. *
9 * Contributors are mentioned in the code where appropriate. *
11 * Permission to use, copy, modify and distribute this software and its *
12 * documentation strictly for non-commercial purposes is hereby granted *
13 * without fee, provided that the above copyright notice appears in all *
14 * copies and that both the copyright notice and this permission notice *
15 * appear in the supporting documentation. The authors make no claims *
16 * about the suitability of this software for any purpose. It is *
17 * provided "as is" without express or implied warranty. *
18 **************************************************************************/
20 //////////////////////////////////////////////////////////////////////////
24 // Set of classes defining the ALICE RAW event format. The AliRawEvent //
25 // class defines a RAW event. It consists of an AliEventHeader object //
26 // an AliEquipmentHeader object, an AliRawData object and an array of //
27 // sub-events, themselves also being AliRawEvents. The number of //
28 // sub-events depends on the number of DATE LDC's. //
29 // The AliRawEvent objects are written to a ROOT file using different //
30 // technologies, i.e. to local disk via AliRawDB or via rfiod using //
31 // AliRawRFIODB or via rootd using AliRawRootdDB or to CASTOR via //
32 // rootd using AliRawCastorDB (and for performance testing there is //
33 // also AliRawNullDB). //
34 // The AliRunDB class provides the interface to the run and file //
35 // catalogues (AliEn or plain MySQL). //
36 // The AliStats class provides statics information that is added as //
37 // a single keyed object to each raw file. //
38 // The AliTagDB provides an interface to a TAG database. //
39 // The AliMDC class is usid by the "alimdc" stand-alone program //
40 // that reads data directly from DATE. //
42 //////////////////////////////////////////////////////////////////////////
48 #include <TStopwatch.h>
54 #include "libDateEb.h"
57 #include "AliRawEvent.h"
58 #include "AliRawEventHeader.h"
59 #include "AliRawEquipmentHeader.h"
60 #include "AliRawData.h"
63 #include "AliRawRFIODB.h"
64 #include "AliRawCastorDB.h"
65 #include "AliRawRootdDB.h"
66 #include "AliRawNullDB.h"
75 #define ALIDEBUG(level) \
76 if (AliMDC::Instance() && (AliMDC::Instance()->GetDebugLevel() >= (level)))
79 // Fixed file system locations for the different DB's
81 const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo";
82 const char* const AliMDC::fgkRawDBFS[2] = { "/tmp/mdc1", "/tmp/mdc2" };
83 const char* const AliMDC::fgkTagDBFS = "/tmp/mdc1/tags";
84 const char* const AliMDC::fgkRunDBFS = "/tmp/mdc1/meta";
85 const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/user/r/rdm";
86 const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/user/r/rdm";
87 const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1";
88 const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct";
89 const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC";
91 const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo";
92 const char* const AliMDC::fgkRawDBFS[2] = { "/data1/mdc", "/data2/mdc" };
93 const char* const AliMDC::fgkTagDBFS = "/data1/mdc/tags";
94 const char* const AliMDC::fgkRunDBFS = "/data1/mdc/meta";
95 const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/lcg/dc5";
96 const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/lcg/dc5";
97 const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1";
98 const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct";
99 const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC";
102 // Maximum size of tag db files
103 const Double_t AliMDC::fgkMaxTagFileSize = 2.5e8; // 250MB
105 Bool_t AliMDC::fgDeleteFiles = kFALSE;
106 AliMDC* AliMDC::fgInstance = NULL;
109 //______________________________________________________________________________
110 AliMDC::AliMDC(Int_t fd, Int_t compress, Double_t maxFileSize, Bool_t useFilter,
111 EWriteMode mode, Bool_t useLoop, Bool_t delFiles)
113 // Create MDC processor object.
116 fCompress = compress;
117 fMaxFileSize = maxFileSize;
118 fUseFilter = useFilter;
126 fgDeleteFiles = delFiles;
131 Error("AliMDC", "cannot register with the event builder (%s)",
137 if ((mkfifo(fgkFifo, 0644) < 0) && (errno != EEXIST)) {
138 Error("AliMDC", "cannot create fifo %s", fgkFifo);
141 if ((chmod(fgkFifo, 0666) == -1) && (errno != EPERM)) {
142 Error("AliMDC", "cannot change permission of fifo %s", fgkFifo);
145 if ((fFd = open(fgkFifo, O_RDONLY)) == -1) {
146 Error("AliMDC", "cannot open input file %s", fgkFifo);
154 printf("<AliMDC::AliMDC>: input = %s, rawdb size = %f, filter = %s, "
155 "looping = %s, compression = %d, delete files = %s",
156 fUseFifo ? "fifo" : (fUseEb ? "eb" : "file"), fMaxFileSize,
157 fUseFilter ? "on" : "off", fUseLoop ? "yes" : "no", fCompress,
158 fgDeleteFiles ? "yes" : "no");
159 if (fWriteMode == kRFIO)
160 printf(", use RFIO\n");
161 else if (fWriteMode == kROOTD)
162 printf(", use rootd\n");
163 else if (fWriteMode == kCASTOR)
164 printf(", use CASTOR/rootd\n");
165 else if (fWriteMode == kDEVNULL)
166 printf(", write raw data to /dev/null\n");
170 // install SIGUSR1 handler to allow clean interrupts
171 gSystem->AddSignalHandler(new AliMDCInterruptHandler(this));
176 //______________________________________________________________________________
177 AliMDC::AliMDC(const AliMDC& mdc): TObject(mdc)
181 Fatal("AliMDC", "copy constructor not implemented");
184 //______________________________________________________________________________
185 AliMDC& AliMDC::operator = (const AliMDC& /*mdc*/)
187 // assignment operator
189 Fatal("operator =", "assignment operator not implemented");
193 //______________________________________________________________________________
196 // Run the MDC processor. Read from the input stream and only return
197 // when the input gave and EOF or a fatal error occured. On success 0
198 // is returned, 1 in case of a fatality.
203 // Make sure needed directories exist
205 dirs[0] = fgkRawDBFS[0];
206 dirs[1] = fgkRawDBFS[1];
207 dirs[2] = fgkTagDBFS;
208 dirs[3] = fgkRunDBFS;
209 for (int idir = 0; idir < 4; idir++) {
210 gSystem->ResetErrno();
211 gSystem->MakeDirectory(dirs[idir]);
212 if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) {
213 SysError("Run", "mkdir %s", dirs[idir]);
218 // Used for statistics
220 Double_t told = 0, tnew = 0;
221 Float_t chunkSize = fMaxFileSize/100, nextChunk = chunkSize;
223 // Event object used to store event data.
224 AliRawEvent *event = new AliRawEvent;
226 // Create new raw DB.
228 if (fWriteMode == kRFIO)
229 rawdb = new AliRawRFIODB(event, fMaxFileSize, fCompress);
230 else if (fWriteMode == kROOTD)
231 rawdb = new AliRawRootdDB(event, fMaxFileSize, fCompress);
232 else if (fWriteMode == kCASTOR)
233 rawdb = new AliRawCastorDB(event, fMaxFileSize, fCompress);
234 else if (fWriteMode == kDEVNULL)
235 rawdb = new AliRawNullDB(event, fMaxFileSize, fCompress);
237 rawdb = new AliRawDB(event, fMaxFileSize, fCompress);
239 if (rawdb->IsZombie()) return 1;
240 printf("Filling raw DB %s\n", rawdb->GetDBName());
242 // Create new tag DB.
245 // no tagdb for the time being to get maximum speed
246 if (fWriteMode == fgkDEVNULL)
247 tagdb = new AliTagNullDB(event->GetHeader(), fgkMaxTagFileSize);
249 tagdb = new AliTagDB(event->GetHeader(), fgkMaxTagFileSize);
250 if (tagdb->IsZombie())
253 printf("Filling tag DB %s\n", tagdb->GetDBName());
256 // Create AliStats object
257 AliStats *stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
259 // Shortcut for easy header access
260 AliRawEventHeader &header = *event->GetHeader();
262 // Process input stream
265 while (!(eorFlag = ebEor())) {
267 if ((ebvec = ebGetNextEvent()) == (void *)-1) {
268 Error("Run", "error getting next event (%s)", ebGetLastError());
272 // no event, sleep for 1 second and try again
273 gSystem->Sleep(1000);
276 char *ebdata = (char *) ebvec[0].iov_base;
283 if ((status = ReadHeader(header, ebdata)) != header.HeaderSize()) {
287 ::lseek(fFd, 0, SEEK_SET);
291 printf("<AliMDC::Run>: EOF, processed %d events\n", fNumEvents);
299 // If we were in looping mode stop directly after a SIGUSR1 signal
301 Info("Run", "Stopping loop, processed %d events", fNumEvents);
305 // Check if event has any hard track flagged
306 Bool_t callFilter = kFALSE;
307 // This needs to be re-engineered for the next ADC...
308 //if (fUseFilter && TEST_USER_ATTRIBUTE(header.GetTypeAttribute(), 0))
309 // callFilter = kTRUE;
311 // Check event type and skip "Start of Run", "End of Run",
312 // "Start of Run Files" and "End of Run Files"
313 switch (header.GetType()) {
314 case AliRawEventHeader::kStartOfRun:
315 case AliRawEventHeader::kEndOfRun:
316 case AliRawEventHeader::kStartOfRunFiles:
317 case AliRawEventHeader::kEndOfRunFiles:
319 Int_t skip = header.GetEventSize() - header.HeaderSize();
321 ::lseek(fFd, skip, SEEK_CUR);
324 Info("Run", "Skipping %s (%d bytes)", header.GetTypeName(), skip);
329 Int_t s = header.GetEventSize() - header.HeaderSize();
330 Info("Run", "Processing %s (%d bytes)", header.GetTypeName(), s);
334 // Amount of data left to read for this event
335 Int_t toRead = header.GetEventSize() - header.HeaderSize();
337 // If there is less data for this event than the next sub-event
338 // header, something is wrong. Skip to next event...
339 if (toRead < header.HeaderSize()) {
342 "header size (%d) exceeds number of bytes to read (%d)\n",
343 header.HeaderSize(), toRead);
346 if ((status = DumpEvent(toRead)) != toRead) {
351 Error("Run", "discarding event %d (too little data for header)", fNumEvents);
355 // Loop over all sub-events... (LDCs)
359 ebdata = (char *)ebvec[nsub].iov_base;
363 Info("Run", "reading LDC %d", nsub);
365 AliRawEvent *subEvent = event->NextSubEvent();
367 // Read sub-event header
368 AliRawEventHeader &subHeader = *subEvent->GetHeader();
369 if ((status = ReadHeader(subHeader, ebdata)) != subHeader.HeaderSize()) {
371 Error("Run", "unexpected EOF reading sub-event header");
380 toRead -= subHeader.HeaderSize();
383 ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize();
386 Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize();
388 // Read Equipment Header (in case of physics or calibration event)
389 if (header.GetType() == AliRawEventHeader::kPhysicsEvent ||
390 header.GetType() == AliRawEventHeader::kCalibrationEvent) {
391 AliRawEquipmentHeader &equipment = *subEvent->GetEquipmentHeader();
392 Int_t equipHeaderSize = equipment.HeaderSize();
393 if ((status = ReadEquipmentHeader(equipment, header.DataIsSwapped(),
394 ebdata)) != equipHeaderSize) {
396 Error("Run", "unexpected EOF reading equipment-header");
401 toRead -= equipHeaderSize;
402 rawSize -= equipHeaderSize;
404 ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize() +
409 // Make sure raw data less than left over bytes for current event
410 if (rawSize > toRead) {
412 Warning("Run", "raw data size (%d) exceeds number of bytes "
413 "to read (%d)\n", rawSize, toRead);
416 if ((status = DumpEvent(toRead)) != toRead) {
421 Error("Run", "discarding event %d (too much data)", fNumEvents);
425 // Read sub-event raw data
426 AliRawData &subRaw = *subEvent->GetRawData();
427 if ((status = ReadRawData(subRaw, rawSize, ebdata)) != rawSize) {
429 Error("Run", "unexpected EOF reading sub-event raw data");
437 if (TEST_USER_ATTRIBUTE(subHeader.GetTypeAttribute(), 0))
440 // set size of all sectors without hard track flag to 0
450 // Set stat info for first event of this file
451 if (rawdb->GetEvents() == 0)
452 stats->SetFirstId(header.GetRunNumber(), header.GetEventInRun());
454 // Store raw event in tree
457 // Store header in tree
458 if (tagdb) tagdb->Fill();
462 if (!(fNumEvents%10))
463 printf("Processed event %d (%d)\n", fNumEvents, rawdb->GetEvents());
465 // Filling time statistics
466 if (rawdb->GetBytesWritten() > nextChunk) {
467 tnew = timer.RealTime();
468 stats->Fill(tnew-told);
471 nextChunk += chunkSize;
474 // Check size of raw db. If bigger than maxFileSize, close file
475 // and continue with new file.
476 if (rawdb->FileFull()) {
478 printf("Written raw DB at a rate of %.1f MB/s\n",
479 rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
481 // Write stats object to raw db, run db, MySQL and AliEn
482 stats->WriteToDB(rawdb);
485 if (!rawdb->NextFile()) {
486 Error("Run", "error opening next raw data file");
490 printf("Filling raw DB %s\n", rawdb->GetDBName());
491 stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
495 nextChunk = chunkSize;
498 // Check size of tag db
499 if (tagdb && tagdb->FileFull()) {
500 if (!tagdb->NextFile())
503 printf("Filling tag DB %s\n", tagdb->GetDBName());
506 // Make top event object ready for next event data
507 //printf("Event %d has %d sub-events\n", fNumEvents, event->GetNSubEvents());
511 if (!ebReleaseEvent(ebvec)) {
512 Error("Run", "problem releasing event (%s)", ebGetLastError());
518 printf("Written raw DB at a rate of %.1f MB/s\n",
519 rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
521 // Write stats to raw db and run db and delete stats object
522 stats->WriteToDB(rawdb);
531 // Close input source
536 if (fUseFifo && ::unlink(fgkFifo) == -1) {
537 SysError("Run", "unlink");
545 Info("Run", "event builder reported end of run (%d)", eorFlag);
552 //______________________________________________________________________________
553 Int_t AliMDC::Read(void *buffer, Int_t length)
555 // Read exactly length bytes into buffer. Returns number of bytes
556 // received, returns -1 in case of error and 0 for EOF.
560 if (fFd < 0) return -1;
563 char *buf = (char *)buffer;
565 for (n = 0; n < length; n += nrecv) {
566 if ((nrecv = read(fFd, buf+n, length-n)) <= 0) {
570 SysError("Read", "read");
577 //______________________________________________________________________________
578 Int_t AliMDC::ReadHeader(AliRawEventHeader &header, void *eb)
580 // Read header info from DATE data stream. Returns bytes read (i.e.
581 // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
586 // read from event builder memory area
587 memcpy(header.HeaderBegin(), eb, header.HeaderSize());
588 nrecv = header.HeaderSize();
590 // read from fifo or file
591 if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
592 header.HeaderSize()) {
599 // Swap header data if needed
600 if (header.IsSwapped())
603 // Is header valid...
604 if (!header.IsValid()) {
605 Error("ReadHeader", "invalid header format");
606 // try recovery... how?
609 if (header.GetEventSize() < (UInt_t)header.HeaderSize()) {
610 Error("ReadHeader", "invalid header size");
611 // try recovery... how?
618 //______________________________________________________________________________
619 Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header,
620 Bool_t isSwapped, void *eb)
622 // Read equipment header info from DATE data stream. Returns bytes read
623 // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and
624 // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped
625 // and we will swap the header to host format.
630 // read from event builder memory area
631 memcpy(header.HeaderBegin(), eb, header.HeaderSize());
632 nrecv = header.HeaderSize();
634 // read from fifo or file
635 if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
636 header.HeaderSize()) {
643 // Swap equipment header data if needed
647 if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) {
648 Error("ReadEquipmentHeader", "invalid equipment header size");
649 // try recovery... how?
656 //______________________________________________________________________________
657 Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, void *eb)
659 // Read raw data from DATE data stream. Returns bytes read (i.e.
660 // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
665 // read from event builder memory area
666 raw.SetBuffer(eb, size);
669 // read from fifo or file
671 if ((nrecv = Read(raw.GetBuffer(), size)) != size) {
673 Error("ReadRawData", "unexpected EOF");
683 //______________________________________________________________________________
684 Int_t AliMDC::DumpEvent(Int_t toRead)
686 // This case should not happen, but if it does try to handle it
687 // gracefully by reading the rest of the event and discarding it.
688 // Returns bytes read, -1 in case of fatal error and 0 for EOF.
690 Error("DumpEvent", "dumping %d bytes of event %d", toRead, fNumEvents);
693 char *tbuf = new char[toRead];
694 if ((nrecv = Read(tbuf, toRead)) != toRead) {
696 Error("DumpEvent", "unexpected EOF");
706 //______________________________________________________________________________
707 Int_t AliMDC::Filter(AliRawData &raw)
709 // Call 3rd level filter for this raw data segment.
718 printf("Filter called for event %d\n", fNumEvents);
725 //______________________________________________________________________________
726 AliMDC::AliMDCInterruptHandler::AliMDCInterruptHandler(const
727 AliMDCInterruptHandler&
729 TSignalHandler(handler)
733 Fatal("AliMDCInterruptHandler", "copy constructor not implemented");
736 //______________________________________________________________________________
737 AliMDC::AliMDCInterruptHandler&
738 AliMDC::AliMDCInterruptHandler::operator = (const AliMDCInterruptHandler&
741 // assignment operator
743 Fatal("operator =", "assignment operator not implemented");