1 #include "AliEventsCollectorThread.h"
2 #include "AliStorageEventManager.h"
4 #include <TSystemDirectory.h>
// Constructs the collector for the given storage client manager.
// The manager pointer is kept in fManager; the constructor then creates the
// database handle, runs an initial storage-size check and launches the
// collector thread.
// @param onlineReconstructionManager  storage client stored in fManager and used by the other methods of this class
11 AliEventsCollectorThread::AliEventsCollectorThread(AliStorageClientThread *onlineReconstructionManager) :
12 fManager(onlineReconstructionManager),
// database handle; it is deleted again in the destructor
18 fDatabase = new AliStorageDatabase();
// run the storage-size check once before any event is collected
20 CheckCurrentStorageSize();
22 // start collecting events in a thread
// Dispatch is passed as the thread entry point and receives this object as its argument.
// NOTE(review): Dispatch is not defined in this part of the file - presumably it forwards to CollectorHandle(); confirm.
23 fCollectorThread = new TThread("fCollectorThread",Dispatch,(void*)this);
24 fCollectorThread->Run();
// Destructor: deletes the collector thread object, closes the current chunk
// file and deletes the database handle and the manager.
28 AliEventsCollectorThread::~AliEventsCollectorThread()
30 if(fCollectorThread){delete fCollectorThread;}
33 fCurrentFile->Close();
36 if(fDatabase){delete fDatabase;}
// NOTE(review): fManager is the pointer handed in through the constructor; deleting it here
// assumes this object owns it - confirm with the creating code to rule out a double delete.
37 if(fManager){delete fManager;}
// Stops the collector thread: waits for it with Join() and then calls Kill() on it.
// NOTE(review): Join() comes before Kill(); if Join() only returns once the thread function
// has ended, the following Kill() has nothing left to stop - confirm the intended order.
40 void AliEventsCollectorThread::Kill()
44 fCollectorThread->Join();
45 fCollectorThread->Kill();
// Main collecting routine: receives ESD events from the EVENTS_SERVER_SUB socket,
// writes each one into the current chunk file and into a separate per-event file
// under <storage path>/run<N>/, and registers it in the database.
// The status fields of fManager (fConnectionStatus, fReceivingStatus, fSavingStatus)
// are updated along the way.
49 void AliEventsCollectorThread::CollectorHandle()
51 AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
// publish whether the subscriber socket could be created
52 if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fManager->fConnectionStatus=STATUS_OK;}
53 else{fManager->fConnectionStatus=STATUS_ERROR;}
// start values; compared below against the current chunk number and the event's run number
56 int previousChunkNumber=-1;
58 int previousRunNumber=-1;
59 AliESDEvent *event = NULL;
// events inserted into the database since the last chunk switch; their stored
// paths are rewritten to the chunk file when that chunk is closed
60 vector<struct eventStruct> eventsToUpdate;
61 struct eventStruct currentEvent;
65 cout<<"CLIENT -- waiting for event..."<<endl;
// NOTE(review): the second argument (5000) is presumably a timeout in milliseconds - confirm against GetEvent()
66 event = eventManager->GetEvent(EVENTS_SERVER_SUB,5000);
70 cout<<"CLIENT -- received event"<<endl;
71 fManager->fReceivingStatus=STATUS_OK;
73 if(event->GetRunNumber() != previousRunNumber)//when new run starts
75 cout<<"CLIENT -- new run started"<<endl;
76 previousRunNumber = event->GetRunNumber();
// make sure the directory for this run exists
77 gSystem->Exec(Form("mkdir -p %s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
// scan the files already present in the run directory for the highest chunk number
81 TSystemDirectory dir(Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()),
82 Form("%s/run%d",fManager->fStoragePath.c_str(),event->GetRunNumber()));
83 TList *files = dir.GetListOfFiles();
90 while ((file=(TSystemFile*)next()))
92 fname = file->GetName();
94 if (!file->IsDirectory())
// the chunk number is the text between "chunk" and ".root" in the file name
// NOTE(review): the result of find() is not checked against npos; the event%d.root files
// written further down land in the same directory and contain no "chunk" - confirm that
// the resulting offsets are harmless for such names (substr can throw for short names).
96 int from = fname.find("chunk")+5;
97 int to = fname.find(".root");
99 int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
101 if(maxChunkNumber > chunkNumber)
103 chunkNumber = maxChunkNumber;
111 cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
113 if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
// finish the previous chunk file before switching to the next one
117 fCurrentFile->Close();
// rewrite the database path of every pending event to a chunk file path
121 for(unsigned int i=0;i<eventsToUpdate.size();i++)
123 fDatabase->UpdateEventPath(eventsToUpdate[i],Form("%s/run%d/chunk%d.root",
124 fManager->fStoragePath.c_str(),
125 event->GetRunNumber(),
128 eventsToUpdate.clear();
// free space if needed before a new chunk file is opened
130 CheckCurrentStorageSize();
// "recreate" is the open mode passed to TFile for the new chunk file
132 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
134 previousChunkNumber = chunkNumber;
137 //create new directory for this run
// a non-null result of mkdir() is reported as a newly created run directory
138 TDirectory *currentRun;
139 if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber()))))
141 cout<<"CLIENT -- creating new directory for this run"<<endl;
146 cout<<"CLIENT -- opening existing directory for this run"<<endl;
147 fCurrentFile->cd(Form("run%d",event->GetRunNumber()));
// a non-zero return value of Write() is taken as a successful write to the chunk file
150 if(0 != event->Write(Form("event%d",event->GetEventNumberInFile())))
151 //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
155 if(eventsInChunk == fManager->fNumberOfEventsInFile)//if max events number in file was reached
161 if(fManager->fSavingStatus!=STATUS_OK){fManager->fSavingStatus=STATUS_OK;}
163 else if(fManager->fSavingStatus!=STATUS_ERROR){fManager->fSavingStatus=STATUS_ERROR;}
165 // save to event file as well:
// the per-event file is named after eventsInChunk and is also opened with "recreate"
166 TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fManager->fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate");
168 if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber()))))
170 cout<<"CLIENT -- creating new directory for this run"<<endl;
175 cout<<"CLIENT -- opening existing directory for this run"<<endl;
176 eventFile->cd(Form("run%d",event->GetRunNumber()));
// here a zero return value of Write() flags a saving error (only set if not already flagged)
179 if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) &&
180 fManager->fSavingStatus!=STATUS_ERROR)
182 fManager->fSavingStatus=STATUS_ERROR;
// register the event in the database: run number, event number, beam type,
// number of tracklets and the path of its per-event file
188 fDatabase->InsertEvent(event->GetRunNumber(),
189 event->GetEventNumberInFile(),
190 (char*)event->GetBeamType(),
191 event->GetMultiplicity()->GetNumberOfTracklets(),
192 Form("%s/run%d/event%d.root",fManager->fStoragePath.c_str(),
193 event->GetRunNumber(),
// remember the event so that its database path can be updated when the chunk is closed
196 currentEvent.runNumber = event->GetRunNumber();
197 currentEvent.eventNumber = event->GetEventNumberInFile();
198 eventsToUpdate.push_back(currentEvent);
// the received event object is released here after it has been stored
200 delete event;event=0;
// no event was delivered: report it and flag the receiving status
204 cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
205 if(fManager->fReceivingStatus!=STATUS_ERROR){fManager->fReceivingStatus=STATUS_ERROR;}
// release an event that is still held at this point
208 if(event){delete event;}
// Sums the sizes of all chunk files in the storage directory.
// Walks every sub-directory of fManager->fStoragePath whose name starts with "run",
// and inside it every regular file whose name starts with "chunk"; each such file is
// opened as a TFile and its GetSize() is added to the total. The total is printed
// (raw value and divided by 1000*1000, labelled MB).
// @return accumulated size of all chunk files found
212 Long64_t AliEventsCollectorThread::GetSizeOfAllChunks()
214 Long64_t totalStorageSize = 0;
// list the entries of the storage directory
216 TSystemDirectory dir(fManager->fStoragePath.c_str(),fManager->fStoragePath.c_str());
217 TList *listOfDirectories = dir.GetListOfFiles();
// a null list is reported as an empty storage directory
219 if (!listOfDirectories){
220 cout<<"CLIENT -- Storage directory is empty"<<endl;
223 TIter nextDirectory(listOfDirectories);
224 TSystemFile *runDirectory;
225 string directoryName;
227 while ((runDirectory=(TSystemFile*)nextDirectory()))
229 directoryName=runDirectory->GetName();
// find(...)==0 means the name begins with "run"
230 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
232 TSystemDirectory dirChunks(Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str()));
233 TList *listOfChunks = dirChunks.GetListOfFiles();
237 TIter nextChunk(listOfChunks);
239 string chunkFileName;
241 while((chunk=(TSystemFile*)nextChunk()))
243 chunkFileName = chunk->GetName();
// only regular files whose name begins with "chunk" are counted
244 if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
// the file is opened in "read" mode only to query its size
// NOTE(review): the matching Close()/delete of tmpFile is not visible here - confirm it is released.
246 TFile *tmpFile = new TFile(Form("%s/%s/%s",fManager->fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
249 totalStorageSize+=tmpFile->GetSize();
// NOTE(review): if this runs after the chunk loop has finished, chunk is already null
// (the loop ends when nextChunk() returns null), so this delete would never fire - confirm.
255 if(chunk){delete chunk;}
257 if(listOfChunks){delete listOfChunks;}
261 if(listOfDirectories){delete listOfDirectories;}
// NOTE(review): same as for chunk above - runDirectory is null once the directory loop has ended; confirm.
262 if(runDirectory){delete runDirectory;}
264 printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
266 return totalStorageSize;
// Refreshes fManager->fCurrentStorageSize and frees space when storage is too full.
// If the current size exceeds fStorageOccupationLevel percent of fMaximumStorageSize,
// the oldest events known to the database are removed (file on disk and database
// entries) until the size is no longer above fRemoveEventsPercentage percent of
// fMaximumStorageSize.
269 void AliEventsCollectorThread::CheckCurrentStorageSize()
271 fManager->fCurrentStorageSize=GetSizeOfAllChunks();
273 if(fManager->fCurrentStorageSize > (float)fManager->fStorageOccupationLevel/100. * fManager->fMaximumStorageSize)
// the size is recomputed by GetSizeOfAllChunks() on every iteration, which rescans the storage directory
// NOTE(review): GetSizeOfAllChunks() counts only files named chunk*; removing a path that is
// not such a file does not lower this value - confirm the loop always makes progress.
275 while(GetSizeOfAllChunks() > (float)fManager->fRemoveEventsPercentage/100. * fManager->fMaximumStorageSize)
277 struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
278 string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
280 //remove oldest event
281 cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
// NOTE(review): the path is inserted unquoted into a shell command; confirm paths never
// contain spaces or shell metacharacters.
282 gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
// drop the database entries that refer to the removed file
283 fDatabase->RemoveEventsWithPath(oldestEventPath);