]>
Commit | Line | Data |
---|---|---|
5eb34a26 | 1 | #include "AliStorageClientThread.h" |
2 | #include "AliMultiplicity.h" | |
3 | #include "AliStorageTypes.h" | |
5eb34a26 | 4 | #include "AliStorageEventManager.h" |
5 | ||
5eb34a26 | 6 | #include <sstream> |
7 | #include <signal.h> | |
8 | #include <fstream> | |
9 | #include <iostream> | |
10 | ||
11 | #include <TSystemDirectory.h> | |
12 | #include <TThread.h> | |
13 | #include <TFile.h> | |
14 | ||
15 | using namespace std; | |
5eb34a26 | 16 | |
17 | bool gClientQuit = false; // signal flag | |
18 | void GotSignalClient(int){gClientQuit = true;} | |
19 | ||
20 | AliStorageClientThread::AliStorageClientThread() : | |
21 | fConnectionStatus(STATUS_WAITING), | |
22 | fReceivingStatus(STATUS_WAITING), | |
23 | fSavingStatus(STATUS_WAITING), | |
5eb34a26 | 24 | fCommunicationThread(0), |
25 | fCurrentFile(0), | |
26 | fDatabase(0), | |
27 | fCurrentStorageSize(0), | |
28 | fMaximumStorageSize(0), | |
29 | fStoragePath(""), | |
30 | fNumberOfEventsInFile(0), | |
31 | fStorageOccupationLevel(0), | |
32 | fRemoveEventsPercentage(0) | |
33 | { | |
34 | // make sure that when program is closed destructor will be called | |
35 | struct sigaction sa; | |
36 | memset(&sa,0,sizeof(sa)); | |
37 | sa.sa_handler = GotSignalClient; | |
38 | sigfillset(&sa.sa_mask); | |
39 | sigaction(SIGINT,&sa,NULL); | |
40 | ||
41 | //load storage parameters from file | |
42 | TThread::Lock(); | |
164d3d29 | 43 | ifstream configFile (GetConfigFilePath()); |
5eb34a26 | 44 | if (configFile.is_open()) |
45 | { | |
46 | string line; | |
47 | int from,to; | |
48 | while(configFile.good()) | |
49 | { | |
50 | getline(configFile,line); | |
51 | from = line.find("\"")+1; | |
52 | to = line.find_last_of("\""); | |
53 | if(line.find("STORAGE_PATH=")==0) | |
54 | { | |
55 | fStoragePath=line.substr(from,to-from); | |
56 | } | |
57 | else if(line.find("MAX_SIZE=")==0) | |
58 | { | |
59 | fMaximumStorageSize=atoi(line.substr(from,to-from).c_str()); | |
60 | } | |
61 | else if(line.find("MAX_OCCUPATION=")==0) | |
62 | { | |
63 | fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str()); | |
64 | } | |
65 | else if(line.find("REMOVE_PERCENT=")==0) | |
66 | { | |
67 | fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str()); | |
68 | } | |
69 | else if(line.find("EVENTS_IN_FILE=")==0) | |
70 | { | |
71 | fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str()); | |
72 | } | |
5eb34a26 | 73 | } |
74 | if(configFile.eof()) | |
75 | { | |
76 | configFile.clear(); | |
77 | } | |
78 | configFile.close(); | |
79 | } | |
80 | else | |
81 | { | |
82 | cout<<"CLIENT -- Unable to open config file"<<endl; | |
83 | } | |
84 | //create directory for storage if it doesn't exist | |
85 | gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str())); | |
86 | ||
87 | //create database class | |
88 | fDatabase = new AliStorageDatabase(); | |
89 | TThread::UnLock(); | |
90 | ||
91 | //check current storage size | |
92 | fCurrentStorageSize = GetSizeOfAllChunks(); | |
93 | ||
94 | //create two-way commynication thread | |
95 | fCommunicationThread = new TThread("fCommunicationThread", | |
164d3d29 | 96 | Dispatch,(void*)this); |
5eb34a26 | 97 | fCommunicationThread->Run(); |
98 | } | |
99 | ||
100 | AliStorageClientThread::~AliStorageClientThread() | |
101 | { | |
102 | cout<<"CLIENT -- AliStorageClientThread destructor called"; | |
103 | if(fCurrentFile) | |
104 | { | |
105 | fCurrentFile->Close(); | |
106 | delete fCurrentFile; | |
107 | } | |
108 | if(fCommunicationThread){delete fCommunicationThread;} | |
a410aca4 | 109 | if(fDatabase){delete fDatabase;} |
5eb34a26 | 110 | cout<<" --- OK"<<endl; |
111 | } | |
112 | ||
164d3d29 | 113 | void AliStorageClientThread::CommunicationHandle() |
5eb34a26 | 114 | { |
164d3d29 | 115 | AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance(); |
116 | storageSockets socket = CLIENT_COMMUNICATION_REP; | |
117 | eventManager->CreateSocket(socket); | |
5eb34a26 | 118 | |
164d3d29 | 119 | struct clientRequestStruct *request; |
5eb34a26 | 120 | struct clientRequestStruct *response = new struct clientRequestStruct; |
121 | ||
122 | cout<<"CLIENT -- Communication stated"<<endl; | |
123 | ||
124 | while(!gClientQuit) | |
125 | { | |
164d3d29 | 126 | request = eventManager->GetClientStruct(socket); |
5eb34a26 | 127 | switch(request->messageType) |
128 | { | |
129 | case REQUEST_CONNECTION: | |
164d3d29 | 130 | eventManager->Send((long)fConnectionStatus,socket); |
5eb34a26 | 131 | break; |
132 | case REQUEST_RECEIVING: | |
164d3d29 | 133 | eventManager->Send((long)fReceivingStatus,socket); |
5eb34a26 | 134 | break; |
135 | case REQUEST_SAVING: | |
164d3d29 | 136 | eventManager->Send((long)fSavingStatus,socket); |
5eb34a26 | 137 | break; |
138 | case REQUEST_CURRENT_SIZE: | |
164d3d29 | 139 | eventManager->Send((long)fCurrentStorageSize,socket); |
5eb34a26 | 140 | break; |
141 | case REQUEST_GET_PARAMS: | |
164d3d29 | 142 | response->maxStorageSize = fMaximumStorageSize; |
143 | response->maxOccupation = fStorageOccupationLevel; | |
144 | response->removeEvents = fRemoveEventsPercentage; | |
145 | response->eventsInChunk = fNumberOfEventsInFile; | |
5eb34a26 | 146 | |
147 | eventManager->Send(response,socket); | |
148 | break; | |
149 | case REQUEST_SET_PARAMS: | |
164d3d29 | 150 | SetStorageParams(request->maxStorageSize, |
151 | request->maxOccupation, | |
152 | request->removeEvents, | |
153 | request->eventsInChunk); | |
5eb34a26 | 154 | |
164d3d29 | 155 | fMaximumStorageSize = request->maxStorageSize; |
156 | fStorageOccupationLevel = request->maxOccupation; | |
157 | fRemoveEventsPercentage = request->removeEvents; | |
158 | fNumberOfEventsInFile = request->eventsInChunk; | |
5eb34a26 | 159 | |
160 | eventManager->Send(true,socket); | |
161 | break; | |
162 | default:break; | |
163 | } | |
164d3d29 | 164 | delete request; |
5eb34a26 | 165 | } |
5eb34a26 | 166 | } |
167 | ||
168 | void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk) | |
169 | { | |
170 | cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl; | |
171 | ||
172 | ||
173 | TThread::Lock(); | |
164d3d29 | 174 | ifstream configFile (GetConfigFilePath()); |
5eb34a26 | 175 | ofstream tmpFile("tmpFile.bla"); |
176 | ||
177 | if (configFile.is_open()) | |
178 | { | |
179 | string line; | |
180 | string tmpLine; | |
181 | int from,to; | |
182 | while(configFile.good()) | |
183 | { | |
184 | getline(configFile,line); | |
185 | from = line.find("\"")+1; | |
186 | to = line.find_last_of("\""); | |
187 | tmpLine = line; | |
188 | if(line.find("MAX_SIZE=")==0) | |
189 | { | |
190 | tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize); | |
191 | } | |
192 | else if(line.find("MAX_OCCUPATION=")==0) | |
193 | { | |
194 | tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation); | |
195 | } | |
196 | else if(line.find("REMOVE_PERCENT=")==0) | |
197 | { | |
198 | tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents); | |
199 | } | |
200 | else if(line.find("EVENTS_IN_FILE=")==0) | |
201 | { | |
202 | tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk); | |
203 | } | |
204 | tmpLine += "\n"; | |
205 | tmpFile << tmpLine; | |
206 | } | |
207 | if(configFile.eof()) | |
208 | { | |
209 | configFile.clear(); | |
210 | } | |
211 | configFile.close(); | |
212 | tmpFile.close(); | |
164d3d29 | 213 | rename("tmpFile.bla",GetConfigFilePath()); |
5eb34a26 | 214 | } |
215 | else | |
216 | { | |
217 | cout<<"CLIENT -- Unable to open config file"<<endl; | |
218 | } | |
219 | TThread::UnLock(); | |
220 | } | |
221 | ||
222 | Long64_t AliStorageClientThread::GetSizeOfAllChunks() | |
223 | { | |
224 | Long64_t totalStorageSize = 0; | |
225 | ||
226 | TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str()); | |
227 | TList *listOfDirectories = dir.GetListOfFiles(); | |
228 | ||
229 | if (!listOfDirectories) | |
230 | { | |
231 | cout<<"CLIENT -- Storage directory is empty"<<endl; | |
232 | return 0; | |
233 | } | |
234 | TIter nextDirectory(listOfDirectories); | |
235 | TSystemFile *runDirectory; | |
236 | string directoryName; | |
237 | ||
238 | while ((runDirectory=(TSystemFile*)nextDirectory())) | |
239 | { | |
240 | directoryName=runDirectory->GetName(); | |
241 | if (runDirectory->IsDirectory() && directoryName.find("run")==0) | |
242 | { | |
243 | TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str())); | |
244 | TList *listOfChunks = dirChunks.GetListOfFiles(); | |
245 | ||
246 | if(listOfChunks) | |
247 | { | |
248 | TIter nextChunk(listOfChunks); | |
249 | TSystemFile *chunk; | |
250 | string chunkFileName; | |
251 | ||
252 | while((chunk=(TSystemFile*)nextChunk())) | |
253 | { | |
254 | chunkFileName = chunk->GetName(); | |
255 | if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0) | |
256 | { | |
257 | TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read"); | |
164d3d29 | 258 | if(tmpFile) |
259 | { | |
260 | totalStorageSize+=tmpFile->GetSize(); | |
261 | tmpFile->Close(); | |
262 | delete tmpFile; | |
263 | } | |
5eb34a26 | 264 | } |
265 | } | |
266 | if(chunk){delete chunk;} | |
267 | } | |
268 | if(listOfChunks){delete listOfChunks;} | |
269 | } | |
270 | } | |
271 | ||
272 | //tmpFiles.clear(); | |
273 | if(listOfDirectories){delete listOfDirectories;} | |
274 | if(runDirectory){delete runDirectory;} | |
275 | ||
276 | printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.)); | |
277 | ||
278 | return totalStorageSize; | |
279 | } | |
280 | ||
281 | void AliStorageClientThread::CollectData() | |
282 | { | |
164d3d29 | 283 | AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance(); |
6ce16778 | 284 | if(eventManager->CreateSocket(EVENTS_SERVER_SUB)){fConnectionStatus=STATUS_OK;} |
285 | else{fConnectionStatus=STATUS_ERROR;} | |
5eb34a26 | 286 | |
287 | int chunkNumber=0; | |
288 | int previousChunkNumber=-1; | |
289 | int eventsInChunk=0; | |
290 | int previousRunNumber=-1; | |
291 | AliESDEvent *event = NULL; | |
6ce16778 | 292 | vector<struct eventStruct> eventsToUpdate; |
293 | struct eventStruct currentEvent; | |
294 | ||
295 | ||
5eb34a26 | 296 | while(!gClientQuit) |
297 | { | |
164d3d29 | 298 | event = eventManager->GetEvent(EVENTS_SERVER_SUB); |
5eb34a26 | 299 | |
300 | if(event) | |
301 | { | |
302 | fReceivingStatus=STATUS_OK; | |
303 | ||
304 | if(event->GetRunNumber() != previousRunNumber)//when new run starts | |
305 | { | |
306 | cout<<"CLIENT -- new run started"<<endl; | |
307 | previousRunNumber = event->GetRunNumber(); | |
308 | gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber())); | |
309 | chunkNumber=0; | |
310 | eventsInChunk=0; | |
311 | ||
312 | TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()), | |
313 | Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber())); | |
314 | TList *files = dir.GetListOfFiles(); | |
315 | if (files) | |
316 | { | |
317 | TSystemFile *file; | |
318 | string fname; | |
319 | TIter next(files); | |
320 | ||
321 | while ((file=(TSystemFile*)next())) | |
322 | { | |
323 | fname = file->GetName(); | |
324 | ||
325 | if (!file->IsDirectory()) | |
326 | { | |
327 | int from = fname.find("chunk")+5; | |
328 | int to = fname.find(".root"); | |
329 | ||
330 | int maxChunkNumber = atoi(fname.substr(from,to-from).c_str()); | |
331 | ||
332 | if(maxChunkNumber > chunkNumber) | |
333 | { | |
334 | chunkNumber = maxChunkNumber; | |
335 | } | |
336 | } | |
337 | } | |
338 | chunkNumber++; | |
339 | } | |
340 | } | |
341 | ||
342 | cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl; | |
343 | ||
344 | if(chunkNumber != previousChunkNumber)//when new chunk needs to be created | |
345 | { | |
346 | if(fCurrentFile) | |
347 | { | |
348 | fCurrentFile->Close(); | |
349 | delete fCurrentFile; | |
350 | fCurrentFile=0; | |
351 | } | |
6ce16778 | 352 | for(unsigned int i=0;i<eventsToUpdate.size();i++) |
353 | { | |
354 | fDatabase->UpdateEventPath(eventsToUpdate[i], | |
355 | Form("%s/run%d/chunk%d.root", | |
356 | fStoragePath.c_str(), | |
357 | event->GetRunNumber(), | |
358 | chunkNumber-1)); | |
359 | } | |
360 | eventsToUpdate.clear(); | |
361 | ||
362 | ||
5eb34a26 | 363 | fCurrentStorageSize=GetSizeOfAllChunks(); |
364 | CheckCurrentStorageSize(); | |
365 | ||
366 | fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate"); | |
367 | ||
368 | previousChunkNumber = chunkNumber; | |
369 | } | |
6ce16778 | 370 | |
371 | //create new directory for this run | |
372 | TDirectory *currentRun; | |
373 | if((currentRun = fCurrentFile->mkdir(Form("run%d",event->GetRunNumber())))) | |
374 | { | |
375 | cout<<"CLIENT -- creating new directory for this run"<<endl; | |
376 | currentRun->cd(); | |
377 | } | |
378 | else | |
379 | { | |
380 | cout<<"CLIENT -- opening existing directory for this run"<<endl; | |
381 | fCurrentFile->cd(Form("run%d",event->GetRunNumber())); | |
382 | } | |
383 | ||
384 | if(0 != event->Write(Form("event%d",event->GetEventNumberInFile()))) | |
385 | //fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file | |
5eb34a26 | 386 | { |
5eb34a26 | 387 | eventsInChunk++; |
388 | ||
389 | if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached | |
390 | { | |
391 | chunkNumber++; | |
392 | eventsInChunk=0; | |
393 | } | |
394 | ||
395 | if(fSavingStatus!=STATUS_OK) | |
396 | { | |
397 | fSavingStatus=STATUS_OK; | |
398 | } | |
399 | } | |
400 | else if(fSavingStatus!=STATUS_ERROR) | |
401 | { | |
402 | fSavingStatus=STATUS_ERROR; | |
403 | } | |
6ce16778 | 404 | |
405 | // save to event file as well: | |
406 | ||
407 | TFile *eventFile = new TFile(Form("%s/run%d/event%d.root", fStoragePath.c_str(),event->GetRunNumber(),eventsInChunk),"recreate"); | |
408 | ||
409 | if((currentRun = eventFile->mkdir(Form("run%d",event->GetRunNumber())))) | |
410 | { | |
411 | cout<<"CLIENT -- creating new directory for this run"<<endl; | |
412 | currentRun->cd(); | |
413 | } | |
414 | else | |
415 | { | |
416 | cout<<"CLIENT -- opening existing directory for this run"<<endl; | |
417 | eventFile->cd(Form("run%d",event->GetRunNumber())); | |
418 | } | |
419 | ||
420 | if(0 == event->Write(Form("event%d",event->GetEventNumberInFile())) && | |
421 | fSavingStatus!=STATUS_ERROR){fSavingStatus=STATUS_ERROR;} | |
422 | else | |
423 | { | |
424 | eventFile->Close(); | |
425 | delete eventFile; | |
426 | fDatabase->InsertEvent(event->GetRunNumber(), | |
427 | event->GetEventNumberInFile(), | |
428 | (char*)event->GetBeamType(), | |
429 | event->GetMultiplicity()->GetNumberOfTracklets(), | |
430 | Form("%s/run%d/event%d.root",fStoragePath.c_str(), | |
431 | event->GetRunNumber(), | |
432 | eventsInChunk)); | |
433 | ||
434 | currentEvent.runNumber = event->GetRunNumber(); | |
435 | currentEvent.eventNumber = event->GetEventNumberInFile(); | |
436 | eventsToUpdate.push_back(currentEvent); | |
437 | } | |
5eb34a26 | 438 | delete event;event=0; |
164d3d29 | 439 | //delete tree; |
5eb34a26 | 440 | } |
441 | else if(fReceivingStatus!=STATUS_ERROR) | |
442 | { | |
443 | cout<<"CLIENT -- ERROR -- NO DATA!"<<endl; | |
444 | fReceivingStatus=STATUS_ERROR; | |
445 | } | |
446 | } | |
447 | if(event){delete event;} | |
448 | } | |
449 | ||
450 | ||
451 | void AliStorageClientThread::CheckCurrentStorageSize() | |
452 | { | |
453 | if(fCurrentStorageSize > (float)fStorageOccupationLevel/100. * fMaximumStorageSize) | |
454 | { | |
455 | while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize) | |
456 | { | |
457 | struct eventStruct oldestEvent = fDatabase->GetOldestEvent(); | |
458 | string oldestEventPath = fDatabase->GetFilePath(oldestEvent); | |
459 | //remove oldest event | |
460 | cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl; | |
461 | gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str())); | |
fae81379 | 462 | fDatabase->RemoveEventsWithPath(oldestEventPath); |
463 | // fDatabase->RemoveEvent(oldestEvent); | |
5eb34a26 | 464 | } |
465 | } | |
466 | } |