Client Thread of Storage Manager split into smaller classes. Preparation for moving...
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliEventsCollectorThread.cxx
1 #include "AliEventsCollectorThread.h"
2 #include "AliStorageEventManager.h"
3
4 #include <TSystemDirectory.h>
5
6 #include <iostream>
7 #include <fstream>
8
9 using namespace std;
10
11 AliEventsCollectorThread::AliEventsCollectorThread(AliStorageClientThread *onlineReconstructionManager) :
12 fManager(onlineReconstructionManager),
13 fCollectorThread(0),
14 fCurrentFile(0),
15 fDatabase(0),
16 fFinished(false)
17 {
18     fDatabase = new AliStorageDatabase();
19     
20     CheckCurrentStorageSize();
21     
22     // start collecting events in a thread
23     fCollectorThread = new TThread("fCollectorThread",Dispatch,(void*)this);
24     fCollectorThread->Run();
25 }
26
27
28 AliEventsCollectorThread::~AliEventsCollectorThread()
29 {
30     if(fCollectorThread){delete fCollectorThread;}
31     
32     if(fCurrentFile){
33         fCurrentFile->Close();
34         delete fCurrentFile;
35     }
36     if(fDatabase){delete fDatabase;}
37     if(fManager){delete fManager;}
38 }
39
40 void AliEventsCollectorThread::Kill()
41 {
42     if(fCollectorThread){
43         fFinished=true;
44         fCollectorThread->Join();
45         fCollectorThread->Kill();
46     }
47 }
48
49 void AliEventsCollectorThread::CollectorHandle()
50 {
51     AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
52     if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fManager->fConnectionStatus=STATUS_OK;}
53     else{fManager->fConnectionStatus=STATUS_ERROR;}
54     
55     int chunkNumber=0;
56     int previousChunkNumber=-1;
57     int eventsInChunk=0;
58     int previousRunNumber=-1;
59     AliESDEvent *event = NULL;
60     vector<struct eventStruct> eventsToUpdate;
61     struct eventStruct currentEvent;
62     
63     while(!fFinished)
64     {
65         cout<<"CLIENT -- waiting for event..."<<endl;
66         event = eventManager->GetEvent(EVENTS_SERVER_SUB,5000);
67         
68         if(event)
69         {
70             cout<<"CLIENT -- received event"<<endl;
71             fManager->fReceivingStatus=STATUS_OK;
72             
73             if(event->GetRunNumber() != previousRunNumber)//when new run starts
74             {
75                 cout<<"CLIENT -- new run started"<<endl;
76                 previousRunNumber = event->GetRunNumber();
77                 gSystem->Exec(Form("mkdir -p %s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
78                 chunkNumber=0;
79                 eventsInChunk=0;
80                 
81                 TSystemDirectory dir(Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()),
82                                      Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
83                 TList *files = dir.GetListOfFiles();
84                 if (files)
85                 {
86                     TSystemFile *file;
87                     string fname;
88                     TIter next(files);
89                     
90                     while ((file=(TSystemFile*)next()))
91                     {
92                         fname = file->GetName();
93                         
94                         if (!file->IsDirectory())
95                         {
96                             int from = fname.find("chunk")+5;
97                             int to = fname.find(".root");
98                             
99                             int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
100                             
101                             if(maxChunkNumber > chunkNumber)
102                             {
103                                 chunkNumber = maxChunkNumber;
104                             }
105                         }
106                     }
107                     chunkNumber++;
108                 }
109             }
110             
111             cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
112             
113             if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
114             {
115                 if(fCurrentFile)
116                 {
117                     fCurrentFile->Close();
118                     delete fCurrentFile;
119                     fCurrentFile=0;
120                 }
121                 for(unsigned int i=0;i<eventsToUpdate.size();i++)
122                 {
123                     fDatabase->UpdateEventPath(eventsToUpdate[i],Form("%s/run%d/chunk%d.root",
124                                                     fManager->fStoragePath.c_str(),
125                                                     event->GetRunNumber(),
126                                                     chunkNumber-1));
127                 }
128                 eventsToUpdate.clear();
129                 
130                 CheckCurrentStorageSize();
131                 
132                 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
133                 
134                 previousChunkNumber = chunkNumber;
135             }
136             
137             //create new directory for this run
138             TDirectory *currentRun;
139             if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
140             {
141                 cout<<"CLIENT -- creating new directory for this run"<<endl;
142                 currentRun->cd();
143             }
144             else
145             {
146                 cout<<"CLIENT -- opening existing directory for this run"<<endl;
147                 fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
148             }
149             
150             if(0 != event->Write(Form("event%d",event->GetEventNumberInFile())))
151                 //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
152             {
153                 eventsInChunk++;
154                 
155                 if(eventsInChunk == fManager->fNumberOfEventsInFile)//if max events number in file was reached
156                 {
157                     chunkNumber++;
158                     eventsInChunk=0;
159                 }
160                 
161                 if(fManager->fSavingStatus!=STATUS_OK){fManager->fSavingStatus=STATUS_OK;}
162             }
163             else if(fManager->fSavingStatus!=STATUS_ERROR){fManager->fSavingStatus=STATUS_ERROR;}
164             
165             // save to event file as well:
166             TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
167             
168             if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
169             {
170                 cout<<"CLIENT -- creating new directory for this run"<<endl;
171                 currentRun->cd();
172             }
173             else
174             {
175                 cout<<"CLIENT -- opening existing directory for this run"<<endl;
176                 eventFile->cd(Form("run%d",event->GetRunNumber()));
177             }
178             
179             if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) &&
180                fManager->fSavingStatus!=STATUS_ERROR)
181             {
182                 fManager->fSavingStatus=STATUS_ERROR;
183             }
184             else
185             {
186                 eventFile->Close();
187                 delete eventFile;
188                 fDatabase->InsertEvent(event->GetRunNumber(),
189                                        event->GetEventNumberInFile(),
190                                        (char*)event->GetBeamType(),
191                                        event->GetMultiplicity()->GetNumberOfTracklets(),
192                                        Form("%s/run%d/event%d.root",fManager->fStoragePath.c_str(),
193                                             event->GetRunNumber(),
194                                             eventsInChunk));
195                 
196                 currentEvent.runNumber = event->GetRunNumber();
197                 currentEvent.eventNumber = event->GetEventNumberInFile();
198                 eventsToUpdate.push_back(currentEvent);
199             }
200             delete event;event=0;
201         }
202         else
203         {
204             cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
205             if(fManager->fReceivingStatus!=STATUS_ERROR){fManager->fReceivingStatus=STATUS_ERROR;}
206         }
207     }
208     if(event){delete event;}
209 }
210
211
212 Long64_t AliEventsCollectorThread::GetSizeOfAllChunks()
213 {
214     Long64_t totalStorageSize = 0;
215     
216     TSystemDirectory dir(fManager->fStoragePath.c_str(),fManager->fStoragePath.c_str());
217     TList *listOfDirectories = dir.GetListOfFiles();
218     
219     if (!listOfDirectories){
220         cout<<"CLIENT -- Storage directory is empty"<<endl;
221         return 0;
222     }
223     TIter nextDirectory(listOfDirectories);
224     TSystemFile *runDirectory;
225     string directoryName;
226     
227     while ((runDirectory=(TSystemFile*)nextDirectory()))
228     {
229         directoryName=runDirectory->GetName();
230         if (runDirectory->IsDirectory() && directoryName.find("run")==0)
231         {
232             TSystemDirectory dirChunks(Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()));
233             TList *listOfChunks = dirChunks.GetListOfFiles();
234             
235             if(listOfChunks)
236             {
237                 TIter nextChunk(listOfChunks);
238                 TSystemFile *chunk;
239                 string chunkFileName;
240                 
241                 while((chunk=(TSystemFile*)nextChunk()))
242                 {
243                     chunkFileName = chunk->GetName();
244                     if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
245                     {
246                         TFile *tmpFile = new TFile(Form("%s/%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
247                         if(tmpFile)
248                         {
249                             totalStorageSize+=tmpFile->GetSize();
250                             tmpFile->Close();
251                             delete tmpFile;
252                         }
253                     }
254                 }
255                 if(chunk){delete chunk;}
256             }
257             if(listOfChunks){delete listOfChunks;}
258         }
259     }
260
261     if(listOfDirectories){delete listOfDirectories;}
262     if(runDirectory){delete runDirectory;}
263     
264     printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
265     
266     return totalStorageSize;
267 }
268
269 void AliEventsCollectorThread::CheckCurrentStorageSize()
270 {
271     fManager->fCurrentStorageSize=GetSizeOfAllChunks();
272     
273     if(fManager->fCurrentStorageSize >  (float)fManager->fStorageOccupationLevel/100. * fManager->fMaximumStorageSize)
274     {
275         while(GetSizeOfAllChunks() > (float)fManager->fRemoveEventsPercentage/100. * fManager->fMaximumStorageSize)
276         {
277             struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
278             string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
279             
280             //remove oldest event
281             cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
282             gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
283             fDatabase->RemoveEventsWithPath(oldestEventPath);
284         }
285     }
286 }
287
288
289