]> git.uio.no Git - u/mrichter/AliRoot.git/blob - MONITOR/alistoragemanager/AliStorageClientThread.cxx
215410300fac4a9f53f9e2682a72de9a908e9a2e
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageClientThread.cxx
1 #include "AliStorageClientThread.h"
2 #include "AliMultiplicity.h"
3 #include "AliStorageTypes.h"
4 #include "AliStorageEventManager.h"
5
6 #include <sstream>
7 #include <signal.h>
8 #include <fstream>
9 #include <iostream>
10
11 #include <TSystemDirectory.h>
12 #include <TThread.h>
13 #include <TFile.h>
14
15 using namespace std;
16
17 bool gClientQuit = false;    // signal flag
18 void GotSignalClient(int){gClientQuit = true;}
19
20 AliStorageClientThread::AliStorageClientThread() :
21 fConnectionStatus(STATUS_WAITING),
22 fReceivingStatus(STATUS_WAITING),
23 fSavingStatus(STATUS_WAITING),
24 fCommunicationThread(0),
25 fCurrentFile(0),
26 fDatabase(0),
27 fCurrentStorageSize(0),
28 fMaximumStorageSize(0),
29 fStoragePath(""),
30 fNumberOfEventsInFile(0),
31 fStorageOccupationLevel(0),
32 fRemoveEventsPercentage(0)
33 {       
34         // make sure that when program is closed destructor will be called
35         struct sigaction sa;
36         memset(&sa,0,sizeof(sa));
37         sa.sa_handler = GotSignalClient;
38         sigfillset(&sa.sa_mask);
39         sigaction(SIGINT,&sa,NULL);
40         
41         //load storage parameters from file
42         TThread::Lock();
43         ifstream configFile (GetConfigFilePath());
44         if (configFile.is_open())
45         {
46                 string line;
47                 int from,to;
48                 while(configFile.good())
49                 {
50                         getline(configFile,line);
51                         from = line.find("\"")+1;
52                         to = line.find_last_of("\"");
53                         if(line.find("STORAGE_PATH=")==0)
54                         {
55                                 fStoragePath=line.substr(from,to-from);
56                         }
57                         else if(line.find("MAX_SIZE=")==0)
58                         {
59                                 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
60                         }
61                         else if(line.find("MAX_OCCUPATION=")==0)
62                         {
63                                 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
64                         }
65                         else if(line.find("REMOVE_PERCENT=")==0)
66                         {
67                                 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
68                         }
69                         else if(line.find("EVENTS_IN_FILE=")==0)
70                         {
71                                 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
72                         }
73                 }
74                 if(configFile.eof())
75                 {
76                         configFile.clear();
77                 }
78                 configFile.close();
79         }
80         else
81         {
82                 cout<<"CLIENT -- Unable to open config file"<<endl;
83         }
84         //create directory for storage if it doesn't exist
85         gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
86
87         //create database class
88         fDatabase = new AliStorageDatabase();
89         TThread::UnLock();
90
91         //check current storage size
92         fCurrentStorageSize = GetSizeOfAllChunks();
93
94         //create two-way commynication thread
95         fCommunicationThread = new TThread("fCommunicationThread",
96                                        Dispatch,(void*)this);
97         fCommunicationThread->Run();
98 }
99
100 AliStorageClientThread::~AliStorageClientThread()
101 {
102         cout<<"CLIENT -- AliStorageClientThread destructor called";
103         if(fCurrentFile)
104         {
105                 fCurrentFile->Close();
106                 delete fCurrentFile;
107         }
108         if(fCommunicationThread){delete fCommunicationThread;}
109         if(fDatabase){delete fDatabase;}
110         cout<<" --- OK"<<endl;
111 }
112
113 void AliStorageClientThread::CommunicationHandle()
114 {
115         AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
116         storageSockets socket = CLIENT_COMMUNICATION_REP;
117         eventManager->CreateSocket(socket);
118         
119         struct clientRequestStruct *request;
120         struct clientRequestStruct *response = new struct clientRequestStruct;
121
122         cout<<"CLIENT -- Communication stated"<<endl;
123         
124         while(!gClientQuit)
125         {
126                 request = eventManager->GetClientStruct(socket);
127                 switch(request->messageType)
128                 {
129                 case REQUEST_CONNECTION:
130                         eventManager->Send((long)fConnectionStatus,socket);
131                         break;
132                 case REQUEST_RECEIVING:
133                         eventManager->Send((long)fReceivingStatus,socket);
134                         break;
135                 case REQUEST_SAVING:
136                         eventManager->Send((long)fSavingStatus,socket);
137                         break;
138                 case REQUEST_CURRENT_SIZE:
139                         eventManager->Send((long)fCurrentStorageSize,socket);
140                         break;
141                 case REQUEST_GET_PARAMS:
142                         response->maxStorageSize = fMaximumStorageSize;
143                         response->maxOccupation = fStorageOccupationLevel;
144                         response->removeEvents = fRemoveEventsPercentage;
145                         response->eventsInChunk = fNumberOfEventsInFile;
146
147                         eventManager->Send(response,socket);
148                         break;
149                 case REQUEST_SET_PARAMS:
150                         SetStorageParams(request->maxStorageSize,
151                                          request->maxOccupation,
152                                          request->removeEvents,
153                                          request->eventsInChunk);
154
155                         fMaximumStorageSize = request->maxStorageSize;
156                         fStorageOccupationLevel = request->maxOccupation;
157                         fRemoveEventsPercentage = request->removeEvents;
158                         fNumberOfEventsInFile = request->eventsInChunk;
159
160                         eventManager->Send(true,socket);
161                         break;
162                 default:break;
163                 }
164                 delete request;
165         }
166 }
167
168 void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
169 {
170         cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
171
172
173         TThread::Lock();
174         ifstream configFile (GetConfigFilePath());
175         ofstream tmpFile("tmpFile.bla");
176         
177         if (configFile.is_open())
178         {
179                 string line;
180                 string tmpLine;
181                 int from,to;
182                 while(configFile.good())
183                 {
184                         getline(configFile,line);
185                         from = line.find("\"")+1;
186                         to = line.find_last_of("\"");
187                         tmpLine = line;
188                         if(line.find("MAX_SIZE=")==0)
189                         {
190                                 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
191                         }
192                         else if(line.find("MAX_OCCUPATION=")==0)
193                         {
194                                 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
195                         }
196                         else if(line.find("REMOVE_PERCENT=")==0)
197                         {
198                                 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
199                         }
200                         else if(line.find("EVENTS_IN_FILE=")==0)
201                         {
202                                 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
203                         }
204                         tmpLine += "\n";
205                         tmpFile << tmpLine;
206                 }
207                 if(configFile.eof())
208                 {
209                         configFile.clear();
210                 }
211                 configFile.close();
212                 tmpFile.close();
213                 rename("tmpFile.bla",GetConfigFilePath());
214         }
215         else
216         {
217                 cout<<"CLIENT -- Unable to open config file"<<endl;
218         }
219         TThread::UnLock();
220 }
221
222 Long64_t AliStorageClientThread::GetSizeOfAllChunks()
223 {
224         Long64_t totalStorageSize = 0;
225
226         TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
227         TList *listOfDirectories = dir.GetListOfFiles();
228
229         if (!listOfDirectories)
230         {
231                 cout<<"CLIENT -- Storage directory is empty"<<endl;
232                 return 0;
233         }
234         TIter nextDirectory(listOfDirectories);
235         TSystemFile *runDirectory;
236         string directoryName;
237         
238         while ((runDirectory=(TSystemFile*)nextDirectory()))
239         {
240                 directoryName=runDirectory->GetName();
241                 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
242                 {
243                         TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
244                         TList *listOfChunks = dirChunks.GetListOfFiles();
245
246                         if(listOfChunks)
247                         {
248                                 TIter nextChunk(listOfChunks);
249                                 TSystemFile *chunk;
250                                 string chunkFileName;
251
252                                 while((chunk=(TSystemFile*)nextChunk()))
253                                 {
254                                         chunkFileName = chunk->GetName();
255                                         if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
256                                         {
257                                                 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
258                                                 if(tmpFile)
259                                                 {
260                                                         totalStorageSize+=tmpFile->GetSize();
261                                                         tmpFile->Close();
262                                                         delete tmpFile;
263                                                 }
264                                         }
265                                 }
266                                 if(chunk){delete chunk;}
267                         }
268                         if(listOfChunks){delete listOfChunks;}
269                 }
270         }
271
272         //tmpFiles.clear();
273         if(listOfDirectories){delete listOfDirectories;}
274         if(runDirectory){delete runDirectory;}
275         
276         printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
277
278         return totalStorageSize;
279 }
280
281 void AliStorageClientThread::CollectData()
282 {
283         AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
284         if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fConnectionStatus=STATUS_OK;}
285         else{fConnectionStatus=STATUS_ERROR;}
286         
287         int chunkNumber=0;
288         int previousChunkNumber=-1;
289         int eventsInChunk=0;
290         int previousRunNumber=-1;
291         AliESDEvent *event = NULL;
292         vector<struct eventStruct> eventsToUpdate;
293         struct eventStruct currentEvent;
294         
295
296         while(!gClientQuit)
297         {               
298           event = eventManager->GetEvent(EVENTS_SERVER_SUB);
299
300                 if(event)
301                 {
302                         fReceivingStatus=STATUS_OK;
303                         
304                         if(event->GetRunNumber() != previousRunNumber)//when new run starts
305                         {
306                                 cout<<"CLIENT -- new run started"<<endl;
307                                 previousRunNumber = event->GetRunNumber();
308                                 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
309                                 chunkNumber=0;
310                                 eventsInChunk=0;
311                                 
312                                 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
313                                                      Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
314                                 TList *files = dir.GetListOfFiles();    
315                                 if (files)
316                                 {
317                                         TSystemFile *file;
318                                         string fname;
319                                         TIter next(files);
320                                         
321                                         while ((file=(TSystemFile*)next()))
322                                         {
323                                                 fname = file->GetName();
324                                         
325                                                 if (!file->IsDirectory())
326                                                 {
327                                                         int from = fname.find("chunk")+5;
328                                                         int to = fname.find(".root");
329
330                                                         int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
331
332                                                         if(maxChunkNumber > chunkNumber)
333                                                         {
334                                                                 chunkNumber = maxChunkNumber;
335                                                         }
336                                                 }
337                                         }
338                                         chunkNumber++;
339                                 }
340                         }
341
342                         cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
343                         
344                         if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
345                         {
346                                 if(fCurrentFile)
347                                 {
348                                         fCurrentFile->Close();
349                                         delete fCurrentFile;
350                                         fCurrentFile=0;
351                                 }
352                                 for(unsigned int i=0;i<eventsToUpdate.size();i++)
353                                   {
354                                     fDatabase->UpdateEventPath(eventsToUpdate[i],
355                                                                Form("%s/run%d/chunk%d.root", 
356                                                                     fStoragePath.c_str(),
357                                                                     event->GetRunNumber(),
358                                                                     chunkNumber-1));
359                                   }
360                                 eventsToUpdate.clear();
361
362
363                                 fCurrentStorageSize=GetSizeOfAllChunks();
364                                 CheckCurrentStorageSize();
365                                 
366                                 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
367
368                                 previousChunkNumber = chunkNumber;
369                         }
370
371                         //create new directory for this run
372                         TDirectory *currentRun;
373                         if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
374                           {
375                             cout<<"CLIENT -- creating new directory for this run"<<endl;
376                             currentRun->cd();
377                           }
378                         else
379                           {
380                             cout<<"CLIENT -- opening existing directory for this run"<<endl;
381                             fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
382                           }
383
384                         if(0 != event->Write(Form("event%d",event->GetEventNumberInFile()))) 
385                           //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
386                         {
387                                 eventsInChunk++;
388                                 
389                                 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
390                                 {
391                                         chunkNumber++;
392                                         eventsInChunk=0;
393                                 }
394                                 
395                                 if(fSavingStatus!=STATUS_OK)
396                                 {
397                                         fSavingStatus=STATUS_OK;
398                                 }
399                         }
400                         else if(fSavingStatus!=STATUS_ERROR)
401                         {
402                                 fSavingStatus=STATUS_ERROR;
403                         }
404
405                 // save to event file as well:
406
407                 TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
408
409                 if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
410                           {
411                             cout<<"CLIENT -- creating new directory for this run"<<endl;
412                             currentRun->cd();
413                           }
414                         else
415                           {
416                             cout<<"CLIENT -- opening existing directory for this run"<<endl;
417                             eventFile->cd(Form("run%d",event->GetRunNumber()));
418                           }
419
420                 if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) && 
421                    fSavingStatus!=STATUS_ERROR){fSavingStatus=STATUS_ERROR;}
422                 else
423                   {
424                     eventFile->Close();
425                     delete eventFile;
426                     fDatabase->InsertEvent(event->GetRunNumber(),
427                                            event->GetEventNumberInFile(),
428                                            (char*)event->GetBeamType(),
429                                            event->GetMultiplicity()->GetNumberOfTracklets(),
430                                            Form("%s/run%d/event%d.root",fStoragePath.c_str(),
431                                                 event->GetRunNumber(),
432                                                 eventsInChunk));
433                     
434                     currentEvent.runNumber = event->GetRunNumber();
435                     currentEvent.eventNumber = event->GetEventNumberInFile();
436                     eventsToUpdate.push_back(currentEvent);
437                   }
438                         delete event;event=0;
439                         //delete tree;
440                 }
441                 else if(fReceivingStatus!=STATUS_ERROR)
442                 {
443                         cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
444                         fReceivingStatus=STATUS_ERROR;
445                 }
446         }
447         if(event){delete event;}
448 }
449
450
451 void AliStorageClientThread::CheckCurrentStorageSize()
452 {
453         if(fCurrentStorageSize >  (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
454         {
455                 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
456                 {
457                         struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
458                         string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
459                         //remove oldest event
460                         cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
461                         gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
462             fDatabase->RemoveEventsWithPath(oldestEventPath);
463 //                      fDatabase->RemoveEvent(oldestEvent);
464                 }
465         }
466 }