1 #include "AliStorageClientThread.h"
2 #include "AliMultiplicity.h"
3 #include "AliStorageTypes.h"
4 #include "AliStorageEventManager.h"
11 #include <TSystemDirectory.h>
// Process-wide quit flag. It is set to true by GotSignalClient(), which the
// constructor in this file installs as the SIGINT handler. Where the flag is
// read is not visible in this listing.
// NOTE(review): a plain bool written from a signal handler is not guaranteed to
// be async-signal-safe; volatile std::sig_atomic_t is the portable type.
17 bool gClientQuit = false; // signal flag
// Signal handler: records that termination was requested. The signal number
// argument is ignored.
18 void GotSignalClient(int){gClientQuit = true;}
// Constructor. Sets the three status fields to STATUS_WAITING and the storage
// counters to 0, installs a SIGINT handler, reads the storage settings from the
// config file, creates the storage directory and the database object, measures
// the current storage size and starts the communication thread.
// Several lines of this definition (braces, local declarations, some
// initialisers) are not visible in this listing.
20 AliStorageClientThread::AliStorageClientThread() :
21 fConnectionStatus(STATUS_WAITING),
22 fReceivingStatus(STATUS_WAITING),
23 fSavingStatus(STATUS_WAITING),
24 fCommunicationThread(0),
27 fCurrentStorageSize(0),
28 fMaximumStorageSize(0),
30 fNumberOfEventsInFile(0),
31 fStorageOccupationLevel(0),
32 fRemoveEventsPercentage(0)
34 // make sure that when program is closed destructor will be called
// sa is declared on a line not shown here (presumably a struct sigaction --
// TODO confirm).
36 memset(&sa,0,sizeof(sa));
37 sa.sa_handler = GotSignalClient;
// A full sa_mask blocks all other signals while the handler runs.
38 sigfillset(&sa.sa_mask);
39 sigaction(SIGINT,&sa,NULL);
41 //load storage parameters from file
43 ifstream configFile (GetConfigFilePath());
44 if (configFile.is_open())
// Read the file line by line. Because the stream state is tested before the
// read, the final failed getline leaves an empty line, which matches no key.
48 while(configFile.good())
50 getline(configFile,line);
// The value is taken as the text between the first and the last double quote
// of the line: from is the index just after the first quote, to the index of
// the last one.
// NOTE(review): on a line without quotes find() returns npos; the result then
// depends on the types of from/to, whose declarations are not visible here.
51 from = line.find("\"")+1;
52 to = line.find_last_of("\"");
// A key is recognised only when it starts at column 0 of the line.
53 if(line.find("STORAGE_PATH=")==0)
55 fStoragePath=line.substr(from,to-from);
57 else if(line.find("MAX_SIZE=")==0)
// NOTE(review): atoi yields 0 for non-numeric text and cannot report errors or
// values outside the int range.
59 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
61 else if(line.find("MAX_OCCUPATION=")==0)
63 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
65 else if(line.find("REMOVE_PERCENT=")==0)
67 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
69 else if(line.find("EVENTS_IN_FILE=")==0)
71 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
// Presumably the else branch of the is_open() check above (the brace and else
// lines are not visible).
82 cout<<"CLIENT -- Unable to open config file"<<endl;
84 //create directory for storage if it doesn't exist
// NOTE(review): fStoragePath is inserted unquoted into a shell command; a path
// containing spaces or shell metacharacters would not behave as intended. If
// the config file could not be read, fStoragePath is whatever it was before.
85 gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
87 //create database class
88 fDatabase = new AliStorageDatabase();
91 //check current storage size
92 fCurrentStorageSize = GetSizeOfAllChunks();
94 //create two-way communication thread
// The thread runs Dispatch with this object as its argument. Dispatch is
// defined elsewhere; presumably it forwards to CommunicationHandle() -- TODO
// confirm.
95 fCommunicationThread = new TThread("fCommunicationThread",
96 Dispatch,(void*)this);
97 fCommunicationThread->Run();
// Destructor. Prints a progress message, closes the current output file,
// deletes the communication thread object and prints a confirmation.
100 AliStorageClientThread::~AliStorageClientThread()
102 cout<<"CLIENT -- AliStorageClientThread destructor called";
// NOTE(review): any guard around this call is on a line not shown; confirm
// that fCurrentFile cannot be null or uninitialised when the destructor runs.
105 fCurrentFile->Close();
108 if(fCommunicationThread){delete fCommunicationThread;}
109 cout<<" --- OK"<<endl;
// Answers client requests arriving on the CLIENT_COMMUNICATION_REP socket.
// Each request is a clientRequestStruct; its messageType selects which status
// value or parameter set is sent back, or whether new storage parameters are
// applied. The loop header, the break statements and one case label are on
// lines not visible in this listing.
112 void AliStorageClientThread::CommunicationHandle()
114 AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
115 storageSockets socket = CLIENT_COMMUNICATION_REP;
116 eventManager->CreateSocket(socket);
118 struct clientRequestStruct *request;
// Reply buffer used for REQUEST_GET_PARAMS.
// NOTE(review): no matching delete is visible in this listing.
119 struct clientRequestStruct *response = new struct clientRequestStruct;
121 cout<<"CLIENT -- Communication stated"<<endl;
// NOTE(review): ownership of the struct returned by GetClientStruct is not
// visible here -- confirm whether it has to be freed per request.
125 request = eventManager->GetClientStruct(socket);
126 switch(request->messageType)
// The status requests reply with the corresponding status field cast to long.
128 case REQUEST_CONNECTION:
129 eventManager->Send((long)fConnectionStatus,socket);
131 case REQUEST_RECEIVING:
132 eventManager->Send((long)fReceivingStatus,socket);
// The case label for this reply is not visible (presumably the saving-status
// request -- TODO confirm).
135 eventManager->Send((long)fSavingStatus,socket);
137 case REQUEST_CURRENT_SIZE:
138 eventManager->Send((long)fCurrentStorageSize,socket);
// Reply with the four storage parameters currently held in memory.
140 case REQUEST_GET_PARAMS:
141 response->maxStorageSize = fMaximumStorageSize;
142 response->maxOccupation = fStorageOccupationLevel;
143 response->removeEvents = fRemoveEventsPercentage;
144 response->eventsInChunk = fNumberOfEventsInFile;
146 eventManager->Send(response,socket);
// Write the requested parameters to the config file first, then update the
// in-memory copies, then send true back to the client.
148 case REQUEST_SET_PARAMS:
149 SetStorageParams(request->maxStorageSize,
150 request->maxOccupation,
151 request->removeEvents,
152 request->eventsInChunk);
154 fMaximumStorageSize = request->maxStorageSize;
155 fStorageOccupationLevel = request->maxOccupation;
156 fRemoveEventsPercentage = request->removeEvents;
157 fNumberOfEventsInFile = request->eventsInChunk;
159 eventManager->Send(true,socket);
// Rewrites the config file with new storage parameters. The config file is
// read line by line; lines starting with one of the four known keys are
// replaced by a freshly formatted KEY="value" line, and the result goes through
// the temporary file tmpFile.bla, which is finally renamed over the config
// file. The lines that write to the temporary file are not visible in this
// listing.
//   maxStorageSize  new value for the MAX_SIZE entry
//   maxOccupation   new value for the MAX_OCCUPATION entry
//   removeEvents    new value for the REMOVE_PERCENT entry
//   eventsInChunk   new value for the EVENTS_IN_FILE entry
167 void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
// Prints the four new values, one per line.
169 cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
173 ifstream configFile (GetConfigFilePath());
// NOTE(review): the temporary file is opened, with a name relative to the
// current working directory, before the config file is known to be readable.
174 ofstream tmpFile("tmpFile.bla");
176 if (configFile.is_open())
181 while(configFile.good())
183 getline(configFile,line);
// from/to are computed as in the constructor but are not used on any line
// visible here.
184 from = line.find("\"")+1;
185 to = line.find_last_of("\"");
// A key is recognised only when it starts at column 0 of the line.
187 if(line.find("MAX_SIZE=")==0)
189 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
191 else if(line.find("MAX_OCCUPATION=")==0)
193 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
195 else if(line.find("REMOVE_PERCENT=")==0)
197 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
199 else if(line.find("EVENTS_IN_FILE=")==0)
201 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
// Replace the config file with the rewritten copy.
// NOTE(review): the return value of rename is not checked on the visible
// lines, and rename fails when source and target are on different filesystems.
212 rename("tmpFile.bla",GetConfigFilePath());
// Presumably the else branch of the is_open() check above (the brace and else
// lines are not visible).
216 cout<<"CLIENT -- Unable to open config file"<<endl;
// Computes the total size of all stored chunk files. Scans fStoragePath for
// sub-directories whose name starts with run; inside each, every non-directory
// entry whose name starts with chunk is opened as a TFile and its GetSize()
// value is added to the total. The total is printed and returned.
// Some lines (braces, a declaration, cleanup of the opened file) are not
// visible in this listing.
221 Long64_t AliStorageClientThread::GetSizeOfAllChunks()
223 Long64_t totalStorageSize = 0;
225 TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
226 TList *listOfDirectories = dir.GetListOfFiles();
// What happens after the message (for example an early return) is on lines
// not shown here.
228 if (!listOfDirectories)
230 cout<<"CLIENT -- Storage directory is empty"<<endl;
233 TIter nextDirectory(listOfDirectories);
234 TSystemFile *runDirectory;
235 string directoryName;
// Outer loop: one iteration per entry of the storage directory.
237 while ((runDirectory=(TSystemFile*)nextDirectory()))
239 directoryName=runDirectory->GetName();
240 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
242 TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
243 TList *listOfChunks = dirChunks.GetListOfFiles();
247 TIter nextChunk(listOfChunks);
249 string chunkFileName;
// Inner loop: one iteration per entry of the run directory. The declaration of
// chunk is on a line not shown.
251 while((chunk=(TSystemFile*)nextChunk()))
253 chunkFileName = chunk->GetName();
254 if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
// NOTE(review): the file is allocated with new; its Close/delete is presumably
// on the lines not visible after the size is read -- confirm it is not leaked.
256 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
259 totalStorageSize+=tmpFile->GetSize();
// NOTE(review): if this runs after the inner loop, chunk is already null and
// the delete never executes; if it runs inside the loop it deletes an entry
// the list may also own. The brace lines needed to tell are not visible.
265 if(chunk){delete chunk;}
267 if(listOfChunks){delete listOfChunks;}
272 if(listOfDirectories){delete listOfDirectories;}
// NOTE(review): same consideration as for chunk above.
273 if(runDirectory){delete runDirectory;}
// The figure in parentheses divides the total by 1000*1000.
275 printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
277 return totalStorageSize;
// Receives events from the EVENTS_SERVER_SUB socket and stores them. Each
// event is written into <storage path>/run<run number>/chunk<chunk number>.root
// and registered in the database; fConnectionStatus, fReceivingStatus and
// fSavingStatus are updated along the way.
// The receive loop header, the declarations of chunkNumber, eventsInChunk,
// file, fname and next, and several guards are on lines not visible in this
// listing.
280 void AliStorageClientThread::CollectData()
282 AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
// The connection status reflects whether the subscriber socket could be
// created.
283 if(eventManager->CreateSocket(EVENTS_SERVER_SUB))
285 fConnectionStatus=STATUS_OK;
289 fConnectionStatus=STATUS_ERROR;
// -1 means that no chunk / no run has been seen yet.
293 int previousChunkNumber=-1;
295 int previousRunNumber=-1;
296 AliESDEvent *event = NULL;
297 // TTree *tree = NULL;
301 event = eventManager->GetEvent(EVENTS_SERVER_SUB);
305 fReceivingStatus=STATUS_OK;
307 if(event->GetRunNumber() != previousRunNumber)//when new run starts
309 cout<<"CLIENT -- new run started"<<endl;
310 previousRunNumber = event->GetRunNumber();
// NOTE(review): the path is inserted unquoted into a shell command.
311 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
// Scan the run directory for existing files and keep the highest chunk number
// found in their names.
315 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
316 Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
317 TList *files = dir.GetListOfFiles();
324 while ((file=(TSystemFile*)next()))
326 fname = file->GetName();
328 if (!file->IsDirectory())
// The chunk number is the text between the word chunk and the .root suffix.
// NOTE(review): names lacking either part make find() return npos, so the
// substring bounds are then not meaningful; atoi yields 0 for non-numeric text.
330 int from = fname.find("chunk")+5;
331 int to = fname.find(".root");
333 int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
335 if(maxChunkNumber > chunkNumber)
337 chunkNumber = maxChunkNumber;
345 cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
347 if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
// Close the previous chunk file (any null guard is on a line not shown), then
// re-measure the storage and let CheckCurrentStorageSize() remove old data
// before the next file is opened.
351 fCurrentFile->Close();
355 fCurrentStorageSize=GetSizeOfAllChunks();
356 CheckCurrentStorageSize();
// The file is opened in recreate mode, which replaces an existing file of the
// same name.
358 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
360 previousChunkNumber = chunkNumber;
// A non-zero return value of WriteObject is treated as a successful write;
// only then is the event registered in the database.
363 if(0 != fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
365 fDatabase->InsertEvent(event->GetRunNumber(),
366 event->GetEventNumberInFile(),
367 (char*)event->GetBeamType(),
// NOTE(review): GetMultiplicity() is dereferenced without a null check on the
// visible lines.
368 event->GetMultiplicity()->GetNumberOfTracklets(),Form("%s/run%d/chunk%d.root",fStoragePath.c_str(),event->GetRunNumber(),chunkNumber));
// eventsInChunk is updated on lines not shown; presumably reaching the limit
// advances chunkNumber so that the next event opens a new file -- TODO confirm.
372 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
378 if(fSavingStatus!=STATUS_OK)
380 fSavingStatus=STATUS_OK;
// Presumably reached when the write above returned 0 -- TODO confirm against
// the hidden brace lines.
383 else if(fSavingStatus!=STATUS_ERROR)
385 fSavingStatus=STATUS_ERROR;
// The event is deleted here once it has been handled.
387 delete event;event=0;
// No event was received: report it once and flag the receiving status.
390 else if(fReceivingStatus!=STATUS_ERROR)
392 cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
393 fReceivingStatus=STATUS_ERROR;
396 if(event){delete event;}
// Frees storage when it is too full. If fCurrentStorageSize exceeds
// fStorageOccupationLevel percent of fMaximumStorageSize, the file of the
// oldest event known to the database is removed and its record deleted,
// repeatedly, until the measured total size is no larger than
// fRemoveEventsPercentage percent of fMaximumStorageSize.
// The closing lines of this definition are not visible in this listing.
400 void AliStorageClientThread::CheckCurrentStorageSize()
402 if(fCurrentStorageSize > (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
// NOTE(review): GetSizeOfAllChunks() rescans the storage directory and opens
// every chunk file on each iteration of this loop.
// NOTE(review): how the loop ends if the database runs out of events while the
// size is still above the threshold cannot be determined from here.
404 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
406 struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
407 string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
408 //remove oldest event
409 cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
// NOTE(review): the path is inserted unquoted into a shell command.
410 gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
// NOTE(review): the oldest event is queried a second time here instead of
// reusing oldestEvent; presumably both calls return the same record -- confirm.
411 fDatabase->RemoveEvent(fDatabase->GetOldestEvent());