alistoragemanager/AliStorageAdministratorPanelSetStorageParams.cxx
alistoragemanager/AliStorageEventManager.cxx
alistoragemanager/AliStorageDatabase.cxx
+ alistoragemanager/AliDIMListenerThread.cxx
+ alistoragemanager/AliEventsCollectorThread.cxx
+ alistoragemanager/AliCommunicationThread.cxx
alieventserver/AliEventServerReconstruction.cxx
alieventserver/AliEventServerWindow.cxx
alieventserver/AliEventServer.cxx
// storage manager:
#pragma link C++ class AliStorageDatabase+;
#pragma link C++ class AliStorageClientThread+;
+#pragma link C++ class AliDIMListenerThread+;
+#pragma link C++ class AliEventsCollectorThread+;
+#pragma link C++ class AliCommunicationThread+;
#pragma link C++ class AliStorageServerThread+;
#pragma link C++ class AliStorageEventManager+;
#pragma link C++ class AliStorageAdministratorPanel+;
--- /dev/null
+#include "AliCommunicationThread.h"
+#include "AliStorageEventManager.h"
+
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+
+AliCommunicationThread::AliCommunicationThread(AliStorageClientThread *onlineReconstructionManager) :
+ fFinished(false),
+ fManager(onlineReconstructionManager),
+ fCommunicationThread(0)
+{
+ //create two-way communication thread
+ fCommunicationThread = new TThread("fCommunicationThread",Dispatch,(void*)this);
+ fCommunicationThread->Run();
+}
+
+AliCommunicationThread::~AliCommunicationThread()
+{
+ if(fCommunicationThread){delete fCommunicationThread;}
+}
+
+void AliCommunicationThread::Kill()
+{
+ if(fCommunicationThread)
+ {
+ fFinished=true;
+ fCommunicationThread->Join();
+ fCommunicationThread->Kill();
+ }
+}
+
+void AliCommunicationThread::CommunicationHandle()
+{
+ AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
+ storageSockets socket = CLIENT_COMMUNICATION_REP;
+ eventManager->CreateSocket(socket);
+
+ struct clientRequestStruct *request;
+ struct clientRequestStruct *response = new struct clientRequestStruct;
+
+ cout<<"CLIENT -- Communication stated"<<endl;
+
+ mutex mtx;
+
+ while(!fFinished)
+ {
+ cout<<"COMMUNICATION -- waiting for requests"<<endl;
+ request = eventManager->GetClientStruct(socket,5000);
+
+ if(request)
+ {
+ lock_guard<mutex> lock(mtx);
+ cout<<"COMMUNICATION -- received request"<<endl;
+ switch(request->messageType)
+ {
+ case REQUEST_CONNECTION:
+ eventManager->Send((long)fManager->fConnectionStatus,socket);
+ break;
+ case REQUEST_RECEIVING:
+ eventManager->Send((long)fManager->fReceivingStatus,socket);
+ break;
+ case REQUEST_SAVING:
+ eventManager->Send((long)fManager->fSavingStatus,socket);
+ break;
+ case REQUEST_CURRENT_SIZE:
+ eventManager->Send((long)fManager->fCurrentStorageSize,socket);
+ break;
+ case REQUEST_GET_PARAMS:
+ response->maxStorageSize = fManager->fMaximumStorageSize;
+ response->maxOccupation = fManager->fStorageOccupationLevel;
+ response->removeEvents = fManager->fRemoveEventsPercentage;
+ response->eventsInChunk = fManager->fNumberOfEventsInFile;
+
+ eventManager->Send(response,socket);
+ break;
+ case REQUEST_SET_PARAMS:
+ SetStorageParams(request->maxStorageSize,
+ request->maxOccupation,
+ request->removeEvents,
+ request->eventsInChunk);
+
+ fManager->fMaximumStorageSize = request->maxStorageSize;
+ fManager->fStorageOccupationLevel = request->maxOccupation;
+ fManager->fRemoveEventsPercentage = request->removeEvents;
+ fManager->fNumberOfEventsInFile = request->eventsInChunk;
+
+ eventManager->Send(true,socket);
+ break;
+ default:break;
+ }
+ delete request;
+ }
+ else{cout<<"COMMUNICATION -- received NO request"<<endl;}
+ }
+}
+
+void AliCommunicationThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
+{
+ cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
+
+ TThread::Lock();
+ ifstream configFile (GetConfigFilePath());
+ ofstream tmpFile("tmpFile.bla");
+
+ if (configFile.is_open())
+ {
+ string line;
+ string tmpLine;
+ int from,to;
+ while(configFile.good())
+ {
+ getline(configFile,line);
+ from = line.find("\"")+1;
+ to = line.find_last_of("\"");
+ tmpLine = line;
+ if(line.find("MAX_SIZE=")==0){
+ tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
+ }
+ else if(line.find("MAX_OCCUPATION=")==0){
+ tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
+ }
+ else if(line.find("REMOVE_PERCENT=")==0){
+ tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
+ }
+ else if(line.find("EVENTS_IN_FILE=")==0){
+ tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
+ }
+ tmpLine += "\n";
+ tmpFile << tmpLine;
+ }
+ if(configFile.eof()){configFile.clear();}
+ configFile.close();
+ tmpFile.close();
+ rename("tmpFile.bla",GetConfigFilePath());
+ }
+ else{cout<<"CLIENT -- Unable to open config file"<<endl;}
+ TThread::UnLock();
+}
\ No newline at end of file
--- /dev/null
+#ifndef __AliCommunicationThread__
+#define __AliCommunicationThread__
+
+#include "AliStorageClientThread.h"
+
+#include <TThread.h>
+
+class AliStorageClientThread;
+
+class AliCommunicationThread
+{
+public:
+ AliCommunicationThread(AliStorageClientThread *onlineReconstructionManager);
+ ~AliCommunicationThread();
+
+ void Kill();
+private:
+ bool fFinished;
+ AliStorageClientThread *fManager;
+
+ static void* Dispatch(void *arg)
+ {
+ static_cast<AliCommunicationThread*>(arg)->CommunicationHandle();
+ return nullptr;
+ }
+ void CommunicationHandle();
+ TThread *fCommunicationThread;
+
+ void SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk);
+};
+
+#endif /* defined(__AliCommunicationThread__) */
--- /dev/null
+#include "AliDIMListenerThread.h"
+
+#include <iostream>
+
+using namespace std;
+
+AliDIMListenerThread::AliDIMListenerThread()
+{
+ InitDIMListeners();
+
+#ifdef ALI_DATE
+ DimCurrentInfo SORrunNumber("/LOGBOOK/SUBSCRIBE/DAQ_SOR_PHYSICS_1",-1);
+ DimCurrentInfo EORrunNumber("/LOGBOOK/SUBSCRIBE/DAQ_EOR_PHYSICS_1",-1);
+
+ if(SORrunNumber.getData() && EORrunNumber.getData())
+ {
+ cout<<"DIM Listener -- current SOR signal:"<<SORrunNumber.getInt()<<endl;
+ cout<<"DIM Listener -- current EOR signal:"<<EORrunNumber.getInt()<<endl;
+
+ if(SORrunNumber.getInt() != EORrunNumber.getInt()){StartOfRun(SORrunNumber.getInt());}
+ }
+ else{cout<<"DIM Listener -- no data received from dim server"<<endl;}
+#endif
+}
+
+AliDIMListenerThread::~AliDIMListenerThread()
+{
+ for (int i = 0; i < 5; ++i){
+ if(fDimSORListener[i]) delete fDimSORListener[i];
+ if(fDimEORListener[i]) delete fDimEORListener[i];
+
+ fDimSORListener[i] = 0;
+ fDimEORListener[i] = 0;
+ }
+}
+
+void AliDIMListenerThread::InitDIMListeners()
+{
+ for (int i = 0; i < 5; ++i)
+ {
+#ifdef ALI_DATE
+ if (i == 0)
+ {
+ fDimSORListener[i] = new AliDimIntNotifier("/LOGBOOK/SUBSCRIBE/DAQ_SOR_PHYSICS");
+ fDimEORListener[i] = new AliDimIntNotifier("/LOGBOOK/SUBSCRIBE/DAQ_EOR_PHYSICS");
+ }
+ else
+ {
+ fDimSORListener[i] = new AliDimIntNotifier(Form("/LOGBOOK/SUBSCRIBE/DAQ_SOR_PHYSICS_%d", i));
+ fDimEORListener[i] = new AliDimIntNotifier(Form("/LOGBOOK/SUBSCRIBE/DAQ_EOR_PHYSICS_%d", i));
+ }
+
+ fDimSORListener[i]->Connect("DimMessage(int)", "AliDIMListenerThread", this, "StartOfRun(int)");
+ fDimEORListener[i]->Connect("DimMessage(int)", "AliDIMListenerThread", this, "EndOfRun(int)");
+#else
+ fDimSORListener[i]=0x0;
+ fDimEORListener[i]=0x0;
+#endif
+ }
+
+}
+
+void AliDIMListenerThread::StartOfRun(int run)
+{
+ cout<<"DIM Listener -- SOR signal received for run:"<<run<<endl;
+}
+
+void AliDIMListenerThread::EndOfRun(int run)
+{
+ cout<<"DIM Listener -- EOR signal received for run:"<<run<<endl;
+}
\ No newline at end of file
--- /dev/null
+#ifndef __AliDIMListenerThread__
+#define __AliDIMListenerThread__
+
+#ifdef ALI_DATE
+#include <dic.hxx>
+#endif
+
+class AliDimIntNotifier;
+
+class AliDIMListenerThread
+{
+public:
+ AliDIMListenerThread();
+ ~AliDIMListenerThread();
+
+ void StartOfRun(int run);
+ void EndOfRun(int run);
+
+private:
+ void InitDIMListeners();
+
+ AliDimIntNotifier *fDimSORListener[5];
+ AliDimIntNotifier *fDimEORListener[5];
+};
+
+#endif /* defined(__AliDIMListenerThread__) */
--- /dev/null
+#include "AliEventsCollectorThread.h"
+#include "AliStorageEventManager.h"
+
+#include <TSystemDirectory.h>
+
+#include <iostream>
+#include <fstream>
+
+using namespace std;
+
+AliEventsCollectorThread::AliEventsCollectorThread(AliStorageClientThread *onlineReconstructionManager) :
+fManager(onlineReconstructionManager),
+fCollectorThread(0),
+fCurrentFile(0),
+fDatabase(0),
+fFinished(false)
+{
+ fDatabase = new AliStorageDatabase();
+
+ CheckCurrentStorageSize();
+
+ // start collecting events in a thread
+ fCollectorThread = new TThread("fCollectorThread",Dispatch,(void*)this);
+ fCollectorThread->Run();
+}
+
+
+AliEventsCollectorThread::~AliEventsCollectorThread()
+{
+ if(fCollectorThread){delete fCollectorThread;}
+
+ if(fCurrentFile){
+ fCurrentFile->Close();
+ delete fCurrentFile;
+ }
+ if(fDatabase){delete fDatabase;}
+ if(fManager){delete fManager;}
+}
+
+void AliEventsCollectorThread::Kill()
+{
+ if(fCollectorThread){
+ fFinished=true;
+ fCollectorThread->Join();
+ fCollectorThread->Kill();
+ }
+}
+
+void AliEventsCollectorThread::CollectorHandle()
+{
+ AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
+ if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fManager->fConnectionStatus=STATUS_OK;}
+ else{fManager->fConnectionStatus=STATUS_ERROR;}
+
+ int chunkNumber=0;
+ int previousChunkNumber=-1;
+ int eventsInChunk=0;
+ int previousRunNumber=-1;
+ AliESDEvent *event = NULL;
+ vector<struct eventStruct> eventsToUpdate;
+ struct eventStruct currentEvent;
+
+ while(!fFinished)
+ {
+ cout<<"CLIENT -- waiting for event..."<<endl;
+ event = eventManager->GetEvent(EVENTS_SERVER_SUB,5000);
+
+ if(event)
+ {
+ cout<<"CLIENT -- received event"<<endl;
+ fManager->fReceivingStatus=STATUS_OK;
+
+ if(event->GetRunNumber() != previousRunNumber)//when new run starts
+ {
+ cout<<"CLIENT -- new run started"<<endl;
+ previousRunNumber = event->GetRunNumber();
+ gSystem->Exec(Form("mkdir -p %s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
+ chunkNumber=0;
+ eventsInChunk=0;
+
+ TSystemDirectory dir(Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()),
+ Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
+ TList *files = dir.GetListOfFiles();
+ if (files)
+ {
+ TSystemFile *file;
+ string fname;
+ TIter next(files);
+
+ while ((file=(TSystemFile*)next()))
+ {
+ fname = file->GetName();
+
+ if (!file->IsDirectory())
+ {
+ int from = fname.find("chunk")+5;
+ int to = fname.find(".root");
+
+ int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
+
+ if(maxChunkNumber > chunkNumber)
+ {
+ chunkNumber = maxChunkNumber;
+ }
+ }
+ }
+ chunkNumber++;
+ }
+ }
+
+ cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
+
+ if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
+ {
+ if(fCurrentFile)
+ {
+ fCurrentFile->Close();
+ delete fCurrentFile;
+ fCurrentFile=0;
+ }
+ for(unsigned int i=0;i<eventsToUpdate.size();i++)
+ {
+ fDatabase->UpdateEventPath(eventsToUpdate[i],Form("%s/run%d/chunk%d.root",
+ fManager->fStoragePath.c_str(),
+ event->GetRunNumber(),
+ chunkNumber-1));
+ }
+ eventsToUpdate.clear();
+
+ CheckCurrentStorageSize();
+
+ fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
+
+ previousChunkNumber = chunkNumber;
+ }
+
+ //create new directory for this run
+ TDirectory *currentRun;
+ if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
+ {
+ cout<<"CLIENT -- creating new directory for this run"<<endl;
+ currentRun->cd();
+ }
+ else
+ {
+ cout<<"CLIENT -- opening existing directory for this run"<<endl;
+ fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
+ }
+
+ if(0 != event->Write(Form("event%d",event->GetEventNumberInFile())))
+ //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
+ {
+ eventsInChunk++;
+
+ if(eventsInChunk == fManager->fNumberOfEventsInFile)//if max events number in file was reached
+ {
+ chunkNumber++;
+ eventsInChunk=0;
+ }
+
+ if(fManager->fSavingStatus!=STATUS_OK){fManager->fSavingStatus=STATUS_OK;}
+ }
+ else if(fManager->fSavingStatus!=STATUS_ERROR){fManager->fSavingStatus=STATUS_ERROR;}
+
+ // save to event file as well:
+ TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
+
+ if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
+ {
+ cout<<"CLIENT -- creating new directory for this run"<<endl;
+ currentRun->cd();
+ }
+ else
+ {
+ cout<<"CLIENT -- opening existing directory for this run"<<endl;
+ eventFile->cd(Form("run%d",event->GetRunNumber()));
+ }
+
+ if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) &&
+ fManager->fSavingStatus!=STATUS_ERROR)
+ {
+ fManager->fSavingStatus=STATUS_ERROR;
+ }
+ else
+ {
+ eventFile->Close();
+ delete eventFile;
+ fDatabase->InsertEvent(event->GetRunNumber(),
+ event->GetEventNumberInFile(),
+ (char*)event->GetBeamType(),
+ event->GetMultiplicity()->GetNumberOfTracklets(),
+ Form("%s/run%d/event%d.root",fManager->fStoragePath.c_str(),
+ event->GetRunNumber(),
+ eventsInChunk));
+
+ currentEvent.runNumber = event->GetRunNumber();
+ currentEvent.eventNumber = event->GetEventNumberInFile();
+ eventsToUpdate.push_back(currentEvent);
+ }
+ delete event;event=0;
+ }
+ else
+ {
+ cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
+ if(fManager->fReceivingStatus!=STATUS_ERROR){fManager->fReceivingStatus=STATUS_ERROR;}
+ }
+ }
+ if(event){delete event;}
+}
+
+
+Long64_t AliEventsCollectorThread::GetSizeOfAllChunks()
+{
+ Long64_t totalStorageSize = 0;
+
+ TSystemDirectory dir(fManager->fStoragePath.c_str(),fManager->fStoragePath.c_str());
+ TList *listOfDirectories = dir.GetListOfFiles();
+
+ if (!listOfDirectories){
+ cout<<"CLIENT -- Storage directory is empty"<<endl;
+ return 0;
+ }
+ TIter nextDirectory(listOfDirectories);
+ TSystemFile *runDirectory;
+ string directoryName;
+
+ while ((runDirectory=(TSystemFile*)nextDirectory()))
+ {
+ directoryName=runDirectory->GetName();
+ if (runDirectory->IsDirectory() && directoryName.find("run")==0)
+ {
+ TSystemDirectory dirChunks(Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()));
+ TList *listOfChunks = dirChunks.GetListOfFiles();
+
+ if(listOfChunks)
+ {
+ TIter nextChunk(listOfChunks);
+ TSystemFile *chunk;
+ string chunkFileName;
+
+ while((chunk=(TSystemFile*)nextChunk()))
+ {
+ chunkFileName = chunk->GetName();
+ if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
+ {
+ TFile *tmpFile = new TFile(Form("%s/%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
+ if(tmpFile)
+ {
+ totalStorageSize+=tmpFile->GetSize();
+ tmpFile->Close();
+ delete tmpFile;
+ }
+ }
+ }
+ if(chunk){delete chunk;}
+ }
+ if(listOfChunks){delete listOfChunks;}
+ }
+ }
+
+ if(listOfDirectories){delete listOfDirectories;}
+ if(runDirectory){delete runDirectory;}
+
+ printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
+
+ return totalStorageSize;
+}
+
+void AliEventsCollectorThread::CheckCurrentStorageSize()
+{
+ fManager->fCurrentStorageSize=GetSizeOfAllChunks();
+
+ if(fManager->fCurrentStorageSize > (float)fManager->fStorageOccupationLevel/100. * fManager->fMaximumStorageSize)
+ {
+ while(GetSizeOfAllChunks() > (float)fManager->fRemoveEventsPercentage/100. * fManager->fMaximumStorageSize)
+ {
+ struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
+ string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
+
+ //remove oldest event
+ cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
+ gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
+ fDatabase->RemoveEventsWithPath(oldestEventPath);
+ }
+ }
+}
+
+
+
--- /dev/null
+#ifndef __AliEventsCollectorThread__
+#define __AliEventsCollectorThread__
+
+#include "AliStorageDatabase.h"
+#include "AliStorageClientThread.h"
+
+#include <TThread.h>
+#include <TFile.h>
+
+class AliStorageClientThread;
+
+class AliEventsCollectorThread
+{
+public:
+ AliEventsCollectorThread(AliStorageClientThread *onlineReconstructionManager);
+ ~AliEventsCollectorThread();
+
+ void Kill();
+private:
+ AliStorageClientThread *fManager;
+
+ static void* Dispatch(void *arg)
+ {
+ static_cast<AliEventsCollectorThread*>(arg)->CollectorHandle();
+ return nullptr;
+ }
+ void CollectorHandle();
+ TThread *fCollectorThread;
+
+ TFile *fCurrentFile;
+ AliStorageDatabase *fDatabase;
+ void CheckCurrentStorageSize();
+ Long64_t GetSizeOfAllChunks();
+
+ bool fFinished;
+};
+
+
+#endif /* defined(__AliEventsCollectorThread__) */
#include "AliStorageClientThread.h"
-#include "AliMultiplicity.h"
-#include "AliStorageTypes.h"
-#include "AliStorageEventManager.h"
-#include <sstream>
+//#include <sstream>
#include <signal.h>
#include <fstream>
#include <iostream>
-#include <TSystemDirectory.h>
-#include <TThread.h>
-#include <TFile.h>
-
using namespace std;
-bool gClientQuit = false; // signal flag
+bool gClientQuit = false;
void GotSignalClient(int){gClientQuit = true;}
AliStorageClientThread::AliStorageClientThread() :
+fDIMListenerThread(0),
+fEventsCollectorThread(0),
+fCommunicationThread(0),
fConnectionStatus(STATUS_WAITING),
fReceivingStatus(STATUS_WAITING),
fSavingStatus(STATUS_WAITING),
-fCommunicationThread(0),
-fCurrentFile(0),
-fDatabase(0),
fCurrentStorageSize(0),
fMaximumStorageSize(0),
fStoragePath(""),
fNumberOfEventsInFile(0),
fStorageOccupationLevel(0),
fRemoveEventsPercentage(0)
-{
- // make sure that when program is closed destructor will be called
- struct sigaction sa;
- memset(&sa,0,sizeof(sa));
- sa.sa_handler = GotSignalClient;
- sigfillset(&sa.sa_mask);
- sigaction(SIGINT,&sa,NULL);
-
- //load storage parameters from file
- TThread::Lock();
- ifstream configFile (GetConfigFilePath());
- if (configFile.is_open())
- {
- string line;
- int from,to;
- while(configFile.good())
- {
- getline(configFile,line);
- from = line.find("\"")+1;
- to = line.find_last_of("\"");
- if(line.find("STORAGE_PATH=")==0)
- {
- fStoragePath=line.substr(from,to-from);
- }
- else if(line.find("MAX_SIZE=")==0)
- {
- fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("MAX_OCCUPATION=")==0)
- {
- fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("REMOVE_PERCENT=")==0)
- {
- fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
- }
- else if(line.find("EVENTS_IN_FILE=")==0)
- {
- fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
- }
- }
- if(configFile.eof())
- {
- configFile.clear();
- }
- configFile.close();
- }
- else
- {
- cout<<"CLIENT -- Unable to open config file"<<endl;
- }
- //create directory for storage if it doesn't exist
- gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
-
- //create database class
- fDatabase = new AliStorageDatabase();
- TThread::UnLock();
-
- //check current storage size
- fCurrentStorageSize = GetSizeOfAllChunks();
-
- //create two-way commynication thread
- fCommunicationThread = new TThread("fCommunicationThread",
- Dispatch,(void*)this);
- fCommunicationThread->Run();
-}
-
-AliStorageClientThread::~AliStorageClientThread()
-{
- cout<<"CLIENT -- AliStorageClientThread destructor called";
- if(fCurrentFile)
- {
- fCurrentFile->Close();
- delete fCurrentFile;
- }
- if(fCommunicationThread){delete fCommunicationThread;}
- if(fDatabase){delete fDatabase;}
- cout<<" --- OK"<<endl;
-}
-
-void AliStorageClientThread::CommunicationHandle()
-{
- AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
- storageSockets socket = CLIENT_COMMUNICATION_REP;
- eventManager->CreateSocket(socket);
-
- struct clientRequestStruct *request;
- struct clientRequestStruct *response = new struct clientRequestStruct;
-
- cout<<"CLIENT -- Communication stated"<<endl;
-
- while(!gClientQuit)
- {
- request = eventManager->GetClientStruct(socket);
- switch(request->messageType)
- {
- case REQUEST_CONNECTION:
- eventManager->Send((long)fConnectionStatus,socket);
- break;
- case REQUEST_RECEIVING:
- eventManager->Send((long)fReceivingStatus,socket);
- break;
- case REQUEST_SAVING:
- eventManager->Send((long)fSavingStatus,socket);
- break;
- case REQUEST_CURRENT_SIZE:
- eventManager->Send((long)fCurrentStorageSize,socket);
- break;
- case REQUEST_GET_PARAMS:
- response->maxStorageSize = fMaximumStorageSize;
- response->maxOccupation = fStorageOccupationLevel;
- response->removeEvents = fRemoveEventsPercentage;
- response->eventsInChunk = fNumberOfEventsInFile;
-
- eventManager->Send(response,socket);
- break;
- case REQUEST_SET_PARAMS:
- SetStorageParams(request->maxStorageSize,
- request->maxOccupation,
- request->removeEvents,
- request->eventsInChunk);
-
- fMaximumStorageSize = request->maxStorageSize;
- fStorageOccupationLevel = request->maxOccupation;
- fRemoveEventsPercentage = request->removeEvents;
- fNumberOfEventsInFile = request->eventsInChunk;
-
- eventManager->Send(true,socket);
- break;
- default:break;
- }
- delete request;
- }
-}
-
-void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
-{
- cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
-
-
- TThread::Lock();
- ifstream configFile (GetConfigFilePath());
- ofstream tmpFile("tmpFile.bla");
-
- if (configFile.is_open())
- {
- string line;
- string tmpLine;
- int from,to;
- while(configFile.good())
- {
- getline(configFile,line);
- from = line.find("\"")+1;
- to = line.find_last_of("\"");
- tmpLine = line;
- if(line.find("MAX_SIZE=")==0)
- {
- tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
- }
- else if(line.find("MAX_OCCUPATION=")==0)
- {
- tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
- }
- else if(line.find("REMOVE_PERCENT=")==0)
- {
- tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
- }
- else if(line.find("EVENTS_IN_FILE=")==0)
- {
- tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
- }
- tmpLine += "\n";
- tmpFile << tmpLine;
- }
- if(configFile.eof())
- {
- configFile.clear();
- }
- configFile.close();
- tmpFile.close();
- rename("tmpFile.bla",GetConfigFilePath());
- }
- else
- {
- cout<<"CLIENT -- Unable to open config file"<<endl;
- }
- TThread::UnLock();
-}
-
-Long64_t AliStorageClientThread::GetSizeOfAllChunks()
-{
- Long64_t totalStorageSize = 0;
-
- TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
- TList *listOfDirectories = dir.GetListOfFiles();
-
- if (!listOfDirectories)
- {
- cout<<"CLIENT -- Storage directory is empty"<<endl;
- return 0;
- }
- TIter nextDirectory(listOfDirectories);
- TSystemFile *runDirectory;
- string directoryName;
-
- while ((runDirectory=(TSystemFile*)nextDirectory()))
- {
- directoryName=runDirectory->GetName();
- if (runDirectory->IsDirectory() && directoryName.find("run")==0)
- {
- TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
- TList *listOfChunks = dirChunks.GetListOfFiles();
-
- if(listOfChunks)
- {
- TIter nextChunk(listOfChunks);
- TSystemFile *chunk;
- string chunkFileName;
-
- while((chunk=(TSystemFile*)nextChunk()))
- {
- chunkFileName = chunk->GetName();
- if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
- {
- TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
- if(tmpFile)
- {
- totalStorageSize+=tmpFile->GetSize();
- tmpFile->Close();
- delete tmpFile;
- }
- }
- }
- if(chunk){delete chunk;}
- }
- if(listOfChunks){delete listOfChunks;}
- }
- }
-
- //tmpFiles.clear();
- if(listOfDirectories){delete listOfDirectories;}
- if(runDirectory){delete runDirectory;}
-
- printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
-
- return totalStorageSize;
-}
-
-void AliStorageClientThread::CollectData()
{
- AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
- if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fConnectionStatus=STATUS_OK;}
- else{fConnectionStatus=STATUS_ERROR;}
-
- int chunkNumber=0;
- int previousChunkNumber=-1;
- int eventsInChunk=0;
- int previousRunNumber=-1;
- AliESDEvent *event = NULL;
- vector<struct eventStruct> eventsToUpdate;
- struct eventStruct currentEvent;
-
-
- while(!gClientQuit)
- {
- event = eventManager->GetEvent(EVENTS_SERVER_SUB);
-
- if(event)
- {
- fReceivingStatus=STATUS_OK;
-
- if(event->GetRunNumber() != previousRunNumber)//when new run starts
- {
- cout<<"CLIENT -- new run started"<<endl;
- previousRunNumber = event->GetRunNumber();
- gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
- chunkNumber=0;
- eventsInChunk=0;
-
- TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
- Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
- TList *files = dir.GetListOfFiles();
- if (files)
- {
- TSystemFile *file;
- string fname;
- TIter next(files);
-
- while ((file=(TSystemFile*)next()))
- {
- fname = file->GetName();
-
- if (!file->IsDirectory())
- {
- int from = fname.find("chunk")+5;
- int to = fname.find(".root");
-
- int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
-
- if(maxChunkNumber > chunkNumber)
- {
- chunkNumber = maxChunkNumber;
- }
- }
- }
- chunkNumber++;
- }
- }
-
- cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
-
- if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
- {
- if(fCurrentFile)
- {
- fCurrentFile->Close();
- delete fCurrentFile;
- fCurrentFile=0;
- }
- for(unsigned int i=0;i<eventsToUpdate.size();i++)
- {
- fDatabase->UpdateEventPath(eventsToUpdate[i],
- Form("%s/run%d/chunk%d.root",
- fStoragePath.c_str(),
- event->GetRunNumber(),
- chunkNumber-1));
- }
- eventsToUpdate.clear();
-
-
- fCurrentStorageSize=GetSizeOfAllChunks();
- CheckCurrentStorageSize();
-
- fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
-
- previousChunkNumber = chunkNumber;
- }
-
- //create new directory for this run
- TDirectory *currentRun;
- if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
- {
- cout<<"CLIENT -- creating new directory for this run"<<endl;
- currentRun->cd();
- }
- else
- {
- cout<<"CLIENT -- opening existing directory for this run"<<endl;
- fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
- }
-
- if(0 != event->Write(Form("event%d",event->GetEventNumberInFile())))
- //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
- {
- eventsInChunk++;
-
- if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
- {
- chunkNumber++;
- eventsInChunk=0;
- }
-
- if(fSavingStatus!=STATUS_OK)
- {
- fSavingStatus=STATUS_OK;
- }
- }
- else if(fSavingStatus!=STATUS_ERROR)
- {
- fSavingStatus=STATUS_ERROR;
- }
-
- // save to event file as well:
-
- TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
-
- if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
- {
- cout<<"CLIENT -- creating new directory for this run"<<endl;
- currentRun->cd();
- }
- else
- {
- cout<<"CLIENT -- opening existing directory for this run"<<endl;
- eventFile->cd(Form("run%d",event->GetRunNumber()));
- }
-
- if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) &&
- fSavingStatus!=STATUS_ERROR){fSavingStatus=STATUS_ERROR;}
- else
- {
- eventFile->Close();
- delete eventFile;
- fDatabase->InsertEvent(event->GetRunNumber(),
- event->GetEventNumberInFile(),
- (char*)event->GetBeamType(),
- event->GetMultiplicity()->GetNumberOfTracklets(),
- Form("%s/run%d/event%d.root",fStoragePath.c_str(),
- event->GetRunNumber(),
- eventsInChunk));
-
- currentEvent.runNumber = event->GetRunNumber();
- currentEvent.eventNumber = event->GetEventNumberInFile();
- eventsToUpdate.push_back(currentEvent);
- }
- delete event;event=0;
- //delete tree;
- }
- else if(fReceivingStatus!=STATUS_ERROR)
- {
- cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
- fReceivingStatus=STATUS_ERROR;
- }
- }
- if(event){delete event;}
+ // make sure that when program is closed destructor will be called
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler = GotSignalClient;
+ sigfillset(&sa.sa_mask);
+ sigaction(SIGINT,&sa,NULL);
+
+ ifstream configFile (GetConfigFilePath());
+ if (configFile.is_open())
+ {
+ string line;
+ int from,to;
+ while(configFile.good())
+ {
+ getline(configFile,line);
+ from = line.find("\"")+1;
+ to = line.find_last_of("\"");
+ if(line.find("STORAGE_PATH=")==0){
+ fStoragePath=line.substr(from,to-from);
+ }
+ else if(line.find("MAX_SIZE=")==0){
+ fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("MAX_OCCUPATION=")==0){
+ fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("REMOVE_PERCENT=")==0){
+ fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
+ }
+ else if(line.find("EVENTS_IN_FILE=")==0){
+ fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
+ }
+ }
+ if(configFile.eof()){configFile.clear();}
+ configFile.close();
+ }
+ else{cout<<"CLIENT -- Unable to open config file"<<endl;}
+
+ //create directory for storage if it doesn't exist
+ gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
+
+ fDIMListenerThread = new AliDIMListenerThread();
+ fEventsCollectorThread = new AliEventsCollectorThread(this);
+ fCommunicationThread = new AliCommunicationThread(this);
}
-
-void AliStorageClientThread::CheckCurrentStorageSize()
+AliStorageClientThread::~AliStorageClientThread()
{
- if(fCurrentStorageSize > (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
- {
- while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
- {
- struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
- string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
- //remove oldest event
- cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
- gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
- fDatabase->RemoveEventsWithPath(oldestEventPath);
-// fDatabase->RemoveEvent(oldestEvent);
- }
- }
-}
+ while(!gClientQuit){sleep(1);}
+ fEventsCollectorThread->Kill();
+ fCommunicationThread->Kill();
+}
\ No newline at end of file
#ifndef AliStorageClientThread_H
#define AliStorageClientThread_H
-#include "AliStorageDatabase.h"
+#include "AliStorageTypes.h"
+
+#include "AliDIMListenerThread.h"
+#include "AliEventsCollectorThread.h"
+#include "AliCommunicationThread.h"
#include <string>
-#include <TThread.h>
+class AliCommunicationThread;
+class AliEventsCollectorThread;
class AliStorageClientThread
{
+ friend class AliEventsCollectorThread;
+ friend class AliCommunicationThread;
+
public:
AliStorageClientThread();
~AliStorageClientThread();
- void CollectData();
-
private:
- //status flags
- Int_t fConnectionStatus;
- Int_t fReceivingStatus;
- Int_t fSavingStatus;
-
- //communication with admin panel
- static void* Dispatch(void *arg){static_cast<AliStorageClientThread*>(arg)->CommunicationHandle();}
- void CommunicationHandle();
- TThread *fCommunicationThread;
-
- //storage file system
- void CheckCurrentStorageSize();
- void SetStorageParams(int maxStorageSize,
- int maxOccupation,
- int removeEvents,
- int eventsInChunk);
- TFile *fCurrentFile;
-
- AliStorageDatabase *fDatabase;
- int fCurrentStorageSize;
- int fMaximumStorageSize;
- std::string fStoragePath;
- int fNumberOfEventsInFile;
- int fStorageOccupationLevel;
- int fRemoveEventsPercentage;
-
- Long64_t GetSizeOfAllChunks();
-
+ AliDIMListenerThread *fDIMListenerThread;
+ AliEventsCollectorThread *fEventsCollectorThread;
+ AliCommunicationThread *fCommunicationThread;
+
AliStorageClientThread(const AliStorageClientThread&);
AliStorageClientThread& operator=(const AliStorageClientThread&);
-
+
+protected:
+ // status flags
+ Int_t fConnectionStatus;
+ Int_t fReceivingStatus;
+ Int_t fSavingStatus;
+
+ // storage parameters
+ int fCurrentStorageSize;
+ int fMaximumStorageSize;
+ std::string fStoragePath;
+ int fNumberOfEventsInFile;
+ int fStorageOccupationLevel;
+ int fRemoveEventsPercentage;
};
-#endif
+#endif
\ No newline at end of file
return request;
}
-struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket)
+struct clientRequestStruct* AliStorageEventManager::GetClientStruct(storageSockets socket,int timeout)
{
+ pollitem_t items[1] = {{*fSockets[socket],0,ZMQ_POLLIN,0}} ;
+ if(timeout>=0){if(poll (&items[0], 1, timeout)==0){return NULL;}}
+
struct clientRequestStruct *request = new struct clientRequestStruct;
message_t *requestMessage = new message_t();
try{
std::vector<serverListStruct> GetServerListVector(storageSockets socket);
AliESDEvent* GetEvent(storageSockets socket,int timeout=-1);
struct serverRequestStruct* GetServerStruct(storageSockets socket);
- struct clientRequestStruct* GetClientStruct(storageSockets socket);
+ struct clientRequestStruct* GetClientStruct(storageSockets socket,int timeout=-1);
long GetLong(storageSockets socket);
bool GetBool(storageSockets socket);
#include <TSystem.h>
-inline const char* GetConfigFilePath()
-{
- return Form("%s/MONITOR/alistoragemanager/setupStorageDatabase.sh",
- gSystem->Getenv("ALICE_ROOT"));
+inline const char* GetConfigFilePath(){
+ return Form("%s/MONITOR/alistoragemanager/setupStorageDatabase.sh",gSystem->Getenv("ALICE_ROOT"));
}
enum storageSockets{
#pragma link C++ class AliStorageDatabase+;
#pragma link C++ class AliStorageClientThread+;
+#pragma link C++ class AliDIMListenerThread+;
+#pragma link C++ class AliEventsCollectorThread+;
+#pragma link C++ class AliCommunicationThread+;
#pragma link C++ class AliStorageServerThread+;
#pragma link C++ class AliStorageEventManager+;
#pragma link C++ class AliStorageAdministratorPanel+;
if(client)
{
- client->CollectData();
+// client->CollectData();
}
else
{