introducing strict run ordering flag
[u/mrichter/AliRoot.git] / SHUTTLE / AliShuttle.cxx
index abc17f0..afa94a7 100644 (file)
 
 /*
 $Log$
+Revision 1.19  2006/11/06 14:23:04  jgrosseo
+major update (Alberto)
+o) reading of run parameters from the logbook
+o) online offline naming conversion
+o) standalone DCSclient package
+
 Revision 1.18  2006/10/20 15:22:59  jgrosseo
 o) Adding time out to the execution of the preprocessors: The Shuttle forks and the parent process monitors the child
 o) Merging Collect, CollectAll, CollectNew function
@@ -184,12 +190,16 @@ fLastAction()
        //
 
        if (!fConfig->IsValid()) AliFatal("********** !!!!! Invalid configuration !!!!! **********");
-       for(int iSys=0;iSys<3;iSys++) {
+       for(int iSys=0;iSys<4;iSys++) {
                fServer[iSys]=0;
-               fFESlist[iSys].SetOwner(kTRUE);
+               if (iSys < 3)
+                       fFESlist[iSys].SetOwner(kTRUE);
        }
        fPreprocessorMap.SetOwner(kTRUE);
-       
+
+       for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
+               fFirstUnprocessed[iDet] = kFALSE;
+
        fMonitoringMutex = new TMutex();
 }
 
@@ -199,7 +209,7 @@ AliShuttle::~AliShuttle()
 // destructor
 
        fPreprocessorMap.DeleteAll();
-       for(int iSys=0;iSys<3;iSys++)
+       for(int iSys=0;iSys<4;iSys++)
                if(fServer[iSys]) {
                        fServer[iSys]->Close();
                        delete fServer[iSys];
@@ -439,6 +449,15 @@ Bool_t AliShuttle::ContinueProcessing()
        }
 
        // if we get here, according to Shuttle logbook subdetector is in UNPROCESSED state
+
+       // strict run ordering: refuse this run unless it is the first unprocessed one for the current detector
+       if (fConfig->StrictRunOrder(fCurrentDetector) && !fFirstUnprocessed[GetDetPos(fCurrentDetector)])
+       {
+               Log("SHUTTLE", Form("ContinueProcessing - %s requires strict run ordering but this is not the first unprocessed run!",
+                                       fCurrentDetector.Data())); // the %s needs the detector name
+               return kFALSE;
+       }
+
        AliShuttleStatus* status = ReadShuttleStatus();
        if (!status) {
                // first time
@@ -537,15 +556,17 @@ Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
        TIter iter(fConfig->GetDetectors());
        TObjString* aDetector = 0;
 
-       while ((aDetector = (TObjString*) iter.Next())) {
+       while ((aDetector = (TObjString*) iter.Next()))
+       {
                fCurrentDetector = aDetector->String();
 
                if (!fConfig->HostProcessDetector(fCurrentDetector)) continue;
 
                AliPreprocessor* aPreprocessor =
                        dynamic_cast<AliPreprocessor*> (fPreprocessorMap.GetValue(fCurrentDetector));
-               if(!aPreprocessor){
-                       Log("SHUTTLE",Form("Process - %s: no preprocessor registered. Skipping", 
+               if (!aPreprocessor)
+               {
+                       Log("SHUTTLE",Form("Process - %s: no preprocessor registered. Skipping",
                                                        fCurrentDetector.Data()));
                        continue;
                }
@@ -556,104 +577,112 @@ Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
                                                GetCurrentRun(), aDetector->GetName()));
 
 
-    Int_t pid = fork();
-
-    if (pid < 0)
-    {
-      Log("SHUTTLE", "ERROR: Forking failed");
-    }
-    else if (pid > 0)
-    {
-      // parent
-      AliInfo(Form("In parent process of %d - %s: Starting monitoring", GetCurrentRun(), aDetector->GetName()));
-
-      Long_t begin = time(0);
-
-      int status; // to be used with waitpid, on purpose an int (not Int_t)!
-      while (waitpid(pid, &status, WNOHANG) == 0)
-      {
-        Long_t expiredTime = time(0) - begin;
-
-        if (expiredTime > fConfig->GetPPTimeOut())
-        {
-          Log("SHUTTLE", Form("Process time out. Run time: %d seconds. Killing...", expiredTime));
-
-          kill(pid, 9);
-
-          hasError = kTRUE;
-
-          gSystem->Sleep(1000);
-        }
-        else
-        {
-          if (expiredTime % 60 == 0)
-            Log("SHUTTLE", Form("Checked process. Run time: %d seconds.", expiredTime));
-
-          gSystem->Sleep(1000);
-        }
-      }
-
-      AliInfo(Form("In parent process of %d - %s: Client has terminated.", GetCurrentRun(), aDetector->GetName()));
-
-      if (WIFEXITED(status))
-      {
-        Int_t returnCode = WEXITSTATUS(status);
-
-        Log("SHUTTLE", Form("The return code is %d", returnCode));
-
-        if (returnCode != 0)
-          hasError = kTRUE;
-      }
-    }
-    else if (pid == 0)
-    {
-      // client
-      AliInfo(Form("In client process of %d - %s", GetCurrentRun(), aDetector->GetName()));
-
-      UInt_t result = ProcessCurrentDetector();
-
-      Int_t returnCode = 0; // will be set to 1 in case of an error
-
-      if (!result) {
-        returnCode = 1;
-        AliInfo(Form("\n \t\t\t****** run %d - %s: PREPROCESSOR ERROR ****** \n\n",
-                GetCurrentRun(), aDetector->GetName()));
-      }
-      else if(result == 2) {
-        AliInfo(Form("\n \t\t\t****** run %d - %s: STORAGE ERROR ****** \n\n",
-                GetCurrentRun(), aDetector->GetName()));
-      } else {
-        AliInfo(Form("\n \t\t\t****** run %d - %s: DONE ****** \n\n",
-                GetCurrentRun(), aDetector->GetName()));
-      }
-
-      if (result > 0)
-      {
-        // Process successful: Update time_processed field in FES logbooks!
-        if(fFESCalled[kDAQ]) {
-          if (UpdateDAQTable() == kFALSE)
-            returnCode = 1;
-          fFESlist[kDAQ].Clear();
-        }
-        //if(fFESCalled[kDCS]) {
-        //  if (UpdateDCSTable(aDetector->GetName()) == kFALSE)
-        //    returnCode = 1;
-        //  fFESlist[kDCS].Clear();
-        //}
-        //if(fFESCalled[kHLT]) {
-        //  if (UpdateHLTTable(aDetector->GetName()) == kFALSE)
-        //    returnCode = 1;
-        //     fFESlist[kHLT].Clear();
-        //}
-      }
-
-      AliInfo(Form("Client process of %d - %s is exiting now with %d.", GetCurrentRun(), aDetector->GetName(), returnCode));
-
-      // the client exits here
-      gSystem->Exit(returnCode);
-
-      AliError("We should never get here!!!");
-    }
+               Int_t pid = fork();
+
+               if (pid < 0)
+               {
+                       Log("SHUTTLE", "ERROR: Forking failed");
+               }
+               else if (pid > 0)
+               {
+                       // parent
+                       AliInfo(Form("In parent process of %d - %s: Starting monitoring",
+                                                       GetCurrentRun(), aDetector->GetName()));
+
+                       Long_t begin = time(0);
+
+                       int status; // to be used with waitpid, on purpose an int (not Int_t)!
+                       while (waitpid(pid, &status, WNOHANG) == 0)
+                       {
+                               // waitpid(WNOHANG) returned 0: the child is still running
+                               Long_t expiredTime = time(0) - begin;
+                               if (expiredTime > fConfig->GetPPTimeOut())
+                               {
+                                       Log("SHUTTLE", Form("Process time out. Run time: %ld seconds. Killing...",
+                                                               expiredTime)); // Long_t is a long: %ld, not %d
+                                       // child ran longer than the configured timeout: send signal 9 and flag the error
+                                       kill(pid, 9);
+
+                                       hasError = kTRUE;
+
+                                       gSystem->Sleep(1000);
+                               }
+                               else
+                               {
+                                       if (expiredTime % 60 == 0) // progress message when the elapsed seconds hit a multiple of 60
+                                               Log("SHUTTLE", Form("Checked process. Run time: %ld seconds.",
+                                                                       expiredTime));
+                                       gSystem->Sleep(1000);
+                               }
+                       }
+
+                       AliInfo(Form("In parent process of %d - %s: Client has terminated.",
+                                                               GetCurrentRun(), aDetector->GetName()));
+
+                       if (WIFEXITED(status))
+                       {
+                               Int_t returnCode = WEXITSTATUS(status);
+
+                               Log("SHUTTLE", Form("The return code is %d", returnCode));
+
+                               if (returnCode != 0)
+                               hasError = kTRUE;
+                       }
+               }
+               else if (pid == 0)
+               {
+                       // client
+                       AliInfo(Form("In client process of %d - %s", GetCurrentRun(), aDetector->GetName()));
+
+                       UInt_t result = ProcessCurrentDetector();
+
+                       Int_t returnCode = 0; // will be set to 1 in case of an error
+
+                       if (!result)
+                       {
+                               returnCode = 1;
+                               AliInfo(Form("\n \t\t\t****** run %d - %s: PREPROCESSOR ERROR ****** \n\n",
+                                                       GetCurrentRun(), aDetector->GetName()));
+                       }
+                       else if (result == 2)
+                       {
+                               AliInfo(Form("\n \t\t\t****** run %d - %s: STORAGE ERROR ****** \n\n",
+                                                       GetCurrentRun(), aDetector->GetName()));
+                       } else
+                       {
+                               AliInfo(Form("\n \t\t\t****** run %d - %s: DONE ****** \n\n",
+                                                       GetCurrentRun(), aDetector->GetName()));
+                       }
+
+                       if (result > 0)
+                       {
+                               // Process successful: Update time_processed field in FES logbooks!
+                               if (fFESCalled[kDAQ])
+                               {
+                                       if (UpdateDAQTable() == kFALSE)
+                                       returnCode = 1;
+                                       fFESlist[kDAQ].Clear();
+                               }
+                               //if(fFESCalled[kDCS]) {
+                               //  if (UpdateDCSTable(aDetector->GetName()) == kFALSE)
+                               //    returnCode = 1;
+                               //  fFESlist[kDCS].Clear();
+                               //}
+                               //if(fFESCalled[kHLT]) {
+                               //  if (UpdateHLTTable(aDetector->GetName()) == kFALSE)
+                               //    returnCode = 1;
+                               //      fFESlist[kHLT].Clear();
+                               //}
+                       }
+
+                       AliInfo(Form("Client process of %d - %s is exiting now with %d.",
+                                                       GetCurrentRun(), aDetector->GetName(), returnCode));
+
+                       // the client exits here
+                       gSystem->Exit(returnCode);
+
+                       AliError("We should never get here!!!");
+               }
        }
 
        AliInfo(Form("\n\n \t\t\t^*^*^*^*^*^*^*^*^*^*^*^* run %d: FINISH ^*^*^*^*^*^*^*^*^*^*^*^* \n",
@@ -663,14 +692,30 @@ Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
        TObjArray checkEntryArray;
        checkEntryArray.SetOwner(1);
        TString whereClause = Form("where run=%d",GetCurrentRun());
-       if(QueryShuttleLogbook(whereClause.Data(), checkEntryArray)) {
+       if (QueryShuttleLogbook(whereClause.Data(), checkEntryArray)) {
 
                AliShuttleLogbookEntry* checkEntry = dynamic_cast<AliShuttleLogbookEntry*>
                                                        (checkEntryArray.At(0));
 
-               if(checkEntry && checkEntry->IsDone()){
-                       Log("SHUTTLE","Process - Shuttle is DONE. Updating logbook");
-                       UpdateShuttleLogbook("shuttle_done");
+               if (checkEntry)
+               {
+                       if (checkEntry->IsDone())
+                       {
+                               Log("SHUTTLE","Process - Shuttle is DONE. Updating logbook");
+                               UpdateShuttleLogbook("shuttle_done");
+                       }
+                       else
+                       {
+                               for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
+                               {
+                                       if (checkEntry->GetDetectorStatus(iDet) == AliShuttleLogbookEntry::kUnprocessed)
+                                       {
+                                               AliDebug(2, Form("Run %d: setting %s as \"not first time unprocessed\"",
+                                                               checkEntry->GetRun(), GetDetName(iDet)));
+                                               fFirstUnprocessed[iDet] = kFALSE;
+                                       }
+                               }
+                       }
                }
        }
 
@@ -761,12 +806,12 @@ Bool_t AliShuttle::QueryShuttleLogbook(const char* whereClause,
 // Call QueryRunParameters to query DAQ logbook for run parameters.
 
        // check connection, in case connect
-       if(!Connect(kDAQ)) return kFALSE;
+       if(!Connect(3)) return kFALSE;
 
        TString sqlQuery;
        sqlQuery = Form("select * from logbook_shuttle %s order by run", whereClause);
 
-       TSQLResult* aResult = fServer[kDAQ]->Query(sqlQuery);
+       TSQLResult* aResult = fServer[3]->Query(sqlQuery);
        if (!aResult) {
                AliError(Form("Can't execute query <%s>!", sqlQuery.Data()));
                return kFALSE;
@@ -826,13 +871,13 @@ AliShuttleLogbookEntry* AliShuttle::QueryRunParameters(Int_t run)
        //
 
        // check connection, in case connect
-       if (!Connect(kDAQ))
+       if (!Connect(3))
                return 0;
 
        TString sqlQuery;
        sqlQuery.Form("select * from logbook where run=%d", run);
 
-       TSQLResult* aResult = fServer[kDAQ]->Query(sqlQuery);
+       TSQLResult* aResult = fServer[3]->Query(sqlQuery);
        if (!aResult) {
                AliError(Form("Can't execute query <%s>!", sqlQuery.Data()));
                return 0;
@@ -952,11 +997,18 @@ Bool_t AliShuttle::TryToStoreAgain(TString& gridURI)
                TIter gridIter(gridIds);
                AliCDBId* aGridId = 0;
                while((aGridId = dynamic_cast<AliCDBId*> (gridIter.Next()))){
-                       // If local object is valid up to infinity we store it anyway
-                       // TODO This does not work! It may hide more recent objects...
-                       if(aLocId.GetLastRun() == AliCDBRunRange::Infinity()) {
-                               // TODO Check that it won't hide more recent files! how????
-                               break;
+                       // If local object is valid up to infinity we store it only if it is
+                       // the first unprocessed run!
+                       if (aLocId.GetLastRun() == AliCDBRunRange::Infinity())
+                       {
+                               if (!fFirstUnprocessed[GetDetPos(fCurrentDetector)])
+                               {
+                                       Log(fCurrentDetector.Data(),
+                                               ("TryToStoreAgain - This object has validity infinite but there are previous unprocessed runs!"));
+                                       continue;
+                               } else {
+                                       break;
+                               }
                        }
                        if(aGridId->GetPath() != aLocId.GetPath()) continue;
                        // skip all objects valid up to infinity
@@ -1092,14 +1144,29 @@ Bool_t AliShuttle::Connect(Int_t system)
        // check connection: if already connected return
        if(fServer[system] && fServer[system]->IsConnected()) return kTRUE;
 
-       TString aFESlbHost= Form("mysql://%s", fConfig->GetFESlbHost(system));
+       TString lbHost, lbUser, lbPass;
+
+       if (system < 3) // FES logbook servers
+       {
+               lbHost = Form("mysql://%s", fConfig->GetFESlbHost(system));
+               lbUser = fConfig->GetFESlbUser(system);
+               lbPass = fConfig->GetFESlbPass(system);
+       } else { // Run & Shuttle logbook servers
+       // TODO Will the Shuttle logbook server be the same as the Run logbook server ???
+               lbHost = Form("mysql://%s", fConfig->GetDAQlbHost());
+               lbUser = fConfig->GetDAQlbUser();
+               lbPass = fConfig->GetDAQlbPass();
+       }
 
-       fServer[system] = TSQLServer::Connect(aFESlbHost,
-                       fConfig->GetFESlbUser(system),
-                       fConfig->GetFESlbPass(system));
+       fServer[system] = TSQLServer::Connect(lbHost.Data(), lbUser.Data(), lbPass.Data());
        if (!fServer[system] || !fServer[system]->IsConnected()) {
+               if(system < 3)
+               {
                AliError(Form("Can't establish connection to FES logbook for %s",
                                        AliShuttleInterface::GetSystemName(system)));
+               } else {
+               AliError("Can't establish connection to Run logbook.");
+               }
                if(fServer[system]) delete fServer[system];
                return kFALSE;
        }
@@ -1118,6 +1185,7 @@ Bool_t AliShuttle::Connect(Int_t system)
                        //aResult = fServer[kHLT]->GetTables("REFSYSLOG");
                        break;
                default:
+                       aResult = fServer[3]->GetTables("REFSYSLOG");
                        break;
        }
 
@@ -1380,7 +1448,7 @@ Bool_t AliShuttle::UpdateDAQTable()
 
                // Query execution
                TSQLResult* aResult;
-               aResult = dynamic_cast<TSQLResult*> (fServer[kDAQ]->Query(sqlQuery));
+               aResult = dynamic_cast<TSQLResult*> (fServer[3]->Query(sqlQuery));
                if (!aResult) {
                        Log(fCurrentDetector, Form("UpdateDAQTable - Can't execute SQL query <%s>", sqlQuery.Data()));
                        return kFALSE;
@@ -1399,7 +1467,7 @@ Bool_t AliShuttle::UpdateShuttleLogbook(const char* detector, const char* status
 // ex. of usage: UpdateShuttleLogbook("PHOS", "DONE") or UpdateShuttleLogbook("shuttle_done")
 
        // check connection, in case connect
-       if(!Connect(kDAQ)){
+       if(!Connect(3)){
                Log("SHUTTLE", "UpdateShuttleLogbook - Couldn't connect to DAQ Logbook.");
                return kFALSE;
        }
@@ -1430,7 +1498,7 @@ Bool_t AliShuttle::UpdateShuttleLogbook(const char* detector, const char* status
 
        // Query execution
        TSQLResult* aResult;
-       aResult = dynamic_cast<TSQLResult*> (fServer[kDAQ]->Query(sqlQuery));
+       aResult = dynamic_cast<TSQLResult*> (fServer[3]->Query(sqlQuery));
        if (!aResult) {
                Log("SHUTTLE", Form("UpdateShuttleLogbook - Can't execute query <%s>", sqlQuery.Data()));
                return kFALSE;
@@ -1525,12 +1593,47 @@ Bool_t AliShuttle::Collect(Int_t run)
                whereClause += Form(" and run=%d", run);
 
        TObjArray shuttleLogbookEntries;
-       if (!QueryShuttleLogbook(whereClause, shuttleLogbookEntries)) {
+       if (!QueryShuttleLogbook(whereClause, shuttleLogbookEntries))
+       {
                Log("SHUTTLE", "Collect - Can't retrieve entries from Shuttle logbook");
                return kFALSE;
        }
 
-       if (!RetrieveConditionsData(shuttleLogbookEntries)) {
+       for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
+               fFirstUnprocessed[iDet] = kTRUE;
+
+       if (run != 1)
+       {
+               // query Shuttle logbook for earlier runs, check if some detectors are unprocessed,
+               // flag them into fFirstUnprocessed array
+               TString whereClause(Form("where shuttle_done=0 and run < %d", run));
+               TObjArray tmpLogbookEntries;
+               if (!QueryShuttleLogbook(whereClause, tmpLogbookEntries))
+               {
+                       Log("SHUTTLE", "Collect - Can't retrieve entries from Shuttle logbook");
+                       return kFALSE;
+               }
+
+               TIter iter(&tmpLogbookEntries);
+               AliShuttleLogbookEntry* anEntry = 0;
+               while ((anEntry = dynamic_cast<AliShuttleLogbookEntry*> (iter.Next())))
+               {
+                       for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
+                       {
+                               if (anEntry->GetDetectorStatus(iDet) == AliShuttleLogbookEntry::kUnprocessed)
+                               {
+                                       AliDebug(2, Form("Run %d: setting %s as \"not first time unprocessed\"",
+                                                       anEntry->GetRun(), GetDetName(iDet)));
+                                       fFirstUnprocessed[iDet] = kFALSE;
+                               }
+                       }
+
+               }
+
+       }
+
+       if (!RetrieveConditionsData(shuttleLogbookEntries))
+       {
                Log("SHUTTLE", "Collect - Process of at least one run failed");
                return kFALSE;
        }
@@ -1563,7 +1666,7 @@ ULong_t AliShuttle::GetTimeOfLastAction() const
        ULong_t tmp;
        
        fMonitoringMutex->Lock();
-       
+
        tmp = fLastActionTime;
        
        fMonitoringMutex->UnLock();