From 46c62a26e8ed52d2f9417556d38df1a459d69b32 Mon Sep 17 00:00:00 2001 From: tkuhr Date: Thu, 22 Jan 2004 17:38:13 +0000 Subject: [PATCH] client server connection improved, interrupt handler for monitor process added --- MONITOR/AliMonitorClient.cxx | 44 +++++++++- MONITOR/AliMonitorControl.cxx | 11 ++- MONITOR/AliMonitorProcess.cxx | 161 ++++++++++++++++++++++++++++------ MONITOR/AliMonitorProcess.h | 15 +++- 4 files changed, 198 insertions(+), 33 deletions(-) diff --git a/MONITOR/AliMonitorClient.cxx b/MONITOR/AliMonitorClient.cxx index 6468ef4699b..c1cec6faf14 100644 --- a/MONITOR/AliMonitorClient.cxx +++ b/MONITOR/AliMonitorClient.cxx @@ -512,6 +512,8 @@ AliMonitorClient::~AliMonitorClient() delete fHorizontalFrame; if (fSocket) { + fSocketHandler->Remove(); + fSocket->Send("disconnect"); fSocket->Close(); delete fSocket; delete fSocketHandler; @@ -539,6 +541,16 @@ void AliMonitorClient::CloseWindow() if (fMenuOptions->IsEntryChecked(kMenuOptSaveOnExit)) { SaveSettings(); } + if (fSocket) { + fSocketHandler->Remove(); + fSocket->Send("disconnect"); + fSocket->Close(); + delete fSocket; + fSocket = NULL; + delete fSocketHandler; + fSocketHandler = NULL; + } + gApplication->Terminate(0); } @@ -1042,7 +1054,7 @@ void AliMonitorClient::DisconnectFromServer() // disconnect from the server fSocketHandler->Remove(); - fSocket->Send("Finished"); + fSocket->Send("disconnect"); fSocket->Close(); delete fSocket; fSocket = NULL; @@ -1873,18 +1885,42 @@ Bool_t AliMonitorClient::CheckForNewData() { // check whether the monitor process server sent new data - // receive a message from the server + // disable the socket handler in this method if (!fSocket || !fSocket->IsValid()) return kFALSE; + fSocketHandler->Remove(); + + // receive a control message from the server + char controlMessage[256]; + if (fSocket->Recv(controlMessage, 255) <= 0) { + fSocketHandler->Add(); + return kFALSE; + } + + // if it is new histogram data, send ok + if ((strcmp(controlMessage, "histograms") != 0) || + (fSocket->Send("ok") <= 0)) { + fSocketHandler->Add(); + return kFALSE; + } + + // get the histogram data TMessage* message = NULL; - Int_t result = fSocket->Recv(message); + if (fSocket->Recv(message) <= 0) { + fSocketHandler->Add(); + return kFALSE; + } // replace the old folder of monitor histos with the new one - if (result > 0) { + if (message->GetClass()->InheritsFrom(TFolder::Class())) { if (fFolder) delete fFolder; fFolder = (TFolder*) message->ReadObject(message->GetClass()); + delete message; + fSocketHandler->Add(); return kTRUE; } + delete message; + fSocketHandler->Add(); return kFALSE; } diff --git a/MONITOR/AliMonitorControl.cxx b/MONITOR/AliMonitorControl.cxx index 83c803d81f2..2fdf7cac112 100644 --- a/MONITOR/AliMonitorControl.cxx +++ b/MONITOR/AliMonitorControl.cxx @@ -35,6 +35,7 @@ #include #include #include +#include #include "AliMonitorProcess.h" @@ -369,7 +370,10 @@ void AliMonitorControl::HandleMenu(Int_t id) kMBIconQuestion, kMBYes | kMBNo, &result); if (result == kMBYes) { fMenuFile->EnableEntry(kMenuFileAbort); - if (fMonitorProcess->IsStopped()) exit(0); + if (fMonitorProcess->IsStopped()) { + delete fMonitorProcess; + gApplication->Terminate(0); + } fMonitorProcess->Stop(); fTerminating = kTRUE; } @@ -456,7 +460,10 @@ Bool_t AliMonitorControl::HandleTimer(TTimer* timer) // update the displayed information timer->TurnOff(); - if (fTerminating && fMonitorProcess->IsStopped()) exit(0); + if (fTerminating && fMonitorProcess->IsStopped()) { + delete fMonitorProcess; + gApplication->Terminate(0); + } UpdateStatus(); gSystem->ProcessEvents(); timer->TurnOn(); diff --git a/MONITOR/AliMonitorProcess.cxx b/MONITOR/AliMonitorProcess.cxx index 2f0b21f22e6..a9b7ed89c49 100644 --- a/MONITOR/AliMonitorProcess.cxx +++ b/MONITOR/AliMonitorProcess.cxx @@ -37,7 +37,6 @@ #include "AliRun.h" #include "AliTPC.h" #include "AliTPCclustererMI.h" -#include "AliComplexCluster.h" #include "AliTPCtrackerMI.h" #include "AliITS.h" #include "AliITSclustererV2.h" @@ -171,6 +170,9 @@ AliMonitorProcess::AliMonitorProcess( SetStatus(kStopped); fStopping = kFALSE; + + fInterruptHandler = new AliMonitorInterruptHandler(this); + gSystem->AddSignalHandler(fInterruptHandler); } //_____________________________________________________________________________ @@ -214,6 +216,9 @@ AliMonitorProcess::~AliMonitorProcess() delete fHLT; delete fHLTHough; #endif + + gSystem->RemoveSignalHandler(fInterruptHandler); + delete fInterruptHandler; } @@ -301,7 +306,11 @@ Bool_t AliMonitorProcess::CheckForNewFile() #if ROOT_VERSION_CODE <= 199169 // 3.10/01 TGridResult* result = fGrid->Ls(); #else - Grid_ResultHandle_t handle = fGrid->OpenDir(fAlienDir); +// Grid_ResultHandle_t handle = fGrid->OpenDir(fAlienDir); + TDatime datime; + char findName[256]; + sprintf(findName, "*_%d_*.root", datime.GetDate()); + Grid_ResultHandle_t handle = fGrid->Find(fAlienDir, findName); if (!handle) { Error("CheckForNewFile", "could not open alien directory %s", fAlienDir.Data()); @@ -319,6 +328,7 @@ Bool_t AliMonitorProcess::CheckForNewFile() while (Grid_Result_t* resultEntry = result->Next()) { const char* entry = resultEntry->name.c_str(); #endif + if (rindex(entry, '/')) entry = rindex(entry, '/')+1; // entry = host_date_time.root TString entryCopy(entry); char* p = const_cast(entryCopy.Data()); @@ -386,6 +396,7 @@ Bool_t AliMonitorProcess::ProcessFile() // loop over the events for (Int_t iEvent = 0; iEvent < nEvents; iEvent++) { + CheckForConnections(); SetStatus(kReading); fRunLoader->SetEventNumber(0); AliRawReaderRoot rawReader(fFileName, iEvent); @@ -403,29 +414,38 @@ Bool_t AliMonitorProcess::ProcessFile() // monitor only central physics events if (rawReader.GetType() != 7) continue; if ((rawReader.GetAttributes()[0] & 0x02) == 0) continue; + Info("ProcessFile", "run: %d event: %d %d\n", rawReader.GetRunNumber(), + rawReader.GetEventId()[0], rawReader.GetEventId()[1]); + CheckForConnections(); if (!ReconstructTPC(&rawReader)) return kFALSE; if (fStopping) break; + CheckForConnections(); if (!ReconstructITS(&rawReader)) return kFALSE; if (fStopping) break; + CheckForConnections(); if (!ReconstructV0s()) return kFALSE; if (fStopping) break; + CheckForConnections(); if (!ReconstructHLT(iEvent)) return kFALSE; if (fStopping) break; + CheckForConnections(); if (!ReconstructHLTHough(iEvent)) return kFALSE; if (fStopping) break; if (fDisplaySocket) fDisplaySocket->Send("new event"); Info("ProcessFile", "filling histograms..."); - SetStatus(kFilling); for (Int_t iMonitor = 0; iMonitor < fMonitors.GetEntriesFast(); iMonitor++) { + CheckForConnections(); + SetStatus(kFilling); ((AliMonitor*) fMonitors[iMonitor])->FillHistos(fRunLoader, &rawReader); if (fStopping) break; } if (fStopping) break; Info("ProcessFile", "updating histograms..."); + CheckForConnections(); SetStatus(kUpdating); TIterator* iFolder = fTopFolder->GetListOfFolders()->MakeIterator(); while (TFolder* folder = (TFolder*) iFolder->Next()) { @@ -937,46 +957,56 @@ void AliMonitorProcess::CheckForConnections() { // check if new clients want to connect and add them to the list of sockets - TMessage message(kMESS_OBJECT); - message.WriteObject(fTopFolder); - SetStatus(kConnecting); - TSocket* socket; while ((socket = fServerSocket->Accept()) != (TSocket*)-1) { socket->SetOption(kNoBlock, 1); char socketType[256]; - if (!socket->Recv(socketType, 255)) continue; - if (strcmp(socketType, "client") == 0) { - if ((fNEvents == 0) || (socket->Send(message) >= 0)) { - fSockets.Add(socket); + if (socket->Recv(socketType, 255) <= 0) { + gSystem->Sleep(1000); + if (socket->Recv(socketType, 255) <= 0) { TInetAddress adr = socket->GetInetAddress(); - Info("CheckForConnections", "new client:\n %s (%s), port %d\n", - adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + Error("CheckForConnections", "no socket type received - " + "disconnect client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + delete socket; + continue; } + } + if (strcmp(socketType, "client") == 0) { + fSockets.Add(socket); + TInetAddress adr = socket->GetInetAddress(); + Info("CheckForConnections", "new client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + if (fNEvents > 0) BroadcastHistos(socket); } else if (strcmp(socketType, "display") == 0) { if (fDisplaySocket) { fDisplaySocket->Close(); delete fDisplaySocket; } fDisplaySocket = socket; - fDisplaySocket->SetOption(kNoBlock, 1); TInetAddress adr = socket->GetInetAddress(); Info("CheckForConnections", "new display:\n %s (%s), port %d\n", adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + } else { + TInetAddress adr = socket->GetInetAddress(); + Error("CheckForConnections", "unknown socket type %s - " + "disconnect client:\n %s (%s), port %d\n", socketType, + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + delete socket; + continue; } } + // remove finished or invalid clients for (Int_t iSocket = 0; iSocket < fSockets.GetEntriesFast(); iSocket++) { socket = (TSocket*) fSockets[iSocket]; if (!socket) continue; - // remove finished client - char str[256]; - if (socket->Recv(str, 255)) { - TString socketMessage(str); - if(socketMessage.CompareTo("Finished") == 0) { + char controlMessage[256]; + if (socket->Recv(controlMessage, 255)) { + if (strcmp(controlMessage, "disconnect") == 0) { TInetAddress adr = socket->GetInetAddress(); Info("CheckForConnections", - "disconnect finished client:\n %s (%s), port %d\n", + "disconnect client:\n %s (%s), port %d\n", adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); delete fSockets.RemoveAt(iSocket); continue; @@ -985,8 +1015,9 @@ void AliMonitorProcess::CheckForConnections() if (!socket->IsValid()) { // remove invalid sockets from the list TInetAddress adr = socket->GetInetAddress(); - Info("BroadcastHistos", "disconnect client:\n %s (%s), port %d\n", - adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + Error("CheckForConnections", + "disconnect invalid client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); delete fSockets.RemoveAt(iSocket); } } @@ -994,7 +1025,7 @@ void AliMonitorProcess::CheckForConnections() } //_____________________________________________________________________________ -void AliMonitorProcess::BroadcastHistos() +void AliMonitorProcess::BroadcastHistos(TSocket* toSocket) { // send the monitor histograms to the clients @@ -1005,16 +1036,94 @@ void AliMonitorProcess::BroadcastHistos() for (Int_t iSocket = 0; iSocket < fSockets.GetEntriesFast(); iSocket++) { TSocket* socket = (TSocket*) fSockets[iSocket]; if (!socket) continue; + if (toSocket && (socket != toSocket)) continue; + + // send control message + if (!socket->IsValid() || (socket->Send("histograms") <= 0)) { + TInetAddress adr = socket->GetInetAddress(); + Error("BroadcastHistos", "connection to client failed - " + "disconnect client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + delete fSockets.RemoveAt(iSocket); + } + + // receive control message + char controlMessage[256]; + Int_t result = socket->Recv(controlMessage, 255); + if (result <= 0) { + gSystem->Sleep(1000); // wait one second and try again + result = socket->Recv(controlMessage, 255); + } + if (result <= 0) { + TInetAddress adr = socket->GetInetAddress(); + Error("BroadcastHistos", "no response from client - " + "disconnect client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + delete fSockets.RemoveAt(iSocket); + continue; + } + if (strcmp(controlMessage, "ok") != 0) { + TInetAddress adr = socket->GetInetAddress(); + Error("BroadcastHistos", "no \"ok\" message from client - " + "disconnect client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + delete fSockets.RemoveAt(iSocket); + continue; + } + socket->SetOption(kNoBlock, 0); - if (!socket->IsValid() || (socket->Send(message) < 0)) { + if (socket->Send(message) < 0) { // remove the socket from the list if there was an error TInetAddress adr = socket->GetInetAddress(); - Info("BroadcastHistos", "disconnect client:\n %s (%s), port %d\n", - adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); + Error("BroadcastHistos", "sending histograms failed - " + "disconnect client:\n %s (%s), port %d\n", + adr.GetHostName(), adr.GetHostAddress(), adr.GetPort()); delete fSockets.RemoveAt(iSocket); } else { + gSystem->Sleep(100); socket->SetOption(kNoBlock, 1); } } fSockets.Compress(); } + + +//_____________________________________________________________________________ +AliMonitorProcess::AliMonitorInterruptHandler::AliMonitorInterruptHandler + (AliMonitorProcess* process): + TSignalHandler(kSigUser1, kFALSE), + fProcess(process) +{ +// constructor: set process +} + +//_____________________________________________________________________________ +AliMonitorProcess::AliMonitorInterruptHandler::AliMonitorInterruptHandler + (const AliMonitorInterruptHandler& handler): + TSignalHandler(handler) +{ +// copy constructor + + Fatal("AliMonitorInterruptHandler", "copy constructor not implemented"); +} + +//_____________________________________________________________________________ +AliMonitorProcess::AliMonitorInterruptHandler& + AliMonitorProcess::AliMonitorInterruptHandler::operator = + (const AliMonitorInterruptHandler& /*handler*/) +{ +// assignment operator + + Fatal("operator =", "assignment operator not implemented"); + return *this; +} + +//_____________________________________________________________________________ +Bool_t AliMonitorProcess::AliMonitorInterruptHandler::Notify() +{ +// interrupt signal -> stop process + + Info("Notify", "the monitoring process will be stopped."); + fProcess->Stop(); + return kTRUE; +} diff --git a/MONITOR/AliMonitorProcess.h b/MONITOR/AliMonitorProcess.h index a685da29b7a..fcae2fc53e1 100644 --- a/MONITOR/AliMonitorProcess.h +++ b/MONITOR/AliMonitorProcess.h @@ -85,7 +85,7 @@ private: void StartNewRun(); void CheckForConnections(); - void BroadcastHistos(); + void BroadcastHistos(TSocket* toSocket = NULL); void SetStatus(EStatus status); static const Int_t fgkPort; // port number for client connections @@ -121,6 +121,19 @@ private: EStatus fStatus; // current status Bool_t fStopping; // stop of process requested or not + class AliMonitorInterruptHandler : public TSignalHandler { + public: + AliMonitorInterruptHandler(AliMonitorProcess* process); + AliMonitorInterruptHandler(const AliMonitorInterruptHandler& handler); + AliMonitorInterruptHandler& operator = + (const AliMonitorInterruptHandler& handler); + virtual Bool_t Notify(); + private: + AliMonitorProcess* fProcess; // process to notify + }; + + AliMonitorInterruptHandler* fInterruptHandler; // interrupt handler + ClassDef(AliMonitorProcess, 0) // class for performing the monitoring }; -- 2.43.0