client server connection improved, interrupt handler for monitor process added
authortkuhr <tkuhr@f7af4fe6-9843-0410-8265-dc069ae4e863>
Thu, 22 Jan 2004 17:38:13 +0000 (17:38 +0000)
committertkuhr <tkuhr@f7af4fe6-9843-0410-8265-dc069ae4e863>
Thu, 22 Jan 2004 17:38:13 +0000 (17:38 +0000)
MONITOR/AliMonitorClient.cxx
MONITOR/AliMonitorControl.cxx
MONITOR/AliMonitorProcess.cxx
MONITOR/AliMonitorProcess.h

index 6468ef4..c1cec6f 100644 (file)
@@ -512,6 +512,8 @@ AliMonitorClient::~AliMonitorClient()
   delete fHorizontalFrame;
 
   if (fSocket) {
+    fSocketHandler->Remove();
+    fSocket->Send("disconnect");
     fSocket->Close();
     delete fSocket;
     delete fSocketHandler;
@@ -539,6 +541,16 @@ void AliMonitorClient::CloseWindow()
   if (fMenuOptions->IsEntryChecked(kMenuOptSaveOnExit)) {
     SaveSettings();
   }
+  if (fSocket) {
+    fSocketHandler->Remove();
+    fSocket->Send("disconnect");
+    fSocket->Close();
+    delete fSocket;
+    fSocket = NULL;
+    delete fSocketHandler;
+    fSocketHandler = NULL;
+  }
+
   gApplication->Terminate(0);
 }
 
@@ -1042,7 +1054,7 @@ void AliMonitorClient::DisconnectFromServer()
 
   // disconnect from the server
   fSocketHandler->Remove();
-  fSocket->Send("Finished"); 
+  fSocket->Send("disconnect");
   fSocket->Close();
   delete fSocket;
   fSocket = NULL;
@@ -1873,18 +1885,42 @@ Bool_t AliMonitorClient::CheckForNewData()
 {
 // check whether the monitor process server sent new data
 
-  // receive a message from the server
+  // disable the socket handler in this method
   if (!fSocket || !fSocket->IsValid()) return kFALSE;
+  fSocketHandler->Remove();
+
+  // receive a control message from the server
+  char controlMessage[256];
+  if (fSocket->Recv(controlMessage, 255) <= 0) {
+    fSocketHandler->Add();
+    return kFALSE;
+  }
+
+  // if it is new histogram data, send ok
+  if ((strcmp(controlMessage, "histograms") != 0) ||
+      (fSocket->Send("ok") <= 0)) {
+    fSocketHandler->Add();
+    return kFALSE;
+  }
+
+  // get the histogram data
   TMessage* message = NULL;
-  Int_t result = fSocket->Recv(message);
+  if (fSocket->Recv(message) <= 0) {
+    fSocketHandler->Add();
+    return kFALSE;
+  }
 
   // replace the old folder of monitor histos with the new one
-  if (result > 0) {
+  if (message->GetClass()->InheritsFrom(TFolder::Class())) {
     if (fFolder) delete fFolder;
     fFolder = (TFolder*) message->ReadObject(message->GetClass());
+    delete message;
+    fSocketHandler->Add();
     return kTRUE;
   }
 
+  delete message;
+  fSocketHandler->Add();
   return kFALSE;
 }
 
index 83c803d..2fdf7ca 100644 (file)
@@ -35,6 +35,7 @@
 #include <TGLabel.h>
 #include <TGTextEntry.h>
 #include <TTimer.h>
+#include <TApplication.h>
 #include "AliMonitorProcess.h"
 
 
@@ -369,7 +370,10 @@ void AliMonitorControl::HandleMenu(Int_t id)
                 kMBIconQuestion, kMBYes | kMBNo, &result);
     if (result == kMBYes) {
       fMenuFile->EnableEntry(kMenuFileAbort);
-      if (fMonitorProcess->IsStopped()) exit(0);
+      if (fMonitorProcess->IsStopped()) {
+       delete fMonitorProcess;
+       gApplication->Terminate(0);
+      }
       fMonitorProcess->Stop();
       fTerminating = kTRUE;
     }
@@ -456,7 +460,10 @@ Bool_t AliMonitorControl::HandleTimer(TTimer* timer)
 // update the displayed information
 
   timer->TurnOff();
-  if (fTerminating && fMonitorProcess->IsStopped()) exit(0); 
+  if (fTerminating && fMonitorProcess->IsStopped()) {
+    delete fMonitorProcess;
+    gApplication->Terminate(0);
+  }
   UpdateStatus();
   gSystem->ProcessEvents();
   timer->TurnOn();
index 2f0b21f..a9b7ed8 100644 (file)
@@ -37,7 +37,6 @@
 #include "AliRun.h"
 #include "AliTPC.h"
 #include "AliTPCclustererMI.h"
-#include "AliComplexCluster.h"
 #include "AliTPCtrackerMI.h"
 #include "AliITS.h"
 #include "AliITSclustererV2.h"
@@ -171,6 +170,9 @@ AliMonitorProcess::AliMonitorProcess(
 
   SetStatus(kStopped);
   fStopping = kFALSE;
+
+  fInterruptHandler = new AliMonitorInterruptHandler(this);
+  gSystem->AddSignalHandler(fInterruptHandler);
 }
 
 //_____________________________________________________________________________
@@ -214,6 +216,9 @@ AliMonitorProcess::~AliMonitorProcess()
   delete fHLT;
   delete fHLTHough;
 #endif
+
+  gSystem->RemoveSignalHandler(fInterruptHandler);
+  delete fInterruptHandler;
 }
 
 
@@ -301,7 +306,11 @@ Bool_t AliMonitorProcess::CheckForNewFile()
 #if ROOT_VERSION_CODE <= 199169   // 3.10/01
   TGridResult* result = fGrid->Ls();
 #else
-  Grid_ResultHandle_t handle = fGrid->OpenDir(fAlienDir);
+//  Grid_ResultHandle_t handle = fGrid->OpenDir(fAlienDir);
+  TDatime datime;
+  char findName[256];
+  sprintf(findName, "*_%d_*.root", datime.GetDate());
+  Grid_ResultHandle_t handle = fGrid->Find(fAlienDir, findName);
   if (!handle) {
     Error("CheckForNewFile", "could not open alien directory %s", 
          fAlienDir.Data());
@@ -319,6 +328,7 @@ Bool_t AliMonitorProcess::CheckForNewFile()
   while (Grid_Result_t* resultEntry = result->Next()) {
     const char* entry = resultEntry->name.c_str();
 #endif
+    if (rindex(entry, '/')) entry = rindex(entry, '/')+1;
     // entry = host_date_time.root
     TString entryCopy(entry);
     char* p = const_cast<char*>(entryCopy.Data());
@@ -386,6 +396,7 @@ Bool_t AliMonitorProcess::ProcessFile()
 
   // loop over the events
   for (Int_t iEvent = 0; iEvent < nEvents; iEvent++) {
+    CheckForConnections();
     SetStatus(kReading);
     fRunLoader->SetEventNumber(0);
     AliRawReaderRoot rawReader(fFileName, iEvent);
@@ -403,29 +414,38 @@ Bool_t AliMonitorProcess::ProcessFile()
     // monitor only central physics events
     if (rawReader.GetType() != 7) continue;
     if ((rawReader.GetAttributes()[0] & 0x02) == 0) continue;
+    Info("ProcessFile", "run: %d  event: %d %d\n", rawReader.GetRunNumber(), 
+        rawReader.GetEventId()[0], rawReader.GetEventId()[1]);
 
+    CheckForConnections();
     if (!ReconstructTPC(&rawReader)) return kFALSE;
     if (fStopping) break;
+    CheckForConnections();
     if (!ReconstructITS(&rawReader)) return kFALSE;
     if (fStopping) break;
+    CheckForConnections();
     if (!ReconstructV0s()) return kFALSE;
     if (fStopping) break;
+    CheckForConnections();
     if (!ReconstructHLT(iEvent)) return kFALSE;
     if (fStopping) break;
+    CheckForConnections();
     if (!ReconstructHLTHough(iEvent)) return kFALSE;
     if (fStopping) break;
 
     if (fDisplaySocket) fDisplaySocket->Send("new event");
 
     Info("ProcessFile", "filling histograms...");
-    SetStatus(kFilling);
     for (Int_t iMonitor = 0; iMonitor < fMonitors.GetEntriesFast(); iMonitor++) {
+      CheckForConnections();
+      SetStatus(kFilling);
       ((AliMonitor*) fMonitors[iMonitor])->FillHistos(fRunLoader, &rawReader);
       if (fStopping) break;
     }
     if (fStopping) break;
 
     Info("ProcessFile", "updating histograms...");
+    CheckForConnections();
     SetStatus(kUpdating);
     TIterator* iFolder = fTopFolder->GetListOfFolders()->MakeIterator();
     while (TFolder* folder = (TFolder*) iFolder->Next()) {
@@ -937,46 +957,56 @@ void AliMonitorProcess::CheckForConnections()
 {
 // check if new clients want to connect and add them to the list of sockets
 
-  TMessage message(kMESS_OBJECT);
-  message.WriteObject(fTopFolder); 
-  SetStatus(kConnecting);
-
   TSocket* socket;
   while ((socket = fServerSocket->Accept()) != (TSocket*)-1) {
     socket->SetOption(kNoBlock, 1);
     char socketType[256];
-    if (!socket->Recv(socketType, 255)) continue;
-    if (strcmp(socketType, "client") == 0) {
-      if ((fNEvents == 0) || (socket->Send(message) >= 0)) {
-       fSockets.Add(socket);
+    if (socket->Recv(socketType, 255) <= 0) {
+      gSystem->Sleep(1000);
+      if (socket->Recv(socketType, 255) <= 0) {
        TInetAddress adr = socket->GetInetAddress();
-       Info("CheckForConnections", "new client:\n %s (%s), port %d\n",
-            adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+       Error("CheckForConnections", "no socket type received - "
+             "disconnect client:\n %s (%s), port %d\n",
+             adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+       delete socket;
+       continue;
       }
+    }
+    if (strcmp(socketType, "client") == 0) {
+      fSockets.Add(socket);
+      TInetAddress adr = socket->GetInetAddress();
+      Info("CheckForConnections", "new client:\n %s (%s), port %d\n",
+          adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      if (fNEvents > 0) BroadcastHistos(socket);
     } else if (strcmp(socketType, "display") == 0) {
       if (fDisplaySocket) {
        fDisplaySocket->Close();
        delete fDisplaySocket;
       }
       fDisplaySocket = socket;
-      fDisplaySocket->SetOption(kNoBlock, 1);
       TInetAddress adr = socket->GetInetAddress();
       Info("CheckForConnections", "new display:\n %s (%s), port %d\n",
           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+    } else {
+      TInetAddress adr = socket->GetInetAddress();
+      Error("CheckForConnections", "unknown socket type %s - "
+           "disconnect client:\n %s (%s), port %d\n", socketType,
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      delete socket;
+      continue;
     }
   }
 
+  // remove finished or invalid clients
   for (Int_t iSocket = 0; iSocket < fSockets.GetEntriesFast(); iSocket++) {
     socket = (TSocket*) fSockets[iSocket];
     if (!socket) continue;
-    // remove finished client
-    char str[256];
-    if (socket->Recv(str, 255)) {
-      TString socketMessage(str);
-      if(socketMessage.CompareTo("Finished") == 0) {
+    char controlMessage[256];
+    if (socket->Recv(controlMessage, 255)) {
+      if (strcmp(controlMessage, "disconnect") == 0) {
        TInetAddress adr = socket->GetInetAddress();
        Info("CheckForConnections",
-            "disconnect finished client:\n %s (%s), port %d\n",
+            "disconnect client:\n %s (%s), port %d\n",
             adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
        delete fSockets.RemoveAt(iSocket);
        continue;
@@ -985,8 +1015,9 @@ void AliMonitorProcess::CheckForConnections()
     if (!socket->IsValid()) {
       // remove invalid sockets from the list
       TInetAddress adr = socket->GetInetAddress();
-      Info("BroadcastHistos", "disconnect client:\n %s (%s), port %d\n",
-          adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      Error("CheckForConnections", 
+           "disconnect invalid client:\n %s (%s), port %d\n",
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
       delete fSockets.RemoveAt(iSocket);
     }
   }
@@ -994,7 +1025,7 @@ void AliMonitorProcess::CheckForConnections()
 }
 
 //_____________________________________________________________________________
-void AliMonitorProcess::BroadcastHistos()
+void AliMonitorProcess::BroadcastHistos(TSocket* toSocket)
 {
 // send the monitor histograms to the clients
 
@@ -1005,16 +1036,94 @@ void AliMonitorProcess::BroadcastHistos()
   for (Int_t iSocket = 0; iSocket < fSockets.GetEntriesFast(); iSocket++) {
     TSocket* socket = (TSocket*) fSockets[iSocket];
     if (!socket) continue;
+    if (toSocket && (socket != toSocket)) continue;
+
+    // send control message
+    if (!socket->IsValid() || (socket->Send("histograms") <= 0)) {
+      TInetAddress adr = socket->GetInetAddress();
+      Error("BroadcastHistos", "connection to client failed - "
+           "disconnect client:\n %s (%s), port %d\n",
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      delete fSockets.RemoveAt(iSocket);
+    }
+
+    // receive control message
+    char controlMessage[256];
+    Int_t result = socket->Recv(controlMessage, 255);
+    if (result <= 0) {
+      gSystem->Sleep(1000);  // wait one second and try again
+      result = socket->Recv(controlMessage, 255);
+    }
+    if (result <= 0) {
+      TInetAddress adr = socket->GetInetAddress();
+      Error("BroadcastHistos", "no response from client - "
+           "disconnect client:\n %s (%s), port %d\n",
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      delete fSockets.RemoveAt(iSocket);
+      continue;
+    }
+    if (strcmp(controlMessage, "ok") != 0) {
+      TInetAddress adr = socket->GetInetAddress();
+      Error("BroadcastHistos", "no \"ok\" message from client - "
+           "disconnect client:\n %s (%s), port %d\n",
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      delete fSockets.RemoveAt(iSocket);
+      continue;
+    }
+
     socket->SetOption(kNoBlock, 0);
-    if (!socket->IsValid() || (socket->Send(message) < 0)) {
+    if (socket->Send(message) < 0) {
       // remove the socket from the list if there was an error
       TInetAddress adr = socket->GetInetAddress();
-      Info("BroadcastHistos", "disconnect client:\n %s (%s), port %d\n",
-          adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
+      Error("BroadcastHistos", "sending histograms failed - "
+           "disconnect client:\n %s (%s), port %d\n",
+           adr.GetHostName(), adr.GetHostAddress(), adr.GetPort());
       delete fSockets.RemoveAt(iSocket);
     } else {
+      gSystem->Sleep(100);
       socket->SetOption(kNoBlock, 1);
     }
   }
   fSockets.Compress();
 }
+
+
+//_____________________________________________________________________________
+AliMonitorProcess::AliMonitorInterruptHandler::AliMonitorInterruptHandler
+  (AliMonitorProcess* process):
+  TSignalHandler(kSigUser1, kFALSE), 
+  fProcess(process) 
+{
+// constructor: set process
+}
+
+//_____________________________________________________________________________
+AliMonitorProcess::AliMonitorInterruptHandler::AliMonitorInterruptHandler
+  (const AliMonitorInterruptHandler& handler):
+  TSignalHandler(handler)
+{
+// copy constructor
+
+  Fatal("AliMonitorInterruptHandler", "copy constructor not implemented");
+}
+
+//_____________________________________________________________________________
+AliMonitorProcess::AliMonitorInterruptHandler& 
+  AliMonitorProcess::AliMonitorInterruptHandler::operator = 
+  (const AliMonitorInterruptHandler& /*handler*/) 
+{
+// assignment operator
+
+  Fatal("operator =", "assignment operator not implemented"); 
+  return *this;
+}
+
+//_____________________________________________________________________________
+Bool_t AliMonitorProcess::AliMonitorInterruptHandler::Notify() 
+{
+// interrupt signal -> stop process
+
+  Info("Notify", "the monitoring process will be stopped.");
+  fProcess->Stop(); 
+  return kTRUE;
+}
index a685da2..fcae2fc 100644 (file)
@@ -85,7 +85,7 @@ private:
   void             StartNewRun();
 
   void             CheckForConnections();
-  void             BroadcastHistos();
+  void             BroadcastHistos(TSocket* toSocket = NULL);
   void             SetStatus(EStatus status);
 
   static const Int_t fgkPort;          // port number for client connections
@@ -121,6 +121,19 @@ private:
   EStatus          fStatus;             // current status
   Bool_t           fStopping;           // stop of process requested or not
 
+  class AliMonitorInterruptHandler : public TSignalHandler {
+  public:
+    AliMonitorInterruptHandler(AliMonitorProcess* process);
+    AliMonitorInterruptHandler(const AliMonitorInterruptHandler& handler);
+    AliMonitorInterruptHandler& operator = 
+      (const AliMonitorInterruptHandler& handler);
+    virtual Bool_t Notify();
+  private:
+    AliMonitorProcess* fProcess;       // process to notify
+  };
+
+  AliMonitorInterruptHandler* fInterruptHandler;  // interrupt handler
+
   ClassDef(AliMonitorProcess, 0)   // class for performing the monitoring
 };