]> git.uio.no Git - u/mrichter/AliRoot.git/blame - MONITOR/alistoragemanager/AliStorageClientThread.cxx
1. Fixing memory leaks in alistorage
[u/mrichter/AliRoot.git] / MONITOR / alistoragemanager / AliStorageClientThread.cxx
CommitLineData
5eb34a26 1#include "AliStorageClientThread.h"
2#include "AliMultiplicity.h"
3#include "AliStorageTypes.h"
5eb34a26 4#include "AliStorageEventManager.h"
5
5eb34a26 6#include <sstream>
7#include <signal.h>
8#include <fstream>
9#include <iostream>
10
11#include <TSystemDirectory.h>
12#include <TThread.h>
13#include <TFile.h>
14
15using namespace std;
5eb34a26 16
// Quit flag for the client: starts out false and is raised by GotSignalClient().
// The worker loops in this file poll it to know when to stop.
bool gClientQuit(false);

// Signal handler: ignores the signal number and only raises the quit flag.
void GotSignalClient(int /*signalNumber*/)
{
    gClientQuit = true;
}
19
20AliStorageClientThread::AliStorageClientThread() :
21fConnectionStatus(STATUS_WAITING),
22fReceivingStatus(STATUS_WAITING),
23fSavingStatus(STATUS_WAITING),
5eb34a26 24fCommunicationThread(0),
25fCurrentFile(0),
26fDatabase(0),
27fCurrentStorageSize(0),
28fMaximumStorageSize(0),
29fStoragePath(""),
30fNumberOfEventsInFile(0),
31fStorageOccupationLevel(0),
32fRemoveEventsPercentage(0)
33{
34 // make sure that when program is closed destructor will be called
35 struct sigaction sa;
36 memset(&sa,0,sizeof(sa));
37 sa.sa_handler = GotSignalClient;
38 sigfillset(&sa.sa_mask);
39 sigaction(SIGINT,&sa,NULL);
40
41 //load storage parameters from file
42 TThread::Lock();
164d3d29 43 ifstream configFile (GetConfigFilePath());
5eb34a26 44 if (configFile.is_open())
45 {
46 string line;
47 int from,to;
48 while(configFile.good())
49 {
50 getline(configFile,line);
51 from = line.find("\"")+1;
52 to = line.find_last_of("\"");
53 if(line.find("STORAGE_PATH=")==0)
54 {
55 fStoragePath=line.substr(from,to-from);
56 }
57 else if(line.find("MAX_SIZE=")==0)
58 {
59 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
60 }
61 else if(line.find("MAX_OCCUPATION=")==0)
62 {
63 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
64 }
65 else if(line.find("REMOVE_PERCENT=")==0)
66 {
67 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
68 }
69 else if(line.find("EVENTS_IN_FILE=")==0)
70 {
71 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
72 }
5eb34a26 73 }
74 if(configFile.eof())
75 {
76 configFile.clear();
77 }
78 configFile.close();
79 }
80 else
81 {
82 cout<<"CLIENT -- Unable to open config file"<<endl;
83 }
84 //create directory for storage if it doesn't exist
85 gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
86
87 //create database class
88 fDatabase = new AliStorageDatabase();
89 TThread::UnLock();
90
91 //check current storage size
92 fCurrentStorageSize = GetSizeOfAllChunks();
93
94 //create two-way commynication thread
95 fCommunicationThread = new TThread("fCommunicationThread",
164d3d29 96 Dispatch,(void*)this);
5eb34a26 97 fCommunicationThread->Run();
98}
99
100AliStorageClientThread::~AliStorageClientThread()
101{
102 cout<<"CLIENT -- AliStorageClientThread destructor called";
103 if(fCurrentFile)
104 {
105 fCurrentFile->Close();
106 delete fCurrentFile;
107 }
108 if(fCommunicationThread){delete fCommunicationThread;}
a410aca4 109 if(fDatabase){delete fDatabase;}
5eb34a26 110 cout<<" --- OK"<<endl;
111}
112
164d3d29 113void AliStorageClientThread::CommunicationHandle()
5eb34a26 114{
164d3d29 115 AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
116 storageSockets socket = CLIENT_COMMUNICATION_REP;
117 eventManager->CreateSocket(socket);
5eb34a26 118
164d3d29 119 struct clientRequestStruct *request;
5eb34a26 120 struct clientRequestStruct *response = new struct clientRequestStruct;
121
122 cout<<"CLIENT -- Communication stated"<<endl;
123
124 while(!gClientQuit)
125 {
164d3d29 126 request = eventManager->GetClientStruct(socket);
5eb34a26 127 switch(request->messageType)
128 {
129 case REQUEST_CONNECTION:
164d3d29 130 eventManager->Send((long)fConnectionStatus,socket);
5eb34a26 131 break;
132 case REQUEST_RECEIVING:
164d3d29 133 eventManager->Send((long)fReceivingStatus,socket);
5eb34a26 134 break;
135 case REQUEST_SAVING:
164d3d29 136 eventManager->Send((long)fSavingStatus,socket);
5eb34a26 137 break;
138 case REQUEST_CURRENT_SIZE:
164d3d29 139 eventManager->Send((long)fCurrentStorageSize,socket);
5eb34a26 140 break;
141 case REQUEST_GET_PARAMS:
164d3d29 142 response->maxStorageSize = fMaximumStorageSize;
143 response->maxOccupation = fStorageOccupationLevel;
144 response->removeEvents = fRemoveEventsPercentage;
145 response->eventsInChunk = fNumberOfEventsInFile;
5eb34a26 146
147 eventManager->Send(response,socket);
148 break;
149 case REQUEST_SET_PARAMS:
164d3d29 150 SetStorageParams(request->maxStorageSize,
151 request->maxOccupation,
152 request->removeEvents,
153 request->eventsInChunk);
5eb34a26 154
164d3d29 155 fMaximumStorageSize = request->maxStorageSize;
156 fStorageOccupationLevel = request->maxOccupation;
157 fRemoveEventsPercentage = request->removeEvents;
158 fNumberOfEventsInFile = request->eventsInChunk;
5eb34a26 159
160 eventManager->Send(true,socket);
161 break;
162 default:break;
163 }
164d3d29 164 delete request;
5eb34a26 165 }
5eb34a26 166}
167
168void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
169{
170 cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
171
172
173 TThread::Lock();
164d3d29 174 ifstream configFile (GetConfigFilePath());
5eb34a26 175 ofstream tmpFile("tmpFile.bla");
176
177 if (configFile.is_open())
178 {
179 string line;
180 string tmpLine;
181 int from,to;
182 while(configFile.good())
183 {
184 getline(configFile,line);
185 from = line.find("\"")+1;
186 to = line.find_last_of("\"");
187 tmpLine = line;
188 if(line.find("MAX_SIZE=")==0)
189 {
190 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
191 }
192 else if(line.find("MAX_OCCUPATION=")==0)
193 {
194 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
195 }
196 else if(line.find("REMOVE_PERCENT=")==0)
197 {
198 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
199 }
200 else if(line.find("EVENTS_IN_FILE=")==0)
201 {
202 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
203 }
204 tmpLine += "\n";
205 tmpFile << tmpLine;
206 }
207 if(configFile.eof())
208 {
209 configFile.clear();
210 }
211 configFile.close();
212 tmpFile.close();
164d3d29 213 rename("tmpFile.bla",GetConfigFilePath());
5eb34a26 214 }
215 else
216 {
217 cout<<"CLIENT -- Unable to open config file"<<endl;
218 }
219 TThread::UnLock();
220}
221
222Long64_t AliStorageClientThread::GetSizeOfAllChunks()
223{
224 Long64_t totalStorageSize = 0;
225
226 TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
227 TList *listOfDirectories = dir.GetListOfFiles();
228
229 if (!listOfDirectories)
230 {
231 cout<<"CLIENT -- Storage directory is empty"<<endl;
232 return 0;
233 }
234 TIter nextDirectory(listOfDirectories);
235 TSystemFile *runDirectory;
236 string directoryName;
237
238 while ((runDirectory=(TSystemFile*)nextDirectory()))
239 {
240 directoryName=runDirectory->GetName();
241 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
242 {
243 TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
244 TList *listOfChunks = dirChunks.GetListOfFiles();
245
246 if(listOfChunks)
247 {
248 TIter nextChunk(listOfChunks);
249 TSystemFile *chunk;
250 string chunkFileName;
251
252 while((chunk=(TSystemFile*)nextChunk()))
253 {
254 chunkFileName = chunk->GetName();
255 if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
256 {
257 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
164d3d29 258 if(tmpFile)
259 {
260 totalStorageSize+=tmpFile->GetSize();
261 tmpFile->Close();
262 delete tmpFile;
263 }
5eb34a26 264 }
265 }
266 if(chunk){delete chunk;}
267 }
268 if(listOfChunks){delete listOfChunks;}
269 }
270 }
271
272 //tmpFiles.clear();
273 if(listOfDirectories){delete listOfDirectories;}
274 if(runDirectory){delete runDirectory;}
275
276 printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
277
278 return totalStorageSize;
279}
280
281void AliStorageClientThread::CollectData()
282{
164d3d29 283 AliStorageEventManager *eventManager = AliStorageEventManager::GetEventManagerInstance();
284 if(eventManager->CreateSocket(EVENTS_SERVER_SUB))
5eb34a26 285 {
164d3d29 286 fConnectionStatus=STATUS_OK;
5eb34a26 287 }
164d3d29 288 else
5eb34a26 289 {
164d3d29 290 fConnectionStatus=STATUS_ERROR;
5eb34a26 291 }
5eb34a26 292
293 int chunkNumber=0;
294 int previousChunkNumber=-1;
295 int eventsInChunk=0;
296 int previousRunNumber=-1;
297 AliESDEvent *event = NULL;
164d3d29 298// TTree *tree = NULL;
299
5eb34a26 300 while(!gClientQuit)
301 {
164d3d29 302 event = eventManager->GetEvent(EVENTS_SERVER_SUB);
5eb34a26 303
304 if(event)
305 {
306 fReceivingStatus=STATUS_OK;
307
308 if(event->GetRunNumber() != previousRunNumber)//when new run starts
309 {
310 cout<<"CLIENT -- new run started"<<endl;
311 previousRunNumber = event->GetRunNumber();
312 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
313 chunkNumber=0;
314 eventsInChunk=0;
315
316 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
317 Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
318 TList *files = dir.GetListOfFiles();
319 if (files)
320 {
321 TSystemFile *file;
322 string fname;
323 TIter next(files);
324
325 while ((file=(TSystemFile*)next()))
326 {
327 fname = file->GetName();
328
329 if (!file->IsDirectory())
330 {
331 int from = fname.find("chunk")+5;
332 int to = fname.find(".root");
333
334 int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
335
336 if(maxChunkNumber > chunkNumber)
337 {
338 chunkNumber = maxChunkNumber;
339 }
340 }
341 }
342 chunkNumber++;
343 }
344 }
345
346 cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
347
348 if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
349 {
350 if(fCurrentFile)
351 {
352 fCurrentFile->Close();
353 delete fCurrentFile;
354 fCurrentFile=0;
355 }
356 fCurrentStorageSize=GetSizeOfAllChunks();
357 CheckCurrentStorageSize();
358
359 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
360
361 previousChunkNumber = chunkNumber;
362 }
363
364 if(0 != fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
365 {
366 fDatabase->InsertEvent(event->GetRunNumber(),
367 event->GetEventNumberInFile(),
368 (char*)event->GetBeamType(),
369 event->GetMultiplicity()->GetNumberOfTracklets(),Form("%s/run%d/chunk%d.root",fStoragePath.c_str(),event->GetRunNumber(),chunkNumber));
370
371 eventsInChunk++;
372
373 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
374 {
375 chunkNumber++;
376 eventsInChunk=0;
377 }
378
379 if(fSavingStatus!=STATUS_OK)
380 {
381 fSavingStatus=STATUS_OK;
382 }
383 }
384 else if(fSavingStatus!=STATUS_ERROR)
385 {
386 fSavingStatus=STATUS_ERROR;
387 }
388 delete event;event=0;
164d3d29 389 //delete tree;
5eb34a26 390 }
391 else if(fReceivingStatus!=STATUS_ERROR)
392 {
393 cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
394 fReceivingStatus=STATUS_ERROR;
395 }
396 }
397 if(event){delete event;}
398}
399
400
401void AliStorageClientThread::CheckCurrentStorageSize()
402{
403 if(fCurrentStorageSize > (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
404 {
405 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
406 {
407 struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
408 string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
409 //remove oldest event
410 cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
411 gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
412 fDatabase->RemoveEvent(fDatabase->GetOldestEvent());
413 }
414 }
415}