o) Adding time out to the execution of the preprocessors: The Shuttle forks and the...
[u/mrichter/AliRoot.git] / SHUTTLE / AliShuttle.cxx
index af93335..68396bf 100644 (file)
@@ -15,6 +15,9 @@
 
 /*
 $Log$
+Revision 1.17  2006/10/05 16:20:55  jgrosseo
+adapting to new CDB classes
+
 Revision 1.16  2006/10/05 15:46:26  jgrosseo
 applying to the new interface
 
@@ -136,9 +139,13 @@ some docs added
 #include <TSQLServer.h>
 #include <TSQLResult.h>
 #include <TSQLRow.h>
+#include <TMutex.h>
 
 #include <fstream>
 
+#include <sys/types.h>
+#include <sys/wait.h>
+
 ClassImp(AliShuttle)
 
 TString AliShuttle::fgkMainCDB("alien://folder=ShuttleCDB");
@@ -167,7 +174,9 @@ fPreprocessorMap(),
 fLogbookEntry(0),
 fCurrentDetector(""),
 fStatusEntry(0),
-fGridError(kFALSE)
+fGridError(kFALSE),
+fMonitoringMutex(0),
+fLastActionTime(0)
 {
        //
        // config: AliShuttleConfig used
@@ -181,29 +190,8 @@ fGridError(kFALSE)
                fFESlist[iSys].SetOwner(kTRUE);
        }
        fPreprocessorMap.SetOwner(kTRUE);
-}
-
-//______________________________________________________________________
-AliShuttle::AliShuttle(const AliShuttle& /*other*/):
-AliShuttleInterface(),
-fConfig(0),
-fTimeout(0), fRetries(0),
-fPreprocessorMap(),
-fLogbookEntry(0),
-fCurrentDetector(""),
-fStatusEntry(0),
-fGridError(kFALSE)
-{
-// copy constructor (not implemented)
-
-}
-
-//______________________________________________________________________
-AliShuttle &AliShuttle::operator=(const AliShuttle& /*other*/)
-{
-// assignment operator (not implemented)
-
-return *this;
+       
+       fMonitoringMutex = new TMutex();
 }
 
 //______________________________________________________________________________________________
@@ -216,12 +204,19 @@ AliShuttle::~AliShuttle()
                if(fServer[iSys]) {
                        fServer[iSys]->Close();
                        delete fServer[iSys];
+      fServer[iSys] = 0;
                }
 
        if (fStatusEntry){
                delete fStatusEntry;
                fStatusEntry = 0;
        }
+       
+       if (fMonitoringMutex) 
+       {
+               delete fMonitoringMutex;
+               fMonitoringMutex = 0;
+       }
 }
 
 //______________________________________________________________________________________________
@@ -412,8 +407,11 @@ void AliShuttle::UpdateShuttleStatus(AliShuttleStatus::Status newStatus, Bool_t
                return;
        }
 
-       Log("SHUTTLE", Form("UpdateShuttleStatus - %s: Changing state from %s to %s", fCurrentDetector.Data(),
-                               status->GetStatusName(), status->GetStatusName(newStatus)));
+       TString actionStr;
+       actionStr.Form("UpdateShuttleStatus - %s: Changing state from %s to %s", fCurrentDetector.Data(),
+                               status->GetStatusName(), status->GetStatusName(newStatus));
+       Log("SHUTTLE", actionStr);
+       SetLastAction(actionStr);
 
        status->SetStatus(newStatus);
        if (increaseCount) status->IncreaseCount();
@@ -485,10 +483,7 @@ Bool_t AliShuttle::ContinueProcessing()
 
        // abort conditions
        // TODO we should add two counters, one for PP and one for DCS!
-       if ((status->GetStatus() == AliShuttleStatus::kPPStarted ||
-            status->GetStatus() == AliShuttleStatus::kPPError) &&
-           (status->GetCount() >= fConfig->GetMaxPPRetries() ||
-            status->GetCount() >= fConfig->GetMaxRetries())) {
+       if (status->GetCount() >= fConfig->GetMaxRetries()) {
                Log("SHUTTLE",
                        Form("ContinueProcessing - %s failed %d times in status %s - Updating Shuttle Logbook",
                                fCurrentDetector.Data(),
@@ -501,7 +496,7 @@ Bool_t AliShuttle::ContinueProcessing()
                        fCurrentDetector.Data(),
                        status->GetStatusName(), status->GetCount()));
 
-       UpdateShuttleStatus(AliShuttleStatus::kStarted);
+       UpdateShuttleStatus(AliShuttleStatus::kStarted, kTRUE);
 
        return kTRUE;
 }
@@ -554,7 +549,7 @@ Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
                AliPreprocessor* aPreprocessor =
                        dynamic_cast<AliPreprocessor*> (fPreprocessorMap.GetValue(fCurrentDetector));
                if(!aPreprocessor){
-                       Log("SHUTTLE",Form("Process - %s: no preprocessor registered. Skipping"));
+                       Log("SHUTTLE",Form("Process: no preprocessor registered. Skipping %s", fCurrentDetector.Data()));
                        continue;
                }
 
@@ -563,37 +558,105 @@ Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
                AliInfo(Form("\n\n \t\t\t****** run %d - %s: START  ******",
                                                GetCurrentRun(), aDetector->GetName()));
 
-               UInt_t result = ProcessCurrentDetector();
 
-               if(!result) {
-                       hasError = kTRUE;
-                       AliInfo(Form("\n \t\t\t****** run %d - %s: PREPROCESSOR ERROR ****** \n\n",
-                                                       GetCurrentRun(), aDetector->GetName()));
-                       continue;
-               }
+    Int_t pid = fork();
 
-               if(result == 2) {
-                       AliInfo(Form("\n \t\t\t****** run %d - %s: STORAGE ERROR ****** \n\n",
-                                                       GetCurrentRun(), aDetector->GetName()));
-               } else {
-                       AliInfo(Form("\n \t\t\t****** run %d - %s: DONE ****** \n\n",
-                                                       GetCurrentRun(), aDetector->GetName()));
-               }
+    if (pid < 0)
+    {
+      Log("SHUTTLE", "ERROR: Forking failed");
+    }
+    else if (pid > 0)
+    {
+      // parent
+      AliInfo(Form("In parent process of %d - %s: Starting monitoring", GetCurrentRun(), aDetector->GetName()));
 
-               // Process successful: Update time_processed field in FES logbooks!
-               if(fFESCalled[kDAQ]) {
-                       hasError = (UpdateDAQTable() == kFALSE);
-                       fFESlist[kDAQ].Clear();
-               }
-               //if(fFESCalled[kDCS]) {
-               //      hasError = UpdateDCSTable(aDetector->GetName());
-               //      fFESlist[kDCS].Clear();
-               //}
-               //if(fFESCalled[kHLT]) {
-               //      hasError = UpdateHLTTable(aDetector->GetName());
-               //      fFESlist[kHLT].Clear();
-               //}
+      Long_t begin = time(0);
+
+      int status; // to be used with waitpid, on purpose an int (not Int_t)!
+      while (waitpid(pid, &status, WNOHANG) == 0)
+      {
+        Long_t expiredTime = time(0) - begin;
+
+        if (expiredTime > fConfig->GetPPTimeOut())
+        {
+          Log("SHUTTLE", Form("Process time out. Run time: %d seconds. Killing...", expiredTime));
 
+          kill(pid, 9);
+
+          hasError = kTRUE;
+
+          gSystem->Sleep(1000);
+        }
+        else
+        {
+          if (expiredTime % 60 == 0)
+            Log("SHUTTLE", Form("Checked process. Run time: %d seconds.", expiredTime));
+
+          gSystem->Sleep(1000);
+        }
+      }
+
+      AliInfo(Form("In parent process of %d - %s: Client has terminated.", GetCurrentRun(), aDetector->GetName()));
+
+      if (WIFEXITED(status))
+      {
+        Int_t returnCode = WEXITSTATUS(status);
+
+        Log("SHUTTLE", Form("The return code is %d", returnCode));
+
+        if (returnCode != 0)
+          hasError = kTRUE;
+      }
+    }
+    else if (pid == 0)
+    {
+      // client
+      AliInfo(Form("In client process of %d - %s", GetCurrentRun(), aDetector->GetName()));
+
+      UInt_t result = ProcessCurrentDetector();
+
+      Int_t returnCode = 0; // will be set to 1 in case of an error
+
+      if (!result) {
+        returnCode = 1;
+        AliInfo(Form("\n \t\t\t****** run %d - %s: PREPROCESSOR ERROR ****** \n\n",
+                GetCurrentRun(), aDetector->GetName()));
+      }
+      else if(result == 2) {
+        AliInfo(Form("\n \t\t\t****** run %d - %s: STORAGE ERROR ****** \n\n",
+                GetCurrentRun(), aDetector->GetName()));
+      } else {
+        AliInfo(Form("\n \t\t\t****** run %d - %s: DONE ****** \n\n",
+                GetCurrentRun(), aDetector->GetName()));
+      }
+
+      if (result > 0)
+      {
+        // Process successful: Update time_processed field in FES logbooks!
+        if(fFESCalled[kDAQ]) {
+          if (UpdateDAQTable() == kFALSE)
+            returnCode = 1;
+          fFESlist[kDAQ].Clear();
+        }
+        //if(fFESCalled[kDCS]) {
+        //  if (UpdateDCSTable(aDetector->GetName()) == kFALSE)
+        //    returnCode = 1;
+        //  fFESlist[kDCS].Clear();
+        //}
+        //if(fFESCalled[kHLT]) {
+        //  if (UpdateHLTTable(aDetector->GetName()) == kFALSE)
+        //    returnCode = 1;
+        //     fFESlist[kHLT].Clear();
+        //}
+      }
+
+      AliInfo(Form("Client process of %d - %s is exiting now with %d.", GetCurrentRun(), aDetector->GetName(), returnCode));
+
+      // the client exits here
+      gSystem->Exit(returnCode);
+
+      AliError("We should never get here!!!");
+    }
        }
 
        AliInfo(Form("\n\n \t\t\t^*^*^*^*^*^*^*^*^*^*^*^* run %d: FINISH ^*^*^*^*^*^*^*^*^*^*^*^* \n",
@@ -671,12 +734,11 @@ UInt_t AliShuttle::ProcessCurrentDetector()
                dynamic_cast<AliPreprocessor*> (fPreprocessorMap.GetValue(fCurrentDetector));
 
        aPreprocessor->Initialize(GetCurrentRun(), GetCurrentStartTime(), GetCurrentEndTime());
-
        UInt_t aPPResult = aPreprocessor->Process(&aliasMap);
 
        UInt_t returnValue = 0;
        if (aPPResult == 0) { // Preprocessor error
-               UpdateShuttleStatus(AliShuttleStatus::kPPError, kTRUE);
+               UpdateShuttleStatus(AliShuttleStatus::kPPError);
                returnValue = 0;
        } else if (fGridError == kFALSE) { // process and Grid storage ok!
                UpdateShuttleStatus(AliShuttleStatus::kDone);
@@ -1462,7 +1524,7 @@ void AliShuttle::Log(const char* detector, const char* message)
                gSystem->FreeDirectory(dir);
        }
 
-       TString toLog = Form("%s: %s - ", TTimeStamp(time(0)).AsString("s"), detector);
+       TString toLog = Form("%s (%d): %s - ", TTimeStamp(time(0)).AsString("s"), getpid(), detector);
        if(GetCurrentRun()>=0 ) toLog += Form("run %d - ", GetCurrentRun());
        toLog += Form("%s", message);
 
@@ -1485,93 +1547,41 @@ void AliShuttle::Log(const char* detector, const char* message)
        logFile.close();
 }
 
-
 //______________________________________________________________________________________________
 Bool_t AliShuttle::Collect(Int_t run)
 {
        //
-       // Collects conditions data for the given run.
-       //
-
-       AliInfo(Form("Collecting conditions data for run %d", run));
-
-       TString whereClause("where run=");
-       whereClause += run;
-
-       TObjArray dateEntries;
-       if (!QueryShuttleLogbook(whereClause, dateEntries)) {
-               AliError("Can't retrieve entries from Shuttle logbook!");
-               return kFALSE;
-        }
-
-       if (!dateEntries.GetEntriesFast()) {
-               AliError(Form("Retrieval of parameters for run %d failed!", run));
-               return kFALSE;
-       }
-
-       if (dateEntries.GetEntriesFast() > 1) {
-               AliError(Form("There is more than one entry for run <%d> in Shuttle logbook!", run));
-               return kFALSE;
-       }
-
-       if (!RetrieveConditionsData(dateEntries)) {
-               AliError("An error occured during conditions data retrieval!");
-               return kFALSE;
-       }
-
-       return kTRUE;
-}
-
-//______________________________________________________________________________________________
-Bool_t AliShuttle::CollectNew()
-{
-       //
-       // Collects conditions data for all UNPROCESSED run written to DAQ LogBook.
+       // Collects conditions data for all UNPROCESSED runs written to the DAQ LogBook if run == -1 (default).
+  // If a specific run is given, only that run is processed (and only if it is not yet marked as done).
+  //
        // In operational mode, this is the Shuttle function triggered by the EOR signal.
        //
 
-       Log("SHUTTLE","CollectNew - Shuttle called. Collecting conditions data for unprocessed runs");
+  if (run == -1)
+       Log("SHUTTLE","Collect - Shuttle called. Collecting conditions data for unprocessed runs");
+  else
+       Log("SHUTTLE", Form("Collect - Shuttle called. Collecting conditions data for run %d", run));
+
+       SetLastAction("Starting");
 
        TString whereClause("where shuttle_done=0");
+  if (run != -1)
+    whereClause += Form(" and run=%d", run);
 
        TObjArray shuttleLogbookEntries;
        if (!QueryShuttleLogbook(whereClause, shuttleLogbookEntries)) {
-               Log("SHUTTLE", "CollectNew - Can't retrieve entries from Shuttle logbook");
+               Log("SHUTTLE", "Collect - Can't retrieve entries from Shuttle logbook");
                return kFALSE;
        }
 
        if (!RetrieveConditionsData(shuttleLogbookEntries)) {
-               Log("SHUTTLE", "CollectNew - Process of at least one run failed");
-               return kFALSE;
-       }
-
-       return kTRUE;
-}
-
-//______________________________________________________________________________________________
-Bool_t AliShuttle::CollectAll()
-{
-       //
-       // Collects conditions data for all runs (even if they're already done!) written in Shuttle LogBook.
-       //
-
-       AliInfo("Collecting conditions data for all runs ...");
-
-       TObjArray dateEntries;
-       if (!QueryShuttleLogbook("", dateEntries)) {
-               AliError("Can't retrieve entries from Shuttle logbook");
-               return kFALSE;
-       }
-
-       if (!RetrieveConditionsData(dateEntries)) {
-               AliError("An error occured during conditions data retrieval!");
+               Log("SHUTTLE", "Collect - Process of at least one run failed");
                return kFALSE;
        }
 
-       return kTRUE;
+  return kTRUE;
 }
 
-
 //______________________________________________________________________________________________
 Bool_t AliShuttle::RetrieveConditionsData(const TObjArray& dateEntries)
 {
@@ -1590,3 +1600,46 @@ Bool_t AliShuttle::RetrieveConditionsData(const TObjArray& dateEntries)
 
        return hasError == kFALSE;
 }
+
+//______________________________________________________________________________________________
+ULong_t AliShuttle::GetTimeOfLastAction() const
+{
+       ULong_t tmp;
+       // returns the time of the last action, i.e. the time(0) value stored by SetLastAction
+       fMonitoringMutex->Lock();
+       // the member is copied while the mutex is held; the copy is returned after unlocking
+       tmp = fLastActionTime;
+       
+       fMonitoringMutex->UnLock();
+       
+       return tmp;
+}
+
+//______________________________________________________________________________________________
+const TString AliShuttle::GetLastAction() const
+{
+       // returns a string description of the last action (the text stored by SetLastAction)
+
+       TString tmp;
+       // the string is copied while fMonitoringMutex is held and the copy is returned by value
+       fMonitoringMutex->Lock();
+       
+       tmp = fLastAction;
+       
+       fMonitoringMutex->UnLock();
+
+       return tmp;     
+}
+
+//______________________________________________________________________________________________
+void AliShuttle::SetLastAction(const char* action)
+{
+       // updates the monitoring variables: stores the given action text and the current time
+       // both members are written while fMonitoringMutex is held, so they are updated together
+       fMonitoringMutex->Lock();
+       
+       fLastAction = action;
+       fLastActionTime = time(0);
+       
+       fMonitoringMutex->UnLock();
+}