- added new task for pi0 reconstruction using purely calorimeter clusters
[u/mrichter/AliRoot.git] / STORAGE / AliStorageClientThread.cxx
1 #include "AliStorageClientThread.h"
2 #include "AliMultiplicity.h"
3 #include "AliStorageTypes.h"
4 #include "AliSocket.h"
5 #include "AliStorageEventManager.h"
6
7 //#include "zhelpers.hpp"
8 #include "zmq.hpp"
9
10 #include <sstream>
11 #include <signal.h>
12 #include <fstream>
13 #include <iostream>
14
15 #include <TSystemDirectory.h>
16 #include <TThread.h>
17 #include <TFile.h>
18
19 using namespace std;
20 using namespace zmq;
21
// Quit flag: set by the SIGINT handler, polled by the client loops in this file.
// It is declared volatile sig_atomic_t because that is a type the C and C++
// standards guarantee may be written from an asynchronous signal handler and
// then read by the interrupted program.
volatile sig_atomic_t gClientQuit = 0;

// SIGINT handler: only raises the quit flag (nothing else is async-signal-safe here).
void GotSignalClient(int){gClientQuit = 1;}
24
25 AliStorageClientThread::AliStorageClientThread() :
26 fConnectionStatus(STATUS_WAITING),
27 fReceivingStatus(STATUS_WAITING),
28 fSavingStatus(STATUS_WAITING),
29 fEventServer(""),
30 fCommunicationThread(0),
31 fCurrentFile(0),
32 fDatabase(0),
33 fCurrentStorageSize(0),
34 fMaximumStorageSize(0),
35 fStoragePath(""),
36 fNumberOfEventsInFile(0),
37 fStorageOccupationLevel(0),
38 fRemoveEventsPercentage(0)
39 {       
40         // make sure that when program is closed destructor will be called
41         struct sigaction sa;
42         memset(&sa,0,sizeof(sa));
43         sa.sa_handler = GotSignalClient;
44         sigfillset(&sa.sa_mask);
45         sigaction(SIGINT,&sa,NULL);
46         
47         //load storage parameters from file
48         TThread::Lock();
49         ifstream configFile (Form("%s/STORAGE/setupStorageDatabase.sh",
50                                   gSystem->Getenv("ALICE_ROOT")));
51         if (configFile.is_open())
52         {
53                 string line;
54                 int from,to;
55                 while(configFile.good())
56                 {
57                         getline(configFile,line);
58                         from = line.find("\"")+1;
59                         to = line.find_last_of("\"");
60                         if(line.find("STORAGE_PATH=")==0)
61                         {
62                                 fStoragePath=line.substr(from,to-from);
63                         }
64                         else if(line.find("MAX_SIZE=")==0)
65                         {
66                                 fMaximumStorageSize=atoi(line.substr(from,to-from).c_str());
67                         }
68                         else if(line.find("MAX_OCCUPATION=")==0)
69                         {
70                                 fStorageOccupationLevel=atoi(line.substr(from,to-from).c_str());
71                         }
72                         else if(line.find("REMOVE_PERCENT=")==0)
73                         {
74                                 fRemoveEventsPercentage=atoi(line.substr(from,to-from).c_str());
75                         }
76                         else if(line.find("EVENTS_IN_FILE=")==0)
77                         {
78                                 fNumberOfEventsInFile=atoi(line.substr(from,to-from).c_str());
79                         }
80             else if(line.find("EVENT_SERVER=")==0)
81                         {
82                                 fEventServer=line.substr(from,to-from);
83                         }
84                 }
85                 if(configFile.eof())
86                 {
87                         configFile.clear();
88                 }
89                 configFile.close();
90         }
91         else
92         {
93                 cout<<"CLIENT -- Unable to open config file"<<endl;
94         }
95         //create directory for storage if it doesn't exist
96         gSystem->Exec(Form("mkdir -p %s",fStoragePath.c_str()));
97
98         //create database class
99         fDatabase = new AliStorageDatabase();
100         TThread::UnLock();
101
102         //check current storage size
103         fCurrentStorageSize = GetSizeOfAllChunks();
104
105         //create two-way commynication thread
106         fCommunicationThread = new TThread("fCommunicationThread",
107                                            AliStorageClientThread::CommunicationHandler,(void*)this);
108         fCommunicationThread->Run();
109 }
110
111 AliStorageClientThread::~AliStorageClientThread()
112 {
113         cout<<"CLIENT -- AliStorageClientThread destructor called";
114         if(fCurrentFile)
115         {
116                 fCurrentFile->Close();
117                 delete fCurrentFile;
118         }
119         if(fCommunicationThread){delete fCommunicationThread;}
120         cout<<" --- OK"<<endl;
121 }
122
123 void* AliStorageClientThread::CommunicationHandler(void *arg)
124 {
125         AliStorageClientThread *clientInstance = static_cast<AliStorageClientThread*>(arg);
126
127 //create socket for two-way communication
128         context_t *context = new context_t();
129         socket_t *socket = new socket_t(*context,ZMQ_REP);
130         socket->bind(Form("tcp://*:%d",gClientCommunicationPort));      
131
132         AliStorageEventManager *eventManager = new AliStorageEventManager();
133         
134         message_t *requestMessage = new message_t();
135         struct clientRequestStruct *request = new struct clientRequestStruct;
136         struct clientRequestStruct *response = new struct clientRequestStruct;
137
138         cout<<"CLIENT -- Communication stated"<<endl;
139         
140         while(!gClientQuit)
141         {
142                 socket->recv(requestMessage);
143                 request = static_cast<struct clientRequestStruct*>(requestMessage->data());
144                 switch(request->messageType)
145                 {
146                 case REQUEST_CONNECTION:
147                         eventManager->Send((long)clientInstance->fConnectionStatus,socket);
148                         break;
149                 case REQUEST_RECEIVING:
150                         eventManager->Send((long)clientInstance->fReceivingStatus,socket);
151                         break;
152                 case REQUEST_SAVING:
153                         eventManager->Send((long)clientInstance->fSavingStatus,socket);
154                         break;
155                 case REQUEST_CURRENT_SIZE:
156                         eventManager->Send((long)clientInstance->fCurrentStorageSize,socket);
157                         break;
158                 case REQUEST_GET_PARAMS:
159                         response->maxStorageSize = clientInstance->fMaximumStorageSize;
160                         response->maxOccupation = clientInstance->fStorageOccupationLevel;
161                         response->removeEvents = clientInstance->fRemoveEventsPercentage;
162                         response->eventsInChunk = clientInstance->fNumberOfEventsInFile;
163
164                         eventManager->Send(response,socket);
165                         break;
166                 case REQUEST_SET_PARAMS:
167                         clientInstance->SetStorageParams(request->maxStorageSize,
168                                                          request->maxOccupation,
169                                                          request->removeEvents,
170                                                          request->eventsInChunk);
171
172                         clientInstance->fMaximumStorageSize = request->maxStorageSize;
173                         clientInstance->fStorageOccupationLevel = request->maxOccupation;
174                         clientInstance->fRemoveEventsPercentage = request->removeEvents;
175                         clientInstance->fNumberOfEventsInFile = request->eventsInChunk;
176
177                         eventManager->Send(true,socket);
178                         break;
179                 default:break;
180                 }
181         }
182         if(context){delete context;}
183         if(socket){delete socket;}
184         if(eventManager){delete eventManager;}
185         return nullptr;
186 }
187
188 void AliStorageClientThread::SetStorageParams(int maxStorageSize,int maxOccupation,int removeEvents,int eventsInChunk)
189 {
190         cout<<maxStorageSize<<endl<<maxOccupation<<endl<<removeEvents<<endl<<eventsInChunk<<endl;
191
192
193         TThread::Lock();
194         ifstream configFile (Form("%s/STORAGE/setupStorageDatabase.sh",
195                                   gSystem->Getenv("ALICE_ROOT")));
196         ofstream tmpFile("tmpFile.bla");
197         
198         if (configFile.is_open())
199         {
200                 string line;
201                 string tmpLine;
202                 int from,to;
203                 while(configFile.good())
204                 {
205                         getline(configFile,line);
206                         from = line.find("\"")+1;
207                         to = line.find_last_of("\"");
208                         tmpLine = line;
209                         if(line.find("MAX_SIZE=")==0)
210                         {
211                                 tmpLine = Form("MAX_SIZE=\"%d\"",maxStorageSize);
212                         }
213                         else if(line.find("MAX_OCCUPATION=")==0)
214                         {
215                                 tmpLine = Form("MAX_OCCUPATION=\"%d\"",maxOccupation);
216                         }
217                         else if(line.find("REMOVE_PERCENT=")==0)
218                         {
219                                 tmpLine = Form("REMOVE_PERCENT=\"%d\"",removeEvents);
220                         }
221                         else if(line.find("EVENTS_IN_FILE=")==0)
222                         {
223                                 tmpLine = Form("EVENTS_IN_FILE=\"%d\"",eventsInChunk);
224                         }
225                         tmpLine += "\n";
226                         tmpFile << tmpLine;
227                 }
228                 if(configFile.eof())
229                 {
230                         configFile.clear();
231                 }
232                 configFile.close();
233                 tmpFile.close();
234                 rename("tmpFile.bla",Form("%s/STORAGE/setupStorageDatabase.sh",
235                                           gSystem->Getenv("ALICE_ROOT")));
236         }
237         else
238         {
239                 cout<<"CLIENT -- Unable to open config file"<<endl;
240         }
241         TThread::UnLock();
242 }
243
244 Long64_t AliStorageClientThread::GetSizeOfAllChunks()
245 {
246         Long64_t totalStorageSize = 0;
247
248         TSystemDirectory dir(fStoragePath.c_str(),fStoragePath.c_str());
249         TList *listOfDirectories = dir.GetListOfFiles();
250
251         if (!listOfDirectories)
252         {
253                 cout<<"CLIENT -- Storage directory is empty"<<endl;
254                 return 0;
255         }
256         TIter nextDirectory(listOfDirectories);
257         TSystemFile *runDirectory;
258         string directoryName;
259         
260         while ((runDirectory=(TSystemFile*)nextDirectory()))
261         {
262                 directoryName=runDirectory->GetName();
263                 if (runDirectory->IsDirectory() && directoryName.find("run")==0)
264                 {
265                         TSystemDirectory dirChunks(Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()),Form("%s/%s",fStoragePath.c_str(),directoryName.c_str()));
266                         TList *listOfChunks = dirChunks.GetListOfFiles();
267
268                         if(listOfChunks)
269                         {
270                                 TIter nextChunk(listOfChunks);
271                                 TSystemFile *chunk;
272                                 string chunkFileName;
273
274                                 while((chunk=(TSystemFile*)nextChunk()))
275                                 {
276                                         chunkFileName = chunk->GetName();
277                                         if(!chunk->IsDirectory() && chunkFileName.find("chunk")==0)
278                                         {
279                                                 TFile *tmpFile = new TFile(Form("%s/%s/%s",fStoragePath.c_str(),directoryName.c_str(),chunkFileName.c_str()),"read");
280
281                                                 totalStorageSize+=tmpFile->GetSize();
282                                                 tmpFile->Close();
283                                                 if(tmpFile){delete tmpFile;}
284                                         }
285                                 }
286                                 if(chunk){delete chunk;}
287                         }
288                         if(listOfChunks){delete listOfChunks;}
289                 }
290         }
291
292         //tmpFiles.clear();
293         if(listOfDirectories){delete listOfDirectories;}
294         if(runDirectory){delete runDirectory;}
295         
296         printf("CLIENT -- Total storage size:%lld\t(%.2f MB)\n",totalStorageSize,(float)totalStorageSize/(1000.*1000.));
297
298         return totalStorageSize;
299 }
300
301 void AliStorageClientThread::CollectData()
302 {
303         //connect with events' sockets
304         context_t *context = new context_t(1);
305         socket_t *socket = new socket_t(*context,ZMQ_SUB);
306         socket->setsockopt(ZMQ_SUBSCRIBE,"",0);
307         socket->connect(Form("tcp://%s:%d",fEventServer.c_str(),gEventsSubscriberPort));
308
309         if(socket)
310         {
311                 cout<<"CLIENT -- Successfully connected to events' server"<<endl;
312                 fConnectionStatus = STATUS_OK;
313         }
314         else
315         {
316                 cout<<"CLIENT -- ERROR - could not connect to events' server"<<endl;
317                 fConnectionStatus = STATUS_ERROR;
318         }
319
320         AliStorageEventManager *eventManager = new AliStorageEventManager();
321         
322         int chunkNumber=0;
323         int previousChunkNumber=-1;
324         int eventsInChunk=0;
325         int previousRunNumber=-1;
326         AliESDEvent *event = NULL;
327
328         while(!gClientQuit)
329         {               
330                 event = eventManager->GetEvent(socket);
331
332                 if(event)
333                 {
334                         fReceivingStatus=STATUS_OK;
335                         
336                         if(event->GetRunNumber() != previousRunNumber)//when new run starts
337                         {
338                                 cout<<"CLIENT -- new run started"<<endl;
339                                 previousRunNumber = event->GetRunNumber();
340                                 gSystem->Exec(Form("mkdir -p %s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
341                                 chunkNumber=0;
342                                 eventsInChunk=0;
343                                 
344                                 TSystemDirectory dir(Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()),
345                                                      Form("%s/run%d",fStoragePath.c_str(),event->GetRunNumber()));
346                                 TList *files = dir.GetListOfFiles();    
347                                 if (files)
348                                 {
349                                         TSystemFile *file;
350                                         string fname;
351                                         TIter next(files);
352                                         
353                                         while ((file=(TSystemFile*)next()))
354                                         {
355                                                 fname = file->GetName();
356                                         
357                                                 if (!file->IsDirectory())
358                                                 {
359                                                         int from = fname.find("chunk")+5;
360                                                         int to = fname.find(".root");
361
362                                                         int maxChunkNumber = atoi(fname.substr(from,to-from).c_str());
363
364                                                         if(maxChunkNumber > chunkNumber)
365                                                         {
366                                                                 chunkNumber = maxChunkNumber;
367                                                         }
368                                                 }
369                                         }
370                                         chunkNumber++;
371                                 }
372                         }
373
374                         cout<<"CLIENT -- Received data. Event:"<<event->GetEventNumberInFile()<<"\trun:"<<event->GetRunNumber()<<endl;
375                         
376                         if(chunkNumber != previousChunkNumber)//when new chunk needs to be created
377                         {
378                                 if(fCurrentFile)
379                                 {
380                                         fCurrentFile->Close();
381                                         delete fCurrentFile;
382                                         fCurrentFile=0;
383                                 }
384                                 fCurrentStorageSize=GetSizeOfAllChunks();
385                                 CheckCurrentStorageSize();
386                                 
387                                 fCurrentFile = new TFile(Form("%s/run%d/chunk%d.root", fStoragePath.c_str(),event->GetRunNumber(),chunkNumber),"recreate");
388
389                                 previousChunkNumber = chunkNumber;
390                         }
391                         
392                         if(0 != fCurrentFile->WriteObject(event,Form("event%d",event->GetEventNumberInFile())))//if event was written to file
393                         {
394                                 fDatabase->InsertEvent(event->GetRunNumber(),
395                                               event->GetEventNumberInFile(),
396                                               (char*)event->GetBeamType(),
397                                                        event->GetMultiplicity()->GetNumberOfTracklets(),Form("%s/run%d/chunk%d.root",fStoragePath.c_str(),event->GetRunNumber(),chunkNumber));
398                                 
399                                 eventsInChunk++;
400                                 
401                                 if(eventsInChunk == fNumberOfEventsInFile)//if max events number in file was reached
402                                 {
403                                         chunkNumber++;
404                                         eventsInChunk=0;
405                                 }
406                                 
407                                 if(fSavingStatus!=STATUS_OK)
408                                 {
409                                         fSavingStatus=STATUS_OK;
410                                 }
411                         }
412                         else if(fSavingStatus!=STATUS_ERROR)
413                         {
414                                 fSavingStatus=STATUS_ERROR;
415                         }
416                         delete event;event=0;
417                 }
418                 else if(fReceivingStatus!=STATUS_ERROR)
419                 {
420                         cout<<"CLIENT -- ERROR -- NO DATA!"<<endl;
421                         fReceivingStatus=STATUS_ERROR;
422                 }
423         }
424         if(event){delete event;}
425 }
426
427
428 void AliStorageClientThread::CheckCurrentStorageSize()
429 {
430         if(fCurrentStorageSize >  (float)fStorageOccupationLevel/100. * fMaximumStorageSize)
431         {
432                 while(GetSizeOfAllChunks() > (float)fRemoveEventsPercentage/100. * fMaximumStorageSize)
433                 {
434                         struct eventStruct oldestEvent = fDatabase->GetOldestEvent();
435                         string oldestEventPath = fDatabase->GetFilePath(oldestEvent);
436                         //remove oldest event
437                         cout<<"CLIENT -- Removing old events:"<<oldestEventPath<<endl;
438                         gSystem->Exec(Form("rm -f %s",oldestEventPath.c_str()));
439                         fDatabase->RemoveEvent(fDatabase->GetOldestEvent());
440                 }
441         }
442 }