1 // @(#)alimdc:$Name$:$Id$
2 // Author: Fons Rademakers 26/11/99
3 // Updated: Dario Favretto 15/04/2003
5 /**************************************************************************
6 * Copyright(c) 1998-2003, ALICE Experiment at CERN, All rights reserved. *
8 * Author: The ALICE Off-line Project. *
9 * Contributors are mentioned in the code where appropriate. *
11 * Permission to use, copy, modify and distribute this software and its *
12 * documentation strictly for non-commercial purposes is hereby granted *
13 * without fee, provided that the above copyright notice appears in all *
14 * copies and that both the copyright notice and this permission notice *
15 * appear in the supporting documentation. The authors make no claims *
16 * about the suitability of this software for any purpose. It is *
17 * provided "as is" without express or implied warranty. *
18 **************************************************************************/
22 //////////////////////////////////////////////////////////////////////////
26 // Set of classes defining the ALICE RAW event format. The AliRawEvent //
27 // class defines a RAW event. It consists of an AliEventHeader object //
28 // an AliEquipmentHeader object, an AliRawData object and an array of //
29 // sub-events, themselves also being AliRawEvents. The number of //
30 // sub-events depends on the number of DATE LDC's. //
31 // The AliRawEvent objects are written to a ROOT file using different //
32 // technologies, i.e. to local disk via AliRawDB or via rfiod using //
33 // AliRawRFIODB or via rootd using AliRawRootdDB or to CASTOR via //
34 // rootd using AliRawCastorDB (and for performance testing there is //
35 // also AliRawNullDB). //
36 // The AliRunDB class provides the interface to the run and file //
37 // catalogues (AliEn or plain MySQL). //
38 // The AliStats class provides statics information that is added as //
39 // a single keyed object to each raw file. //
40 // The AliTagDB provides an interface to a TAG database. //
41 // The AliMDC class is usid by the "alimdc" stand-alone program //
42 // that reads data directly from DATE. //
44 //////////////////////////////////////////////////////////////////////////
46 #include <sys/types.h>
53 #include <TStopwatch.h>
59 #include "libDateEb.h"
63 #include <AliL3StandardIncludes.h>
65 #include "AliL3Logging.h"
67 #include <AliL3Transform.h>
68 #include "AliRawReaderRoot.h"
69 #include <AliL3Hough.h>
73 #include "AliRawEvent.h"
74 #include "AliRawEventHeader.h"
75 #include "AliRawEquipment.h"
76 #include "AliRawEquipmentHeader.h"
77 #include "AliRawData.h"
80 #include "AliRawRFIODB.h"
81 #include "AliRawCastorDB.h"
82 #include "AliRawRootdDB.h"
83 #include "AliRawNullDB.h"
93 #define ALIDEBUG(level) \
94 if (AliMDC::Instance() && (AliMDC::Instance()->GetDebugLevel() >= (level)))
97 // Fixed file system locations for the different DB's
99 const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo";
100 const char* const AliMDC::fgkRawDBFS[2] = { "/tmp/mdc1", "/tmp/mdc2" };
101 const char* const AliMDC::fgkTagDBFS = "/tmp/mdc1/tags";
102 const char* const AliMDC::fgkRunDBFS = "/tmp/mdc1/meta";
103 const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/user/r/rdm";
104 const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/user/r/rdm";
105 const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1";
106 const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct";
107 const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC";
109 const char* const AliMDC::fgkFifo = "/tmp/alimdc.fifo";
110 const char* const AliMDC::fgkRawDBFS[2] = { "/data1/mdc", "/data2/mdc" };
111 const char* const AliMDC::fgkTagDBFS = "/data1/mdc/tags";
112 const char* const AliMDC::fgkRunDBFS = "/data1/mdc/meta";
113 const char* const AliMDC::fgkRFIOFS = "rfio:/castor/cern.ch/lcg/dc5";
114 const char* const AliMDC::fgkCastorFS = "castor:/castor/cern.ch/lcg/dc5";
115 const char* const AliMDC::fgkRootdFS = "root://localhost//tmp/mdc1";
116 const char* const AliMDC::fgkAlienHost = "alien://aliens7.cern.ch:15000/?direct";
117 const char* const AliMDC::fgkAlienDir = "/alice_mdc/DC";
120 // Maximum size of tag db files
121 const Double_t AliMDC::fgkMaxTagFileSize = 2.5e8; // 250MB
123 Bool_t AliMDC::fgDeleteFiles = kFALSE;
124 AliMDC* AliMDC::fgInstance = NULL;
127 //______________________________________________________________________________
128 AliMDC::AliMDC(Int_t fd, Int_t compress, Double_t maxFileSize, Bool_t useFilter,
129 EWriteMode mode, Bool_t useLoop, Bool_t delFiles)
131 // Create MDC processor object.
134 fCompress = compress;
135 fMaxFileSize = maxFileSize;
136 fUseFilter = useFilter;
144 fgDeleteFiles = delFiles;
149 Error("AliMDC", "cannot register with the event builder (%s)",
155 if ((mkfifo(fgkFifo, 0644) < 0) && (errno != EEXIST)) {
156 Error("AliMDC", "cannot create fifo %s", fgkFifo);
159 if ((chmod(fgkFifo, 0666) == -1) && (errno != EPERM)) {
160 Error("AliMDC", "cannot change permission of fifo %s", fgkFifo);
163 if ((fFd = open(fgkFifo, O_RDONLY)) == -1) {
164 Error("AliMDC", "cannot open input file %s", fgkFifo);
172 printf("<AliMDC::AliMDC>: input = %s, rawdb size = %f, filter = %s, "
173 "looping = %s, compression = %d, delete files = %s",
174 fUseFifo ? "fifo" : (fUseEb ? "eb" : "file"), fMaxFileSize,
175 fUseFilter ? "on" : "off", fUseLoop ? "yes" : "no", fCompress,
176 fgDeleteFiles ? "yes" : "no");
177 if (fWriteMode == kRFIO)
178 printf(", use RFIO\n");
179 else if (fWriteMode == kROOTD)
180 printf(", use rootd\n");
181 else if (fWriteMode == kCASTOR)
182 printf(", use CASTOR/rootd\n");
183 else if (fWriteMode == kDEVNULL)
184 printf(", write raw data to /dev/null\n");
188 // install SIGUSR1 handler to allow clean interrupts
189 gSystem->AddSignalHandler(new AliMDCInterruptHandler(this));
194 //______________________________________________________________________________
195 AliMDC::AliMDC(const AliMDC& mdc): TObject(mdc)
199 Fatal("AliMDC", "copy constructor not implemented");
202 //______________________________________________________________________________
203 AliMDC& AliMDC::operator = (const AliMDC& /*mdc*/)
205 // assignment operator
207 Fatal("operator =", "assignment operator not implemented");
211 //______________________________________________________________________________
214 // Run the MDC processor. Read from the input stream and only return
215 // when the input gave and EOF or a fatal error occured. On success 0
216 // is returned, 1 in case of a fatality.
221 // Make sure needed directories exist
223 dirs[0] = fgkRawDBFS[0];
224 dirs[1] = fgkRawDBFS[1];
225 dirs[2] = fgkTagDBFS;
226 dirs[3] = fgkRunDBFS;
227 for (int idir = 0; idir < 4; idir++) {
228 gSystem->ResetErrno();
229 gSystem->MakeDirectory(dirs[idir]);
230 if (gSystem->GetErrno() && gSystem->GetErrno() != EEXIST) {
231 SysError("Run", "mkdir %s", dirs[idir]);
236 // Used for statistics
238 Double_t told = 0, tnew = 0;
239 Float_t chunkSize = fMaxFileSize/100, nextChunk = chunkSize;
241 // Event object used to store event data.
242 AliRawEvent *event = new AliRawEvent;
246 AliL3Log::fgLevel=AliL3Log::kError;
248 AliL3Log::fgLevel=AliL3Log::kWarning;
250 AliL3Log::fgLevel=AliL3Log::kWarning;
252 AliL3Log::fgLevel=AliL3Log::kNone;
255 if (!AliL3Transform::Init("./", kFALSE)) {
256 Error("Run","HLT initialization failed!");
261 AliESD *esd = new AliESD;
264 // Create new raw DB.
266 if (fWriteMode == kRFIO)
267 rawdb = new AliRawRFIODB(event, esd, fMaxFileSize, fCompress);
268 else if (fWriteMode == kROOTD)
269 rawdb = new AliRawRootdDB(event, esd, fMaxFileSize, fCompress);
270 else if (fWriteMode == kCASTOR)
271 rawdb = new AliRawCastorDB(event, esd, fMaxFileSize, fCompress);
272 else if (fWriteMode == kDEVNULL)
273 rawdb = new AliRawNullDB(event, esd, fMaxFileSize, fCompress);
275 rawdb = new AliRawDB(event, esd, fMaxFileSize, fCompress);
277 if (rawdb->IsZombie()) return 1;
278 printf("Filling raw DB %s\n", rawdb->GetDBName());
280 // Create new tag DB.
283 // no tagdb for the time being to get maximum speed
284 if (fWriteMode == fgkDEVNULL)
285 tagdb = new AliTagNullDB(event->GetHeader(), fgkMaxTagFileSize);
287 tagdb = new AliTagDB(event->GetHeader(), fgkMaxTagFileSize);
288 if (tagdb->IsZombie())
291 printf("Filling tag DB %s\n", tagdb->GetDBName());
294 // Create AliStats object
295 AliStats *stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
297 // Shortcut for easy header access
298 AliRawEventHeader &header = *event->GetHeader();
300 // Process input stream
303 while (!(eorFlag = ebEor())) {
305 if ((ebvec = ebGetNextEvent()) == (void *)-1) {
306 Error("Run", "error getting next event (%s)", ebGetLastError());
310 // no event, sleep for 1 second and try again
311 gSystem->Sleep(1000);
314 char *ebdata = (char *) ebvec[0].iov_base;
321 if ((status = ReadHeader(header, ebdata)) != header.HeaderSize()) {
325 ::lseek(fFd, 0, SEEK_SET);
329 printf("<AliMDC::Run>: EOF, processed %d events\n", fNumEvents);
337 // If we were in looping mode stop directly after a SIGUSR1 signal
339 Info("Run", "Stopping loop, processed %d events", fNumEvents);
343 // Check if event has any hard track flagged
344 Bool_t callFilter = kFALSE;
348 // Check event type and skip "Start of Run", "End of Run",
349 // "Start of Run Files" and "End of Run Files"
350 switch (header.GetType()) {
351 case AliRawEventHeader::kStartOfRun:
352 case AliRawEventHeader::kEndOfRun:
353 case AliRawEventHeader::kStartOfRunFiles:
354 case AliRawEventHeader::kEndOfRunFiles:
356 Int_t skip = header.GetEventSize() - header.HeaderSize();
358 ::lseek(fFd, skip, SEEK_CUR);
361 Info("Run", "Skipping %s (%d bytes)", header.GetTypeName(), skip);
366 Int_t s = header.GetEventSize() - header.HeaderSize();
367 Info("Run", "Processing %s (%d bytes)", header.GetTypeName(), s);
371 // Amount of data left to read for this event
372 Int_t toRead = header.GetEventSize() - header.HeaderSize();
374 // If there is less data for this event than the next sub-event
375 // header, something is wrong. Skip to next event...
376 if (toRead < header.HeaderSize()) {
379 "header size (%d) exceeds number of bytes to read (%d)\n",
380 header.HeaderSize(), toRead);
383 if ((status = DumpEvent(toRead)) != toRead) {
388 Error("Run", "discarding event %d (too little data for header)", fNumEvents);
392 // Loop over all sub-events... (LDCs)
396 ebdata = (char *)ebvec[nsub].iov_base;
400 Info("Run", "reading LDC %d", nsub);
402 AliRawEvent *subEvent = event->NextSubEvent();
404 // Read sub-event header
405 AliRawEventHeader &subHeader = *subEvent->GetHeader();
406 if ((status = ReadHeader(subHeader, ebdata)) != subHeader.HeaderSize()) {
408 Error("Run", "unexpected EOF reading sub-event header");
417 toRead -= subHeader.HeaderSize();
420 ebdata = (char *)(ebvec[nsub].iov_base) + subHeader.HeaderSize();
423 Int_t rawSize = subHeader.GetEventSize() - subHeader.HeaderSize();
425 // Make sure raw data less than left over bytes for current event
426 if (rawSize > toRead) {
428 Warning("Run", "raw data size (%d) exceeds number of "
429 "bytes to read (%d)\n", rawSize, toRead);
432 if ((status = DumpEvent(toRead)) != toRead) {
437 Error("Run", "discarding event %d (too much data)", fNumEvents);
441 // Read Equipment Headers (in case of physics or calibration event)
442 if (header.GetType() == AliRawEventHeader::kPhysicsEvent ||
443 header.GetType() == AliRawEventHeader::kCalibrationEvent) {
444 while (rawSize > 0) {
445 AliRawEquipment &equipment = *subEvent->NextEquipment();
446 AliRawEquipmentHeader &equipmentHeader =
447 *equipment.GetEquipmentHeader();
448 Int_t equipHeaderSize = equipmentHeader.HeaderSize();
449 if ((status = ReadEquipmentHeader(equipmentHeader, header.DataIsSwapped(),
450 ebdata)) != equipHeaderSize) {
452 Error("Run", "unexpected EOF reading equipment-header");
457 toRead -= equipHeaderSize;
458 rawSize -= equipHeaderSize;
460 ebdata = (char *)(ebvec[nsub].iov_base) +
461 subHeader.HeaderSize() + equipHeaderSize;
464 // Read equipment raw data
465 AliRawData &subRaw = *equipment.GetRawData();
467 Int_t eqSize = equipmentHeader.GetEquipmentSize() -
469 if ((status = ReadRawData(subRaw, eqSize, ebdata)) != eqSize) {
471 Error("Run", "unexpected EOF reading sub-event raw data");
481 } else { // Read only raw data but no equipment header
482 AliRawEquipment &equipment = *subEvent->NextEquipment();
483 AliRawData &subRaw = *equipment.GetRawData();
484 if ((status = ReadRawData(subRaw, rawSize, ebdata)) != rawSize) {
486 Error("Run", "unexpected EOF reading sub-event raw data");
501 if(header.GetType() == AliRawEventHeader::kPhysicsEvent ||
502 header.GetType() == AliRawEventHeader::kCalibrationEvent)
511 // Set stat info for first event of this file
512 if (rawdb->GetEvents() == 0)
513 stats->SetFirstId(header.GetRunNumber(), header.GetEventInRun());
515 // Store raw event in tree
518 // Store header in tree
519 if (tagdb) tagdb->Fill();
523 if (!(fNumEvents%10))
524 printf("Processed event %d (%d)\n", fNumEvents, rawdb->GetEvents());
526 // Filling time statistics
527 if (rawdb->GetBytesWritten() > nextChunk) {
528 tnew = timer.RealTime();
529 stats->Fill(tnew-told);
532 nextChunk += chunkSize;
535 // Check size of raw db. If bigger than maxFileSize, close file
536 // and continue with new file.
537 if (rawdb->FileFull()) {
539 printf("Written raw DB at a rate of %.1f MB/s\n",
540 rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
542 // Write stats object to raw db, run db, MySQL and AliEn
543 rawdb->WriteStats(stats);
544 AliRunDB::WriteStats(stats);
547 if (!rawdb->NextFile()) {
548 Error("Run", "error opening next raw data file");
552 printf("Filling raw DB %s\n", rawdb->GetDBName());
553 stats = new AliStats(rawdb->GetDBName(), fCompress, fUseFilter);
557 nextChunk = chunkSize;
560 // Check size of tag db
561 if (tagdb && tagdb->FileFull()) {
562 if (!tagdb->NextFile())
565 printf("Filling tag DB %s\n", tagdb->GetDBName());
568 // Make top event object ready for next event data
569 //printf("Event %d has %d sub-events\n", fNumEvents, event->GetNSubEvents());
572 // Clean up HLT ESD for the next event
573 // Probably we could add esd->Reset() method to AliESD?
577 if (!ebReleaseEvent(ebvec)) {
578 Error("Run", "problem releasing event (%s)", ebGetLastError());
584 printf("Written raw DB at a rate of %.1f MB/s\n",
585 rawdb->GetBytesWritten() / timer.RealTime() / 1000000.);
587 // Write stats to raw db and run db and delete stats object
588 rawdb->WriteStats(stats);
589 AliRunDB::WriteStats(stats);
598 // Close input source
603 if (fUseFifo && ::unlink(fgkFifo) == -1) {
604 SysError("Run", "unlink");
612 Info("Run", "event builder reported end of run (%d)", eorFlag);
619 //______________________________________________________________________________
620 Int_t AliMDC::Read(void *buffer, Int_t length)
622 // Read exactly length bytes into buffer. Returns number of bytes
623 // received, returns -1 in case of error and 0 for EOF.
627 if (fFd < 0) return -1;
630 char *buf = (char *)buffer;
632 for (n = 0; n < length; n += nrecv) {
633 if ((nrecv = read(fFd, buf+n, length-n)) <= 0) {
637 SysError("Read", "read");
644 //______________________________________________________________________________
645 Int_t AliMDC::ReadHeader(AliRawEventHeader &header, void *eb)
647 // Read header info from DATE data stream. Returns bytes read (i.e.
648 // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
653 // read from event builder memory area
654 memcpy(header.HeaderBegin(), eb, header.HeaderSize());
655 nrecv = header.HeaderSize();
657 // read from fifo or file
658 if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
659 header.HeaderSize()) {
666 // Swap header data if needed
667 if (header.IsSwapped())
670 // Is header valid...
671 if (!header.IsValid()) {
672 Error("ReadHeader", "invalid header format");
673 // try recovery... how?
676 if (header.GetEventSize() < (UInt_t)header.HeaderSize()) {
677 Error("ReadHeader", "invalid header size");
678 // try recovery... how?
685 //______________________________________________________________________________
686 Int_t AliMDC::ReadEquipmentHeader(AliRawEquipmentHeader &header,
687 Bool_t isSwapped, void *eb)
689 // Read equipment header info from DATE data stream. Returns bytes read
690 // (i.e. AliRawEquipmentHeader::HeaderSize()), -1 in case of error and
691 // 0 for EOF. If isSwapped is kTRUE the event data is byte swapped
692 // and we will swap the header to host format.
697 // read from event builder memory area
698 memcpy(header.HeaderBegin(), eb, header.HeaderSize());
699 nrecv = header.HeaderSize();
701 // read from fifo or file
702 if ((nrecv = Read(header.HeaderBegin(), header.HeaderSize())) !=
703 header.HeaderSize()) {
710 // Swap equipment header data if needed
714 if (header.GetEquipmentSize() < (UInt_t)header.HeaderSize()) {
715 Error("ReadEquipmentHeader", "invalid equipment header size");
716 // try recovery... how?
723 //______________________________________________________________________________
724 Int_t AliMDC::ReadRawData(AliRawData &raw, Int_t size, void *eb)
726 // Read raw data from DATE data stream. Returns bytes read (i.e.
727 // AliRawEventHeader::HeaderSize()), -1 in case of error and 0 for EOF.
732 // read from event builder memory area
733 raw.SetBuffer(eb, size);
736 // read from fifo or file
738 if ((nrecv = Read(raw.GetBuffer(), size)) != size) {
740 Error("ReadRawData", "unexpected EOF");
750 //______________________________________________________________________________
751 Int_t AliMDC::DumpEvent(Int_t toRead)
753 // This case should not happen, but if it does try to handle it
754 // gracefully by reading the rest of the event and discarding it.
755 // Returns bytes read, -1 in case of fatal error and 0 for EOF.
757 Error("DumpEvent", "dumping %d bytes of event %d", toRead, fNumEvents);
760 char *tbuf = new char[toRead];
761 if ((nrecv = Read(tbuf, toRead)) != toRead) {
763 Error("DumpEvent", "unexpected EOF");
773 //______________________________________________________________________________
774 Int_t AliMDC::Filter(
776 AliRawEvent *event,AliESD *esd
780 // Call 3rd level filter for this raw data event.
789 AliL3Hough *hough1 = new AliL3Hough();
791 hough1->SetThreshold(4);
792 hough1->SetTransformerParams(76,140,0.4,-1);
793 hough1->SetPeakThreshold(70,-1);
794 // Attention Z of the vertex to be taken from the event head!
795 // So far for debug purposes it is fixed by hand...
796 hough1->Init(100,4,event,3.82147);
797 hough1->SetAddHistograms();
799 AliL3Hough *hough2 = new AliL3Hough();
801 hough2->SetThreshold(4);
802 hough2->SetTransformerParams(76,140,0.4,-1);
803 hough2->SetPeakThreshold(70,-1);
804 hough2->Init(100,4,event,3.82147);
805 hough2->SetAddHistograms();
807 Int_t nglobaltracks = 0;
808 /* In case we run HLT code in 2 threads */
809 hough1->StartProcessInThread(0,17);
810 hough2->StartProcessInThread(18,35);
812 if(hough1->WaitForThreadFinish())
813 ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! ");
814 if(hough2->WaitForThreadFinish())
815 ::Fatal("AliL3Hough::WaitForThreadFinish"," Can not join the required thread! ");
817 /* In case we run HLT code in the main thread
818 for(Int_t slice=0; slice<=17; slice++)
820 hough1->ReadData(slice,0);
822 hough1->AddAllHistogramsRows();
823 hough1->FindTrackCandidatesRow();
826 for(Int_t slice=18; slice<=35; slice++)
828 hough2->ReadData(slice,0);
830 hough2->AddAllHistogramsRows();
831 hough2->FindTrackCandidatesRow();
836 nglobaltracks += hough1->FillESD(esd);
837 nglobaltracks += hough2->FillESD(esd);
839 /* In case we want to debug the ESD
840 gSystem->MakeDirectory("hough1");
841 hough1->WriteTracks("./hough1");
842 gSystem->MakeDirectory("hough2");
843 hough2->WriteTracks("./hough2");
849 printf("Filter called for event %d\n", fNumEvents);
850 printf("Filter has found %d TPC tracks in %f seconds\n", nglobaltracks,timer.RealTime());
855 printf("Filter called for event %d\n", fNumEvents);
862 //______________________________________________________________________________
863 AliMDC::AliMDCInterruptHandler::AliMDCInterruptHandler(const
864 AliMDCInterruptHandler&
866 TSignalHandler(handler)
870 Fatal("AliMDCInterruptHandler", "copy constructor not implemented");
873 //______________________________________________________________________________
874 AliMDC::AliMDCInterruptHandler&
875 AliMDC::AliMDCInterruptHandler::operator = (const AliMDCInterruptHandler&
878 // assignment operator
880 Fatal("operator =", "assignment operator not implemented");