#include "monitor.h"
#include <stdio.h>
#include <stdlib.h>
-#include <pthread.h>
#include <vector>
//
//Root includes
#include "TString.h"
#include "TObjString.h"
#include "TDatime.h"
+#include "TStopwatch.h"
#include "TMap.h"
#include "TGraph.h"
#include "TMath.h"
//functios, implementation below
void SendToAmoreDB(AliTPCCalibCE &calibCE, unsigned long32 runNb);
//for threaded processing
-void *processEventBuffer(void *arg);
-
-//common event processing variables for threaded processing
-std::vector<eventHeaderStruct*> eventBuffer;
-volatile int bStop = false;
-struct timespec duree_nanosleep;
-Int_t forceNevents=-1;
-Bool_t forceBufferEnds=kFALSE;
/* Main routine
char localfile[255];
unsigned long32 runNb=0; //run number
- //
- // thread wait time
- //
- duree_nanosleep.tv_sec=0;
- duree_nanosleep.tv_nsec=1000000; //1 ms
-
+
//
// DA configuration from configuration file
//
laserTriggerName=config.GetConfigurationMap()->GetValue("LaserTriggerName")->GetName();
printf("Laser trigger class name set to: %s.\n",laserTriggerName.Data());
}
-
- if ( config.GetConfigurationMap()->GetValue("BufferSize") ) {
- bufferSize=(size_t)config.GetValue("BufferSize");
- printf("Setting event buffer size to: %d.\n",bufferSize);
- }
-
+
if ( config.GetConfigurationMap()->GetValue("ForceTriggerId") ) {
forceTriggerId=TMath::Nint(config.GetValue("ForceTriggerId"));
printf("Only processing triggers with Id: %d.\n",forceTriggerId);
}
- if ( config.GetConfigurationMap()->GetValue("ForceBufferEndsGlobal") ) {
- forceBufferEndsGlobal=config.GetValue("ForceBufferEndsGlobal")!=0.;
- printf("Process all buffered events in global partition: %s.\n",forceBufferEndsGlobal?"yes":"no");
- }
-
- if ( config.GetConfigurationMap()->GetValue("ForceNMaxEvents") ) {
- forceNevents=TMath::Nint(config.GetValue("ForceNMaxEvents"));
- printf("Forcing maximum number of %d events.\n",forceNeventsStandalone);
- }
//subsribe to laser triggers only in physics partition
//if the trigger class is not available the return value is -1
//in this case we are most probably running as a standalone
// laser run and should request all events
- unsigned char *classIdptr=0;
- int retClassId=daqDA_getClassIdFromName(laserTriggerName.Data(),classIdptr);
+ unsigned char classIdptr=0;
+ int retClassId=daqDA_getClassIdFromName(laserTriggerName.Data(),&classIdptr);
if (retClassId==0){
//interleaved laser in physics runs
+ //select proper trigger class id
char c[5];
- sprintf(c,"%u",*classIdptr);
+ snprintf(c,sizeof(c),"%u",(unsigned int)classIdptr);
char *table[5] = {"PHY","Y","*",c,NULL};
monitorDeclareTableExtended(table);
printf("Using trigger class Id: %s\n",c);
-// forceBufferEndsGlobal=forceBufferEndsGlobalDummy;
} else if (retClassId==-1){
- //defaul case, accept all physics events
- char *table[3] = {"PHY","Y",NULL};
+ //global partition without laser triggered events
+ //the DA should exit properly without processing
+ printf("Laser trigger class '%s' was not found among trigger class names. Will stop processing.\n",laserTriggerName.Data());
+ return 0;
+ } else if (retClassId==-2){
+ //standalone case, accept all physics events
+ char *table[5] = {"PHY","Y","*","*",NULL};
monitorDeclareTableExtended(table);
printf("Using all trigger class Ids\n");
-// forceNevents=forceNeventsStandalone;
} else {
printf("Unknown return value of 'daqDA_getClassIdFromName': %d\n",retClassId);
return -2;
AliTPCCalibCE calibCE(config.GetConfigurationMap()); // central electrode calibration
calibCE.SetAltroMapping(mapping->GetAltroMapping()); // Use altro mapping we got from daqDetDb
- //
- // start thread
- //
-// sleep(5);
- pthread_t threadId=0;
- int threadStatus=0;
- threadStatus = pthread_create( &threadId, NULL, processEventBuffer, (void*)(&calibCE));
- eventBuffer.resize(bufferSize);
- struct timespec duree_out;
+ //amore update interval
+ Double_t updateInterval=300; //seconds
+ Double_t valConf=config.GetValue("AmoreUpdateInterval");
+ if ( valConf>0 ) updateInterval=valConf;
+ //timer
+ TStopwatch stopWatch;
+
//===========================//
// loop over RAW data files //
//==========================//
/* check shutdown condition */
if (daqDA_checkShutdown()) {break;}
-
- //check for predefined number of events
- if (forceNevents>0 && calibCE.GetNeventsProcessed()>=forceNevents) {
- printf("Requested number of events reached (%d).\n",forceNevents);
- break;
+
+ /* get next event (blocking call until timeout) */
+ status=monitorGetEventDynamic((void **)&event);
+ if (status==MON_ERR_EOF) {
+ printf ("End of File %d detected\n",i);
+ break; /* end of monitoring file has been reached */
}
- //buffer events, only read them if a buffer position is free
- if (eventBuffer[counter]==0) {
-
- /* get next event (blocking call until timeout) */
- status=monitorGetEventDynamic((void **)&event);
- if (status==MON_ERR_EOF) {
- printf ("End of File %d detected\n",i);
- break; /* end of monitoring file has been reached */
- }
-
- if (status!=0) {
- printf("monitorGetEventDynamic() failed : %s\n",monitorDecodeError(status));
- break;
- }
+ if (status!=0) {
+ printf("monitorGetEventDynamic() failed : %s\n",monitorDecodeError(status));
+ break;
+ }
/* retry if got no event */
- if (event==NULL)
- continue;
+ if (event==NULL)
+ continue;
/* skip start/end of run events */
- if ( (event->eventType != physicsEvent) && (event->eventType != calibrationEvent) ){
- free(event);
- continue;
- }
-
+ if ( (event->eventType != physicsEvent) && (event->eventType != calibrationEvent) ){
+ free(event);
+ continue;
+ }
- // get the run number
- runNb = event->eventRunNb;
-
-// printf(" trigger (%05d-%03d) = %8.8x %8.8x - %02u\n",nevents, calibCE.GetNeventsProcessed(),
-// event->eventTriggerPattern[1], event->eventTriggerPattern[0],event->eventType);
-
-// printf("filling buffer %d\n",counter);
- eventBuffer[counter]=event;
-
- ++nevents;
- ++counter;
- if (counter >= eventBuffer.size()) {counter=0;}
- }else{
-// printf("buffer already used: %d\n",counter);
- nanosleep(&duree_nanosleep,&duree_out);
+
+ // get the run number
+ runNb = event->eventRunNb;
+
+ // CE calibration
+ calibCE.ProcessEvent(event);
+
+ // sending to AMOREdb
+ if (stopWatch.RealTime()>updateInterval){
+ SendToAmoreDB(calibCE,runNb);
+ stopWatch.Start();
+ } else {
+ stopWatch.Continue();
}
+
+ /* free resources */
+ free(event);
+ ++nevents;
}
}
-
- //
- // wait for thread to end
- //
- if (!forceBufferEndsGlobal) bStop = true;
- else forceBufferEnds=forceBufferEndsGlobal;
-
- pthread_join( threadId, NULL);
-// printf("Event Processing Thread ended with: %d\n",threadStatus);
- //
- // free unprocessed events
- //
- for (size_t i=0;i<eventBuffer.size();++i){
- if (eventBuffer[i]) {
- free(eventBuffer[i]);
- eventBuffer[i]=0;
-// printf("freeing buffer %d\n",i);
- }
- }
-
//
// Analyse CE data and write them to rootfile
//
return status;
}
-void *processEventBuffer(void *arg)
-{
- //
- // event procssing thread functio
- //
-
- //cast argument
- AliTPCCalibCE *ce=(AliTPCCalibCE*)arg;
- AliTPCCalibCE &calibCE=*ce;
-
- size_t counter=0;
- unsigned long32 runNb=0;
- Bool_t published=kTRUE;
- struct timespec duree_out;
-
- struct eventHeaderStruct *event;
-
- //wait for the first buffer to be filled
- while (!eventBuffer[0]) nanosleep(&duree_nanosleep,&duree_out);
- //loop over buffer
- while (!bStop){
-// printf("testing buffer: %d\n",counter);
- if (eventBuffer[counter]) {
- event=eventBuffer[counter];
- runNb = event->eventRunNb;
-// printf("processing buffer: %d\n",counter);
- eventBuffer[counter]=0;
- calibCE.ProcessEvent(event);
- free(event);
- published=kFALSE;
- } else {
- //in case of empty buffer publish the results it this was not done
- if (!published) {
- SendToAmoreDB(calibCE,runNb);
- published=kTRUE;
- }
- nanosleep(&duree_nanosleep,&duree_out);
- }
- ++counter;
- if (counter >= eventBuffer.size()) {
- counter=0;
- if (forceBufferEnds) break;
- }
- }
-}
void SendToAmoreDB(AliTPCCalibCE &calibCE, unsigned long32 runNb)
{
//AMORE
- printf ("AMORE part\n");
+// printf ("AMORE part\n");
const char *amoreDANameorig=gSystem->Getenv("AMORE_DA_NAME");
//cheet a little -- temporary solution (hopefully)
//