1 #include "AliStorageClientThread.h"
2 #include "AliMultiplicity.h"
3 #include "AliStorageTypes.h"
5 #include "AliStorageEventManager.h"
7 //#include "zhelpers.hpp"
15 #include <TSystemDirectory.h>
// File-scope quit flag; it starts out false and is set to true by GotSignalClient().
22 bool gClientQuit = false; // signal flag
// Signal handler: ignores the signal number it receives and raises gClientQuit.
// NOTE(review): a plain bool written from a signal handler is not guaranteed to be
// safe by the C++ standard (volatile sig_atomic_t is) — confirm that no other file
// declares this flag as bool before changing its type.
23 void GotSignalClient(int){gClientQuit = true;}
// Constructor.
// Sets the three status members to STATUS_WAITING and the size/limit members to 0,
// installs GotSignalClient as the SIGINT handler, reads the storage parameters from
// $ALICE_ROOT/STORAGE/setupStorageDatabase.sh, creates the storage directory and the
// database object, measures the current storage size and starts the communication thread.
// NOTE(review): the declarations of sa, line, from and to, and all braces, are not among
// the visible lines of this listing — check them against the original file.
25 AliStorageClientThread::AliStorageClientThread() :
26 fConnectionStatus(STATUS_WAITING),
27 fReceivingStatus(STATUS_WAITING),
28 fSavingStatus(STATUS_WAITING),
30 fCommunicationThread(0),
33 fCurrentStorageSize(0),
34 fMaximumStorageSize(0),
36 fNumberOfEventsInFile(0),
37 fStorageOccupationLevel(0),
38 fRemoveEventsPercentage(0)
40 // install a SIGINT handler that only sets gClientQuit, so that the program can shut down in an orderly way
42 memset(&sa,0,sizeof(sa));
43 sa.sa_handler = GotSignalClient;
// a full mask blocks every other signal while the handler is running
44 sigfillset(&sa.sa_mask);
45 sigaction(SIGINT,&sa,NULL);
47 //load storage parameters from file
// NOTE(review): the result of Getenv("ALICE_ROOT") is passed to Form() without a null check.
49 ifstream configFile (Form("%s/STORAGE/setupStorageDatabase.sh",
50 gSystem->Getenv("ALICE_ROOT")));
51 if (configFile.is_open())
// read the file line by line; lines that start with none of the keys below are ignored
55 while(configFile.good())
57 getline(configFile,line);
// the value of a setting is the text between the first and the last double quote on the line
58 from = line.find("\"")+1;
59 to = line.find_last_of("\"");
60 if(line.find("STORAGE_PATH=")==0)
62 fStoragePath=line.substr(from,to-from);
// numeric settings go through atoi(), which yields 0 for text that is not a number
64 else if(line.find("MAX_SIZE=")==0)
66 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
68 else if(line.find("MAX_OCCUPATION=")==0)
70 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
72 else if(line.find("REMOVE_PERCENT=")==0)
74 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
76 else if(line.find("EVENTS_IN_FILE=")==0)
78 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
80 else if(line.find("EVENT_SERVER=")==0)
82 fEventServer=line.substr(from,to-from);
// presumably the else-branch of the is_open() check above — the branch line itself is not visible here
93 cout<<"CLIENT -- Unable to open config file"<<endl;
95 //create directory for storage if it doesn't exist
// NOTE(review): fStoragePath is interpolated unquoted into a shell command; it only
// receives a value above when the config file contains a STORAGE_PATH= line.
96 gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
98 //create database class
99 fDatabase = new AliStorageDatabase();
102 //check current storage size
103 fCurrentStorageSize = GetSizeOfAllChunks();
105 //create two-way communication thread
// the thread executes the static CommunicationHandler with this object as its argument
106 fCommunicationThread = new TThread("fCommunicationThread",
107 AliStorageClientThread::CommunicationHandler,(void*)this);
108 fCommunicationThread->Run();
// Destructor: reports its progress on stdout, closes the current output file and
// deletes the communication thread object.
111 AliStorageClientThread::~AliStorageClientThread()
113 cout<<"CLIENT -- AliStorageClientThread destructor called";
// NOTE(review): no null check on fCurrentFile is visible in this listing, and the only
// visible assignment to fCurrentFile is in CollectData() — confirm a guard exists.
116 fCurrentFile->Close();
// NOTE(review): presumably CommunicationHandler may still be running in this thread
// object at this point — confirm that deleting it here is safe.
119 if(fCommunicationThread){delete fCommunicationThread;}
120 cout<<" --- OK"<<endl;
// Entry point of the communication thread (handed to TThread by the constructor).
// arg is the AliStorageClientThread instance whose state is reported and modified.
// Binds a ZMQ_REP socket on gClientCommunicationPort, receives clientRequestStruct
// requests and answers each one according to its messageType.
// NOTE(review): the loop header, one case label and all break statements are not among
// the visible lines of this listing — check them against the original file.
123 void* AliStorageClientThread::CommunicationHandler(void *arg)
125 AliStorageClientThread *clientInstance = static_cast<AliStorageClientThread*>(arg);
127 //create socket for two-way communication
128 context_t *context = new context_t();
129 socket_t *socket = new socket_t(*context,ZMQ_REP);
130 socket->bind(Form("tcp://*:%d",gClientCommunicationPort));
132 AliStorageEventManager *eventManager = new AliStorageEventManager();
134 message_t *requestMessage = new message_t();
// NOTE(review): this allocation is overwritten by the assignment that follows recv()
// below, so the struct allocated here is never used or freed.
135 struct clientRequestStruct *request = new struct clientRequestStruct;
// NOTE(review): no delete of response or requestMessage is visible in this function.
136 struct clientRequestStruct *response = new struct clientRequestStruct;
138 cout<<"CLIENT -- Communication stated"<<endl;
142 socket->recv(requestMessage);
// request now points into the buffer held by requestMessage
143 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
144 switch(request->messageType)
// status queries: the requested status member is sent back cast to long
// NOTE(review): these members are also written by CollectData(); no synchronisation
// is visible here.
146 case REQUEST_CONNECTION:
147 eventManager->Send((long)clientInstance->fConnectionStatus,socket);
149 case REQUEST_RECEIVING:
150 eventManager->Send((long)clientInstance->fReceivingStatus,socket);
// the case label belonging to this reply is not visible in this listing
153 eventManager->Send((long)clientInstance->fSavingStatus,socket);
155 case REQUEST_CURRENT_SIZE:
156 eventManager->Send((long)clientInstance->fCurrentStorageSize,socket);
// parameter query: copy the four storage parameters into the response struct and send it
158 case REQUEST_GET_PARAMS:
159 response->maxStorageSize = clientInstance->fMaximumStorageSize;
160 response->maxOccupation = clientInstance->fStorageOccupationLevel;
161 response->removeEvents = clientInstance->fRemoveEventsPercentage;
162 response->eventsInChunk = clientInstance->fNumberOfEventsInFile;
164 eventManager->Send(response,socket);
// parameter update: SetStorageParams() rewrites the config file, then the in-memory
// members are updated and true is sent back as the reply
166 case REQUEST_SET_PARAMS:
167 clientInstance->SetStorageParams(request->maxStorageSize,
168 request->maxOccupation,
169 request->removeEvents,
170 request->eventsInChunk);
172 clientInstance->fMaximumStorageSize = request->maxStorageSize;
173 clientInstance->fStorageOccupationLevel = request->maxOccupation;
174 clientInstance->fRemoveEventsPercentage = request->removeEvents;
175 clientInstance->fNumberOfEventsInFile = request->eventsInChunk;
177 eventManager->Send(true,socket);
// NOTE(review): the context is deleted before the socket. Presumably context_t's
// destructor terminates the ZeroMQ context, which blocks until all of its sockets are
// closed — verify, and consider deleting the socket first.
182 if(context){delete context;}
183 if(socket){delete socket;}
184 if(eventManager){delete eventManager;}
// Writes new storage parameters into $ALICE_ROOT/STORAGE/setupStorageDatabase.sh.
// maxStorageSize, maxOccupation, removeEvents and eventsInChunk become the quoted values
// of the MAX_SIZE, MAX_OCCUPATION, REMOVE_PERCENT and EVENTS_IN_FILE lines respectively.
// The new content goes to a temporary file which is then renamed over the config file.
// NOTE(review): the declarations of line, tmpLine, from and to, and the statements that
// copy unmatched lines and write tmpLine into tmpFile, are not among the visible lines of
// this listing — check them against the original file.
188 void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
// print the four requested values, one per line
190 cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
194 ifstream configFile (Form("%s/STORAGE/setupStorageDatabase.sh",
195 gSystem->Getenv("ALICE_ROOT")));
// NOTE(review): the temporary file has a relative name, so it is created in the current
// working directory, and it is opened before it is known whether the config file exists.
196 ofstream tmpFile("tmpFile.bla");
198 if (configFile.is_open())
203 while(configFile.good())
205 getline(configFile,line);
// from/to are computed as in the constructor; none of the visible lines below uses them
206 from = line.find("\"")+1;
207 to = line.find_last_of("\"");
// a line that starts with one of the four keys is replaced by KEY="<new value>"
209 if(line.find("MAX_SIZE=")==0)
211 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
213 else if(line.find("MAX_OCCUPATION=")==0)
215 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
217 else if(line.find("REMOVE_PERCENT=")==0)
219 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
221 else if(line.find("EVENTS_IN_FILE=")==0)
223 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
// replace the config file with the temporary file
// NOTE(review): the return value of rename() is ignored; rename() fails when source and
// destination are on different file systems.
234 rename("tmpFile.bla",Form("%s/STORAGE/setupStorageDatabase.sh",
235 gSystem->Getenv("ALICE_ROOT")));
// presumably the else-branch of the is_open() check above — the branch line itself is not visible here
239 cout<<"CLIENT -- Unable to open config file"<<endl;
// Returns the summed TFile::GetSize() of all chunk files in the storage.
// A chunk file is a non-directory entry whose name starts with "chunk", located in a
// sub-directory of fStoragePath whose name starts with "run". The total is also printed,
// both raw and divided by 1000*1000 (labelled MB).
// NOTE(review): the declaration of chunk, and any early return or null guard, are not
// among the visible lines of this listing — check them against the original file.
244 Long64_t AliStorageClientThread::GetSizeOfAllChunks()
246 Long64_t totalStorageSize = 0;
248 TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
249 TList *listOfDirectories = dir.GetListOfFiles();
// a null list is reported as an empty storage directory
251 if (!listOfDirectories)
253 cout<<"CLIENT -- Storage directory is empty"<<endl;
256 TIter nextDirectory(listOfDirectories);
257 TSystemFile *runDirectory;
258 string directoryName;
// outer loop: every entry of the storage directory
260 while ((runDirectory=(TSystemFile*)nextDirectory()))
262 directoryName=runDirectory->GetName();
263 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
265 TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
266 TList *listOfChunks = dirChunks.GetListOfFiles();
270 TIter nextChunk(listOfChunks);
272 string chunkFileName;
// inner loop: every entry of one run directory
274 while((chunk=(TSystemFile*)nextChunk()))
276 chunkFileName = chunk->GetName();
277 if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
// each chunk is opened as a ROOT file only to ask for its size
279 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
// NOTE(review): TFile::GetSize() is documented to return -1 for a file that could not be
// opened, which would be added to the total here — confirm.
281 totalStorageSize+=tmpFile->GetSize();
283 if(tmpFile){delete tmpFile;}
// presumably placed after the inner loop, where chunk is already null (the loop ends when
// the iterator returns null), which would make this delete a no-op
286 if(chunk){delete chunk;}
288 if(listOfChunks){delete listOfChunks;}
293 if(listOfDirectories){delete listOfDirectories;}
// presumably placed after the outer loop, where runDirectory is already null
294 if(runDirectory){delete runDirectory;}
296 printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
298 return totalStorageSize;
// Receives events from the event server and stores them in chunk files.
// Connects a ZMQ_SUB socket to tcp://<fEventServer>:<gEventsSubscriberPort>, fetches
// events through AliStorageEventManager::GetEvent(), writes each event into the current
// chunk file <fStoragePath>/run<R>/chunk<N>.root, records it in the database and keeps
// fConnectionStatus, fReceivingStatus and fSavingStatus up to date.
// NOTE(review): the receive loop, the declarations of chunkNumber, eventsInChunk, file,
// fname and next, and several conditions and braces are not among the visible lines of
// this listing — check them against the original file.
301 void AliStorageClientThread::CollectData()
303 //connect with events' sockets
// NOTE(review): no delete of context, socket or eventManager is visible in this function.
304 context_t *context = new context_t(1);
305 socket_t *socket = new socket_t(*context,ZMQ_SUB);
// a zero-length subscription filter subscribes to every incoming message
306 socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
307 socket->connect(Form("tcp://%s:%d",fEventServer.c_str(),gEventsSubscriberPort));
// the condition that selects between the two outcomes below is not visible in this listing
311 cout<<"CLIENT -- Successfully connected to events' server"<<endl;
312 fConnectionStatus = STATUS_OK;
316 cout<<"CLIENT -- ERROR - could not connect to events' server"<<endl;
317 fConnectionStatus = STATUS_ERROR;
320 AliStorageEventManager *eventManager = new AliStorageEventManager();
// -1 means that no chunk / no run has been seen yet
323 int previousChunkNumber=-1;
325 int previousRunNumber=-1;
326 AliESDEvent *event = NULL;
330 event = eventManager->GetEvent(socket);
// presumably executed only when an event was received (see the "NO DATA" branch further down)
334 fReceivingStatus=STATUS_OK;
336 if(event->GetRunNumber() != previousRunNumber)//when new run starts
338 cout<<"CLIENT -- new run started"<<endl;
339 previousRunNumber = event->GetRunNumber();
340 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
// scan the run directory for the highest chunk number that is already present
344 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
345 Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
346 TList *files = dir.GetListOfFiles();
353 while ((file=(TSystemFile*)next()))
355 fname = file->GetName();
357 if (!file->IsDirectory())
// the chunk number is the text between "chunk" and ".root" in the file name
// NOTE(review): file names that do not contain "chunk" / ".root" are not filtered out here.
359 int from = fname.find("chunk")+5;
360 int to = fname.find(".root");
362 int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
364 if(maxChunkNumber > chunkNumber)
366 chunkNumber = maxChunkNumber;
374 cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
376 if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
// close the previous chunk file (any guard for the first pass is not visible here)
380 fCurrentFile->Close();
// refresh the storage size and let CheckCurrentStorageSize() free space if needed
384 fCurrentStorageSize=GetSizeOfAllChunks();
385 CheckCurrentStorageSize();
// NOTE(review): "recreate" overwrites an existing file; in the visible code chunkNumber can
// be the number of a chunk found on disk above — confirm that an existing chunk is not lost.
387 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
389 previousChunkNumber = chunkNumber;
// the event is stored under the key "event<number in file>"
392 if(0 != fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
// record run, event number, beam type, tracklet multiplicity and chunk path in the database
394 fDatabase->InsertEvent(event->GetRunNumber(),
395 event->GetEventNumberInFile(),
396 (char*)event->GetBeamType(),
397 event->GetMultiplicity()->GetNumberOfTracklets(),Form("%s/run%d/chunk%d.root",fStoragePath.c_str(),event->GetRunNumber(),chunkNumber));
// the statements executed when the chunk is full are not visible in this listing
401 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
407 if(fSavingStatus!=STATUS_OK)
409 fSavingStatus=STATUS_OK;
// presumably the branch taken when WriteObject() returned 0
412 else if(fSavingStatus!=STATUS_ERROR)
414 fSavingStatus=STATUS_ERROR;
416 delete event;event=0;
// presumably the branch taken when no event was received; the message is printed only on
// the transition into the error state
418 else if(fReceivingStatus!=STATUS_ERROR)
420 cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
421 fReceivingStatus=STATUS_ERROR;
424 if(event){delete event;}
428 void AliStorageClientThread::CheckCurrentStorageSize()
430 if(fCurrentStorageSize > (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
432 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
434 struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
435 string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
436 //remove oldest event
437 cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
438 gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
439 fDatabase->RemoveEvent(fDatabase->GetOldestEvent());