]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageClientThread.cxx
Merge branch 'master' of https://git.cern.ch/reps/AliRoot
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageClientThread.cxx
1 #include "AliStorageClientThread.h"
2 #include "AliMultiplicity.h"
3 #include "AliStorageTypes.h"
4 #include "AliStorageEventManager.h"
5
6 #include <sstream>
7 #include <signal.h>
8 #include <fstream>
9 #include <iostream>
10
11 #include <TSystemDirectory.h>
12 #include <TThread.h>
13 #include <TFile.h>
14
15 using namespace std;
16
17 bool gClientQuit = false;    // signal flag
18 void GotSignalClient(int){gClientQuit = true;}
19
20 AliStorageClientThread::AliStorageClientThread() :
21 fConnectionStatus(STATUS_WAITING),
22 fReceivingStatus(STATUS_WAITING),
23 fSavingStatus(STATUS_WAITING),
24 fCommunicationThread(0),
25 fCurrentFile(0),
26 fDatabase(0),
27 fCurrentStorageSize(0),
28 fMaximumStorageSize(0),
29 fStoragePath(""),
30 fNumberOfEventsInFile(0),
31 fStorageOccupationLevel(0),
32 fRemoveEventsPercentage(0)
33 {       
34         // make sure that when program is closed destructor will be called
35         struct sigaction sa;
36         memset(&sa,0,sizeof(sa));
37         sa.sa_handler = GotSignalClient;
38         sigfillset(&sa.sa_mask);
39         sigaction(SIGINT,&sa,NULL);
40         
41         //load storage parameters from file
42         TThread::Lock();
43         ifstream configFile (GetConfigFilePath());
44         if (configFile.is_open())
45         {
46                 string line;
47                 int from,to;
48                 while(configFile.good())
49                 {
50                         getline(configFile,line);
51                         from = line.find("\"")+1;
52                         to = line.find_last_of("\"");
53                         if(line.find("STORAGE_PATH=")==0)
54                         {
55                                 fStoragePath=line.substr(from,to-from);
56                         }
57                         else if(line.find("MAX_SIZE=")==0)
58                         {
59                                 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
60                         }
61                         else if(line.find("MAX_OCCUPATION=")==0)
62                         {
63                                 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
64                         }
65                         else if(line.find("REMOVE_PERCENT=")==0)
66                         {
67                                 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
68                         }
69                         else if(line.find("EVENTS_IN_FILE=")==0)
70                         {
71                                 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
72                         }
73                 }
74                 if(configFile.eof())
75                 {
76                         configFile.clear();
77                 }
78                 configFile.close();
79         }
80         else
81         {
82                 cout<<"CLIENT -- Unable to open config file"<<endl;
83         }
84         //create directory for storage if it doesn't exist
85         gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
86
87         //create database class
88         fDatabase = new AliStorageDatabase();
89         TThread::UnLock();
90
91         //check current storage size
92         fCurrentStorageSize = GetSizeOfAllChunks();
93
94         //create two-way commynication thread
95         fCommunicationThread = new TThread("fCommunicationThread",
96                                        Dispatch,(void*)this);
97         fCommunicationThread->Run();
98 }
99
100 AliStorageClientThread::~AliStorageClientThread()
101 {
102         cout<<"CLIENT -- AliStorageClientThread destructor called";
103         if(fCurrentFile)
104         {
105                 fCurrentFile->Close();
106                 delete fCurrentFile;
107         }
108         if(fCommunicationThread){delete fCommunicationThread;}
109         cout<<" --- OK"<<endl;
110 }
111
112 void AliStorageClientThread::CommunicationHandle()
113 {
114         AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
115         storageSockets socket = CLIENT_COMMUNICATION_REP;
116         eventManager->CreateSocket(socket);
117         
118         struct clientRequestStruct *request;
119         struct clientRequestStruct *response = new struct clientRequestStruct;
120
121         cout<<"CLIENT -- Communication stated"<<endl;
122         
123         while(!gClientQuit)
124         {
125                 request = eventManager->GetClientStruct(socket);
126                 switch(request->messageType)
127                 {
128                 case REQUEST_CONNECTION:
129                         eventManager->Send((long)fConnectionStatus,socket);
130                         break;
131                 case REQUEST_RECEIVING:
132                         eventManager->Send((long)fReceivingStatus,socket);
133                         break;
134                 case REQUEST_SAVING:
135                         eventManager->Send((long)fSavingStatus,socket);
136                         break;
137                 case REQUEST_CURRENT_SIZE:
138                         eventManager->Send((long)fCurrentStorageSize,socket);
139                         break;
140                 case REQUEST_GET_PARAMS:
141                         response->maxStorageSize = fMaximumStorageSize;
142                         response->maxOccupation = fStorageOccupationLevel;
143                         response->removeEvents = fRemoveEventsPercentage;
144                         response->eventsInChunk = fNumberOfEventsInFile;
145
146                         eventManager->Send(response,socket);
147                         break;
148                 case REQUEST_SET_PARAMS:
149                         SetStorageParams(request->maxStorageSize,
150                                          request->maxOccupation,
151                                          request->removeEvents,
152                                          request->eventsInChunk);
153
154                         fMaximumStorageSize = request->maxStorageSize;
155                         fStorageOccupationLevel = request->maxOccupation;
156                         fRemoveEventsPercentage = request->removeEvents;
157                         fNumberOfEventsInFile = request->eventsInChunk;
158
159                         eventManager->Send(true,socket);
160                         break;
161                 default:break;
162                 }
163                 delete request;
164         }
165 }
166
167 void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
168 {
169         cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
170
171
172         TThread::Lock();
173         ifstream configFile (GetConfigFilePath());
174         ofstream tmpFile("tmpFile.bla");
175         
176         if (configFile.is_open())
177         {
178                 string line;
179                 string tmpLine;
180                 int from,to;
181                 while(configFile.good())
182                 {
183                         getline(configFile,line);
184                         from = line.find("\"")+1;
185                         to = line.find_last_of("\"");
186                         tmpLine = line;
187                         if(line.find("MAX_SIZE=")==0)
188                         {
189                                 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
190                         }
191                         else if(line.find("MAX_OCCUPATION=")==0)
192                         {
193                                 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
194                         }
195                         else if(line.find("REMOVE_PERCENT=")==0)
196                         {
197                                 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
198                         }
199                         else if(line.find("EVENTS_IN_FILE=")==0)
200                         {
201                                 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
202                         }
203                         tmpLine += "\n";
204                         tmpFile << tmpLine;
205                 }
206                 if(configFile.eof())
207                 {
208                         configFile.clear();
209                 }
210                 configFile.close();
211                 tmpFile.close();
212                 rename("tmpFile.bla",GetConfigFilePath());
213         }
214         else
215         {
216                 cout<<"CLIENT -- Unable to open config file"<<endl;
217         }
218         TThread::UnLock();
219 }
220
221 Long64_t AliStorageClientThread::GetSizeOfAllChunks()
222 {
223         Long64_t totalStorageSize = 0;
224
225         TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
226         TList *listOfDirectories = dir.GetListOfFiles();
227
228         if (!listOfDirectories)
229         {
230                 cout<<"CLIENT -- Storage directory is empty"<<endl;
231                 return 0;
232         }
233         TIter nextDirectory(listOfDirectories);
234         TSystemFile *runDirectory;
235         string directoryName;
236         
237         while ((runDirectory=(TSystemFile*)nextDirectory()))
238         {
239                 directoryName=runDirectory->GetName();
240                 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
241                 {
242                         TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
243                         TList *listOfChunks = dirChunks.GetListOfFiles();
244
245                         if(listOfChunks)
246                         {
247                                 TIter nextChunk(listOfChunks);
248                                 TSystemFile *chunk;
249                                 string chunkFileName;
250
251                                 while((chunk=(TSystemFile*)nextChunk()))
252                                 {
253                                         chunkFileName = chunk->GetName();
254                                         if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
255                                         {
256                                                 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
257                                                 if(tmpFile)
258                                                 {
259                                                         totalStorageSize+=tmpFile->GetSize();
260                                                         tmpFile->Close();
261                                                         delete tmpFile;
262                                                 }
263                                         }
264                                 }
265                                 if(chunk){delete chunk;}
266                         }
267                         if(listOfChunks){delete listOfChunks;}
268                 }
269         }
270
271         //tmpFiles.clear();
272         if(listOfDirectories){delete listOfDirectories;}
273         if(runDirectory){delete runDirectory;}
274         
275         printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
276
277         return totalStorageSize;
278 }
279
280 void AliStorageClientThread::CollectData()
281 {
282         AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
283         if(eventManager->CreateSocket(EVENTS_SERVER_SUB))
284         {
285                 fConnectionStatus=STATUS_OK;
286         }
287         else
288         {
289                 fConnectionStatus=STATUS_ERROR;
290         }
291         
292         int chunkNumber=0;
293         int previousChunkNumber=-1;
294         int eventsInChunk=0;
295         int previousRunNumber=-1;
296         AliESDEvent *event = NULL;
297 //        TTree *tree = NULL;
298         
299         while(!gClientQuit)
300         {               
301           event = eventManager->GetEvent(EVENTS_SERVER_SUB);
302
303                 if(event)
304                 {
305                         fReceivingStatus=STATUS_OK;
306                         
307                         if(event->GetRunNumber() != previousRunNumber)//when new run starts
308                         {
309                                 cout<<"CLIENT -- new run started"<<endl;
310                                 previousRunNumber = event->GetRunNumber();
311                                 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
312                                 chunkNumber=0;
313                                 eventsInChunk=0;
314                                 
315                                 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
316                                                      Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
317                                 TList *files = dir.GetListOfFiles();    
318                                 if (files)
319                                 {
320                                         TSystemFile *file;
321                                         string fname;
322                                         TIter next(files);
323                                         
324                                         while ((file=(TSystemFile*)next()))
325                                         {
326                                                 fname = file->GetName();
327                                         
328                                                 if (!file->IsDirectory())
329                                                 {
330                                                         int from = fname.find("chunk")+5;
331                                                         int to = fname.find(".root");
332
333                                                         int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
334
335                                                         if(maxChunkNumber > chunkNumber)
336                                                         {
337                                                                 chunkNumber = maxChunkNumber;
338                                                         }
339                                                 }
340                                         }
341                                         chunkNumber++;
342                                 }
343                         }
344
345                         cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
346                         
347                         if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
348                         {
349                                 if(fCurrentFile)
350                                 {
351                                         fCurrentFile->Close();
352                                         delete fCurrentFile;
353                                         fCurrentFile=0;
354                                 }
355                                 fCurrentStorageSize=GetSizeOfAllChunks();
356                                 CheckCurrentStorageSize();
357                                 
358                                 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
359
360                                 previousChunkNumber = chunkNumber;
361                         }
362                         
363                         if(0 != fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
364                         {
365                                 fDatabase->InsertEvent(event->GetRunNumber(),
366                                               event->GetEventNumberInFile(),
367                                               (char*)event->GetBeamType(),
368                                                        event->GetMultiplicity()->GetNumberOfTracklets(),Form("%s/run%d/chunk%d.root",fStoragePath.c_str(),event->GetRunNumber(),chunkNumber));
369                                 
370                                 eventsInChunk++;
371                                 
372                                 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
373                                 {
374                                         chunkNumber++;
375                                         eventsInChunk=0;
376                                 }
377                                 
378                                 if(fSavingStatus!=STATUS_OK)
379                                 {
380                                         fSavingStatus=STATUS_OK;
381                                 }
382                         }
383                         else if(fSavingStatus!=STATUS_ERROR)
384                         {
385                                 fSavingStatus=STATUS_ERROR;
386                         }
387                         delete event;event=0;
388                         //delete tree;
389                 }
390                 else if(fReceivingStatus!=STATUS_ERROR)
391                 {
392                         cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
393                         fReceivingStatus=STATUS_ERROR;
394                 }
395         }
396         if(event){delete event;}
397 }
398
399
400 void AliStorageClientThread::CheckCurrentStorageSize()
401 {
402         if(fCurrentStorageSize >  (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
403         {
404                 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
405                 {
406                         struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
407                         string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
408                         //remove oldest event
409                         cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
410                         gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
411                         fDatabase->RemoveEvent(fDatabase->GetOldestEvent());
412                 }
413         }
414 }