added port numbers to ldap schema and config classes (alberto)
[u/mrichter/AliRoot.git] / SHUTTLE / AliShuttle.cxx
1 /**************************************************************************
2  * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
3  *                                                                        *
4  * Author: The ALICE Off-line Project.                                    *
5  * Contributors are mentioned in the code where appropriate.              *
6  *                                                                        *
7  * Permission to use, copy, modify and distribute this software and its   *
8  * documentation strictly for non-commercial purposes is hereby granted   *
9  * without fee, provided that the above copyright notice appears in all   *
10  * copies and that both the copyright notice and this permission notice   *
11  * appear in the supporting documentation. The authors make no claims     *
12  * about the suitability of this software for any purpose. It is          *
13  * provided "as is" without express or implied warranty.                  *
14  **************************************************************************/
15
16 /*
17 $Log$
18 Revision 1.21  2006/12/07 08:51:26  jgrosseo
19 update (alberto):
20 table, db names in ldap configuration
21 added GRP preprocessor
22 DCS data can also be retrieved by data point
23
24 Revision 1.20  2006/11/16 16:16:48  jgrosseo
25 introducing strict run ordering flag
26 removed giving preprocessor name to preprocessor, they have to know their name themselves ;-)
27
28 Revision 1.19  2006/11/06 14:23:04  jgrosseo
29 major update (Alberto)
30 o) reading of run parameters from the logbook
31 o) online offline naming conversion
32 o) standalone DCSclient package
33
34 Revision 1.18  2006/10/20 15:22:59  jgrosseo
35 o) Adding time out to the execution of the preprocessors: The Shuttle forks and the parent process monitors the child
36 o) Merging Collect, CollectAll, CollectNew function
37 o) Removing implementation of empty copy constructors (declaration still there!)
38
39 Revision 1.17  2006/10/05 16:20:55  jgrosseo
40 adapting to new CDB classes
41
42 Revision 1.16  2006/10/05 15:46:26  jgrosseo
43 applying to the new interface
44
45 Revision 1.15  2006/10/02 16:38:39  jgrosseo
46 update (alberto):
47 fixed memory leaks
48 storing of objects that failed to be stored to the grid before
49 interfacing of shuttle status table in daq system
50
51 Revision 1.14  2006/08/29 09:16:05  jgrosseo
52 small update
53
54 Revision 1.13  2006/08/15 10:50:00  jgrosseo
55 effc++ corrections (alberto)
56
57 Revision 1.12  2006/08/08 14:19:29  jgrosseo
58 Update to shuttle classes (Alberto)
59
60 - Possibility to set the full object's path in the Preprocessor's and
61 Shuttle's  Store functions
62 - Possibility to extend the object's run validity in the same classes
63 ("startValidity" and "validityInfinite" parameters)
64 - Implementation of the StoreReferenceData function to store reference
65 data in a dedicated CDB storage.
66
67 Revision 1.11  2006/07/21 07:37:20  jgrosseo
68 last run is stored after each run
69
70 Revision 1.10  2006/07/20 09:54:40  jgrosseo
71 introducing status management: The processing per subdetector is divided into several steps,
72 after each step the status is stored on disk. If the system crashes in any of the steps the Shuttle
73 can keep track of the number of failures and skips further processing after a certain threshold is
74 exceeded. These thresholds can be configured in LDAP.
75
76 Revision 1.9  2006/07/19 10:09:55  jgrosseo
77 new configuration, accesst to DAQ FES (Alberto)
78
79 Revision 1.8  2006/07/11 12:44:36  jgrosseo
80 adding parameters for extended validity range of data produced by preprocessor
81
82 Revision 1.7  2006/07/10 14:37:09  jgrosseo
83 small fix + todo comment
84
85 Revision 1.6  2006/07/10 13:01:41  jgrosseo
86 enhanced storing of last sucessfully processed run (alberto)
87
88 Revision 1.5  2006/07/04 14:59:57  jgrosseo
89 revision of AliDCSValue: Removed wrapper classes, reduced storage size per value by factor 2
90
91 Revision 1.4  2006/06/12 09:11:16  jgrosseo
92 coding conventions (Alberto)
93
94 Revision 1.3  2006/06/06 14:26:40  jgrosseo
95 o) removed files that were moved to STEER
96 o) shuttle updated to follow the new interface (Alberto)
97
98 Revision 1.2  2006/03/07 07:52:34  hristov
99 New version (B.Yordanov)
100
101 Revision 1.6  2005/11/19 17:19:14  byordano
102 RetrieveDATEEntries and RetrieveConditionsData added
103
104 Revision 1.5  2005/11/19 11:09:27  byordano
105 AliShuttle declaration added
106
107 Revision 1.4  2005/11/17 17:47:34  byordano
108 TList changed to TObjArray
109
110 Revision 1.3  2005/11/17 14:43:23  byordano
111 import to local CVS
112
113 Revision 1.1.1.1  2005/10/28 07:33:58  hristov
114 Initial import as subdirectory in AliRoot
115
116 Revision 1.2  2005/09/13 08:41:15  byordano
117 default startTime endTime added
118
119 Revision 1.4  2005/08/30 09:13:02  byordano
120 some docs added
121
122 Revision 1.3  2005/08/29 21:15:47  byordano
123 some docs added
124
125 */
126
127 //
128 // This class is the main manager for AliShuttle. 
129 // It organizes the data retrieval from DCS and call the 
130 // interface methods of AliPreprocessor.
131 // For every detector in AliShuttleConfgi (see AliShuttleConfig),
132 // data for its set of aliases is retrieved. If there is registered
133 // AliPreprocessor for this detector then it will be used
134 // accroding to the schema (see AliPreprocessor).
135 // If there isn't registered AliPreprocessor than the retrieved
136 // data is stored automatically to the undelying AliCDBStorage.
137 // For detSpec is used the alias name.
138 //
139
140 #include "AliShuttle.h"
141
142 #include "AliCDBManager.h"
143 #include "AliCDBStorage.h"
144 #include "AliCDBId.h"
145 #include "AliCDBRunRange.h"
146 #include "AliCDBPath.h"
147 #include "AliCDBEntry.h"
148 #include "AliShuttleConfig.h"
149 #include "DCSClient/AliDCSClient.h"
150 #include "AliLog.h"
151 #include "AliPreprocessor.h"
152 #include "AliShuttleStatus.h"
153 #include "AliShuttleLogbookEntry.h"
154
155 #include <TSystem.h>
156 #include <TObject.h>
157 #include <TString.h>
158 #include <TTimeStamp.h>
159 #include <TObjString.h>
160 #include <TSQLServer.h>
161 #include <TSQLResult.h>
162 #include <TSQLRow.h>
163 #include <TMutex.h>
164
165 #include <fstream>
166
167 #include <sys/types.h>
168 #include <sys/wait.h>
169
170 ClassImp(AliShuttle)
171
172 TString AliShuttle::fgkMainCDB("alien://folder=ShuttleCDB");
173 TString AliShuttle::fgkLocalCDB("local://LocalShuttleCDB");
174 TString AliShuttle::fgkMainRefStorage("alien://folder=ShuttleReference");
175 TString AliShuttle::fgkLocalRefStorage("local://LocalReferenceStorage");
176
177 Bool_t AliShuttle::fgkProcessDCS(kTRUE); 
178
179 const char* AliShuttle::fgkShuttleTempDir = gSystem->ExpandPathName("$ALICE_ROOT/SHUTTLE/temp");
180 const char* AliShuttle::fgkShuttleLogDir = gSystem->ExpandPathName("$ALICE_ROOT/SHUTTLE/log");
181
182 //______________________________________________________________________________________________
183 AliShuttle::AliShuttle(const AliShuttleConfig* config,
184                 UInt_t timeout, Int_t retries):
185 fConfig(config),
186 fTimeout(timeout), fRetries(retries),
187 fPreprocessorMap(),
188 fLogbookEntry(0),
189 fCurrentDetector(),
190 fStatusEntry(0),
191 fGridError(kFALSE),
192 fMonitoringMutex(0),
193 fLastActionTime(0),
194 fLastAction()
195 {
196         //
197         // config: AliShuttleConfig used
198         // timeout: timeout used for AliDCSClient connection
199         // retries: the number of retries in case of connection error.
200         //
201
202         if (!fConfig->IsValid()) AliFatal("********** !!!!! Invalid configuration !!!!! **********");
203         for(int iSys=0;iSys<4;iSys++) {
204                 fServer[iSys]=0;
205                 if (iSys < 3)
206                         fFXSlist[iSys].SetOwner(kTRUE);
207         }
208         fPreprocessorMap.SetOwner(kTRUE);
209
210         for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
211                 fFirstUnprocessed[iDet] = kFALSE;
212
213         fMonitoringMutex = new TMutex();
214 }
215
216 //______________________________________________________________________________________________
217 AliShuttle::~AliShuttle()
218 {
219 // destructor
220
221         fPreprocessorMap.DeleteAll();
222         for(int iSys=0;iSys<4;iSys++)
223                 if(fServer[iSys]) {
224                         fServer[iSys]->Close();
225                         delete fServer[iSys];
226                         fServer[iSys] = 0;
227                 }
228
229         if (fStatusEntry){
230                 delete fStatusEntry;
231                 fStatusEntry = 0;
232         }
233         
234         if (fMonitoringMutex) 
235         {
236                 delete fMonitoringMutex;
237                 fMonitoringMutex = 0;
238         }
239 }
240
241 //______________________________________________________________________________________________
242 void AliShuttle::RegisterPreprocessor(AliPreprocessor* preprocessor)
243 {
244         //
245         // Registers new AliPreprocessor.
246         // It uses GetName() for indentificator of the pre processor.
247         // The pre processor is registered it there isn't any other
248         // with the same identificator (GetName()).
249         //
250
251         const char* detName = preprocessor->GetName();
252         if(GetDetPos(detName) < 0)
253                 AliFatal(Form("********** !!!!! Invalid detector name: %s !!!!! **********", detName));
254
255         if (fPreprocessorMap.GetValue(detName)) {
256                 AliWarning(Form("AliPreprocessor %s is already registered!", detName));
257                 return;
258         }
259
260         fPreprocessorMap.Add(new TObjString(detName), preprocessor);
261 }
262 //______________________________________________________________________________________________
263 UInt_t AliShuttle::Store(const AliCDBPath& path, TObject* object,
264                 AliCDBMetaData* metaData, Int_t validityStart, Bool_t validityInfinite)
265 {
266   // Stores a CDB object in the storage for offline reconstruction. Objects that are not needed for
267   // offline reconstruction, but should be stored anyway (e.g. for debugging) should NOT be stored
268   // using this function. Use StoreReferenceData instead!
269   // It calls WriteToCDB function which perform actual storage
270
271         return WriteToCDB(fgkMainCDB, fgkLocalCDB, path, object,
272                                 metaData, validityStart, validityInfinite);
273
274 }
275
276 //______________________________________________________________________________________________
277 UInt_t AliShuttle::StoreReferenceData(const AliCDBPath& path, TObject* object, AliCDBMetaData* metaData)
278 {
279   // Stores a CDB object in the storage for reference data. This objects will not be available during
280   // offline reconstrunction. Use this function for reference data only!
281   // It calls WriteToCDB function which perform actual storage
282
283         return WriteToCDB(fgkMainRefStorage, fgkLocalRefStorage, path, object, metaData);
284
285 }
286
287 //______________________________________________________________________________________________
288 UInt_t AliShuttle::WriteToCDB(const char* mainUri, const char* localUri,
289                         const AliCDBPath& path, TObject* object, AliCDBMetaData* metaData,
290                         Int_t validityStart, Bool_t validityInfinite)
291 {
292   // write object into the CDB. Parameters are passed by Store and StoreReferenceData functions.
293   // The parameters are:
294   //   1) Uri of the main storage (Grid)
295   //   2) Uri of the backup storage (Local)
296   //   3) the object's path.
297   //   4) the object to be stored
298   //   5) the metaData to be associated with the object
299   //   6) the validity start run number w.r.t. the current run,
300   //      if the data is valid only for this run leave the default 0
301   //   7) specifies if the calibration data is valid for infinity (this means until updated),
302   //      typical for calibration runs, the default is kFALSE
303   //
304   // returns 0 if fail
305   //         1 if stored in main (Grid) storage
306   //         2 if stored in backup (Local) storage
307
308         const char* cdbType = (mainUri == fgkMainCDB) ? "CDB" : "Reference";
309
310         Int_t firstRun = GetCurrentRun() - validityStart;
311         if(firstRun < 0) {
312                 AliError("First valid run happens to be less than 0! Setting it to 0.");
313                 firstRun=0;
314         }
315
316         Int_t lastRun = -1;
317         if(validityInfinite) {
318                 lastRun = AliCDBRunRange::Infinity();
319         } else {
320                 lastRun = GetCurrentRun();
321         }
322
323         AliCDBId id(path, firstRun, lastRun, -1, -1);
324
325         if(! dynamic_cast<TObjString*> (metaData->GetProperty("RunUsed(TObjString)"))){
326                 TObjString runUsed = Form("%d", GetCurrentRun());
327                 metaData->SetProperty("RunUsed(TObjString)",&runUsed);
328         }
329
330         UInt_t result = 0;
331
332         if (!(AliCDBManager::Instance()->GetStorage(mainUri))) {
333                 AliError(Form("WriteToCDB - Cannot activate main %s storage", cdbType));
334         } else {
335                 result = (UInt_t) AliCDBManager::Instance()->GetStorage(mainUri)
336                                         ->Put(object, id, metaData);
337         }
338
339         if(!result) {
340
341                 Log(fCurrentDetector,
342                         Form("WriteToCDB - Problem with main %s storage. Putting <%s> into backup storage",
343                                 cdbType, path.GetPath().Data()));
344
345                 // Set Grid version to current run number, to ease retrieval later
346                 id.SetVersion(GetCurrentRun());
347
348                 result = AliCDBManager::Instance()->GetStorage(localUri)
349                                         ->Put(object, id, metaData);
350
351                 if(result) {
352                         result = 2;
353                         fGridError = kTRUE;
354                 }else{
355                         Log(fCurrentDetector, "WriteToCDB - Can't store data!");
356                 }
357         }
358
359         return result;
360
361 }
362
363 //______________________________________________________________________________________________
364 AliShuttleStatus* AliShuttle::ReadShuttleStatus()
365 {
366 // Reads the AliShuttleStatus from the CDB
367
368         if (fStatusEntry){
369                 delete fStatusEntry;
370                 fStatusEntry = 0;
371         }
372
373         fStatusEntry = AliCDBManager::Instance()->GetStorage(AliShuttle::GetLocalCDB())
374                 ->Get(Form("/SHUTTLE/STATUS/%s", fCurrentDetector.Data()), GetCurrentRun());
375
376         if (!fStatusEntry) return 0;
377         fStatusEntry->SetOwner(1);
378
379         AliShuttleStatus* status = dynamic_cast<AliShuttleStatus*> (fStatusEntry->GetObject());
380         if (!status) {
381                 AliError("Invalid object stored to CDB!");
382                 return 0;
383         }
384
385         return status;
386 }
387
388 //______________________________________________________________________________________________
389 Bool_t AliShuttle::WriteShuttleStatus(AliShuttleStatus* status)
390 {
391 // writes the status for one subdetector
392
393         if (fStatusEntry){
394                 delete fStatusEntry;
395                 fStatusEntry = 0;
396         }
397
398         Int_t run = GetCurrentRun();
399
400         AliCDBId id(AliCDBPath("SHUTTLE", "STATUS", fCurrentDetector), run, run);
401
402         fStatusEntry = new AliCDBEntry(status, id, new AliCDBMetaData);
403         fStatusEntry->SetOwner(1);
404
405         UInt_t result = AliCDBManager::Instance()->GetStorage(fgkLocalCDB)->Put(fStatusEntry);
406
407         if (!result) {
408                 AliError(Form("WriteShuttleStatus for %s, run %d failed", fCurrentDetector.Data(), run));
409                 return kFALSE;
410         }
411
412         return kTRUE;
413 }
414
415 //______________________________________________________________________________________________
416 void AliShuttle::UpdateShuttleStatus(AliShuttleStatus::Status newStatus, Bool_t increaseCount)
417 {
418   // changes the AliShuttleStatus for the given detector and run to the given status
419
420         if (!fStatusEntry){
421                 AliError("UNEXPECTED: fStatusEntry empty");
422                 return;
423         }
424
425         AliShuttleStatus* status = dynamic_cast<AliShuttleStatus*> (fStatusEntry->GetObject());
426
427         if (!status){
428                 AliError("UNEXPECTED: status could not be read from current CDB entry");
429                 return;
430         }
431
432         TString actionStr = Form("UpdateShuttleStatus - %s: Changing state from %s to %s",
433                                 fCurrentDetector.Data(),
434                                 status->GetStatusName(), 
435                                 status->GetStatusName(newStatus));
436         Log("SHUTTLE", actionStr);
437         SetLastAction(actionStr);
438
439         status->SetStatus(newStatus);
440         if (increaseCount) status->IncreaseCount();
441
442         AliCDBManager::Instance()->GetStorage(fgkLocalCDB)->Put(fStatusEntry);
443 }
444 //______________________________________________________________________________________________
445 Bool_t AliShuttle::ContinueProcessing()
446 {
447 // this function reads the AliShuttleStatus information from CDB and
448 // checks if the processing should be continued
449 // if yes it returns kTRUE and updates the AliShuttleStatus with nextStatus
450
451         AliShuttleLogbookEntry::Status entryStatus =
452                 fLogbookEntry->GetDetectorStatus(fCurrentDetector);
453
454         if(entryStatus != AliShuttleLogbookEntry::kUnprocessed) {
455                 Log("SHUTTLE", Form("ContinueProcessing - %s is %s",
456                                 fCurrentDetector.Data(),
457                                 fLogbookEntry->GetDetectorStatusName(entryStatus)));
458                 return kFALSE;
459         }
460
461         // if we get here, according to Shuttle logbook subdetector is in UNPROCESSED state
462
463         // check if current run is first unprocessed run for current detector
464         if (fConfig->StrictRunOrder(fCurrentDetector) &&
465                 !fFirstUnprocessed[GetDetPos(fCurrentDetector)])
466         {
467                 Log("SHUTTLE", Form("ContinueProcessing - %s requires strict run ordering but this is not the first unprocessed run!"));
468                 return kFALSE;
469         }
470
471         AliShuttleStatus* status = ReadShuttleStatus();
472         if (!status) {
473                 // first time
474                 Log("SHUTTLE", Form("ContinueProcessing - %s: Processing first time",
475                                 fCurrentDetector.Data()));
476                 status = new AliShuttleStatus(AliShuttleStatus::kStarted);
477                 return WriteShuttleStatus(status);
478         }
479
480         // The following two cases shouldn't happen if Shuttle Logbook was correctly updated.
481         // If it happens it may mean Logbook updating failed... let's do it now!
482         if (status->GetStatus() == AliShuttleStatus::kDone ||
483             status->GetStatus() == AliShuttleStatus::kFailed){
484                 Log("SHUTTLE", Form("ContinueProcessing - %s is already %s. Updating Shuttle Logbook",
485                                         fCurrentDetector.Data(),
486                                         status->GetStatusName(status->GetStatus())));
487                 UpdateShuttleLogbook(fCurrentDetector.Data(),
488                                         status->GetStatusName(status->GetStatus()));
489                 return kFALSE;
490         }
491
492         if (status->GetStatus() == AliShuttleStatus::kStoreFailed) {
493                 Log("SHUTTLE",
494                         Form("ContinueProcessing - %s: Grid storage of one or more objects failed. Trying again now",
495                                 fCurrentDetector.Data()));
496                 if(TryToStoreAgain()){
497                         Log(fCurrentDetector.Data(), "ContinueProcessing - All objects successfully stored into OCDB");
498                         UpdateShuttleStatus(AliShuttleStatus::kDone);
499                         UpdateShuttleLogbook(fCurrentDetector.Data(), "DONE");
500                 } else {
501                         Log("SHUTTLE",
502                                 Form("ContinueProcessing - %s: Grid storage failed again",
503                                         fCurrentDetector.Data()));
504                 }
505                 return kFALSE;
506         }
507
508         // if we get here, there is a restart
509
510         // abort conditions
511         if (status->GetCount() >= fConfig->GetMaxRetries()) {
512                 Log("SHUTTLE",
513                         Form("ContinueProcessing - %s failed %d times in status %s - Updating Shuttle Logbook",
514                                 fCurrentDetector.Data(),
515                                 status->GetCount(), status->GetStatusName()));
516                 UpdateShuttleLogbook(fCurrentDetector.Data(), "FAILED");
517                 return kFALSE;
518         }
519
520         Log("SHUTTLE", Form("ContinueProcessing - %s: restarting. Aborted before with %s. Retry number %d.",
521                         fCurrentDetector.Data(),
522                         status->GetStatusName(), status->GetCount()));
523
524         UpdateShuttleStatus(AliShuttleStatus::kStarted, kTRUE);
525
526         return kTRUE;
527 }
528
529 //______________________________________________________________________________________________
530 Bool_t AliShuttle::Process(AliShuttleLogbookEntry* entry)
531 {
532         //
533         // Makes data retrieval for all detectors in the configuration.
534         // entry: Shuttle logbook entry, contains run paramenters and status of detectors
535         // (Unprocessed, Inactive, Failed or Done).
536         // Returns kFALSE in case of error occured and kTRUE otherwise
537         //
538
539         if(!entry) return kFALSE;
540
541         fLogbookEntry = entry;
542
543         if(fLogbookEntry->IsDone()){
544                 Log("SHUTTLE","Process - Shuttle is already DONE. Updating logbook");
545                 UpdateShuttleLogbook("shuttle_done");
546                 fLogbookEntry = 0;
547                 return kTRUE;
548         }
549
550
551         AliInfo(Form("\n\n \t\t\t^*^*^*^*^*^*^*^*^*^*^*^* run %d: START ^*^*^*^*^*^*^*^*^*^*^*^* \n",
552                                         GetCurrentRun()));
553
554         fLogbookEntry->Print("all");
555
556         // Initialization
557         Bool_t hasError = kFALSE;
558         for(Int_t iSys=0;iSys<3;iSys++) fFXSCalled[iSys]=kFALSE;
559
560         AliCDBStorage *mainCDBSto = AliCDBManager::Instance()->GetStorage(fgkMainCDB);
561         if(mainCDBSto) mainCDBSto->QueryCDB(GetCurrentRun());
562         AliCDBStorage *mainRefSto = AliCDBManager::Instance()->GetStorage(fgkMainRefStorage);
563         if(mainRefSto) mainRefSto->QueryCDB(GetCurrentRun());
564
565         // Loop on detectors in the configuration
566         TIter iter(fConfig->GetDetectors());
567         TObjString* aDetector = 0;
568
569         while ((aDetector = (TObjString*) iter.Next()))
570         {
571                 fCurrentDetector = aDetector->String();
572
573                 if (!fConfig->HostProcessDetector(fCurrentDetector)) continue;
574
575                 AliPreprocessor* aPreprocessor =
576                         dynamic_cast<AliPreprocessor*> (fPreprocessorMap.GetValue(fCurrentDetector));
577                 if (!aPreprocessor)
578                 {
579                         Log("SHUTTLE",Form("Process - %s: no preprocessor registered. Skipping",
580                                                         fCurrentDetector.Data()));
581                         continue;
582                 }
583
584                 if (ContinueProcessing() == kFALSE) continue;
585
586                 AliInfo(Form("\n\n \t\t\t****** run %d - %s: START  ******",
587                                                 GetCurrentRun(), aDetector->GetName()));
588
589
590                 Int_t pid = fork();
591
592                 if (pid < 0)
593                 {
594                         Log("SHUTTLE", "ERROR: Forking failed");
595                 }
596                 else if (pid > 0)
597                 {
598                         // parent
599                         AliInfo(Form("In parent process of %d - %s: Starting monitoring",
600                                                         GetCurrentRun(), aDetector->GetName()));
601
602                         Long_t begin = time(0);
603
604                         int status; // to be used with waitpid, on purpose an int (not Int_t)!
605                         while (waitpid(pid, &status, WNOHANG) == 0)
606                         {
607                                 Long_t expiredTime = time(0) - begin;
608
609                                 if (expiredTime > fConfig->GetPPTimeOut())
610                                 {
611                                         Log("SHUTTLE", Form("Process time out. Run time: %d seconds. Killing...",
612                                                                 expiredTime));
613
614                                         kill(pid, 9);
615
616                                         hasError = kTRUE;
617
618                                         gSystem->Sleep(1000);
619                                 }
620                                 else
621                                 {
622                                         if (expiredTime % 60 == 0)
623                                         Log("SHUTTLE", Form("Checked process. Run time: %d seconds.",
624                                                                 expiredTime));
625                                         gSystem->Sleep(1000);
626                                 }
627                         }
628
629                         AliInfo(Form("In parent process of %d - %s: Client has terminated.",
630                                                                 GetCurrentRun(), aDetector->GetName()));
631
632                         if (WIFEXITED(status))
633                         {
634                                 Int_t returnCode = WEXITSTATUS(status);
635
636                                 Log("SHUTTLE", Form("The return code is %d", returnCode));
637
638                                 if (returnCode != 0)
639                                 hasError = kTRUE;
640                         }
641                 }
642                 else if (pid == 0)
643                 {
644                         // client
645                         AliInfo(Form("In client process of %d - %s", GetCurrentRun(), aDetector->GetName()));
646
647                         UInt_t result = ProcessCurrentDetector();
648
649                         Int_t returnCode = 0; // will be set to 1 in case of an error
650
651                         if (!result)
652                         {
653                                 returnCode = 1;
654                                 AliInfo(Form("\n \t\t\t****** run %d - %s: PREPROCESSOR ERROR ****** \n\n",
655                                                         GetCurrentRun(), aDetector->GetName()));
656                         }
657                         else if (result == 2)
658                         {
659                                 AliInfo(Form("\n \t\t\t****** run %d - %s: STORAGE ERROR ****** \n\n",
660                                                         GetCurrentRun(), aDetector->GetName()));
661                         } else
662                         {
663                                 AliInfo(Form("\n \t\t\t****** run %d - %s: DONE ****** \n\n",
664                                                         GetCurrentRun(), aDetector->GetName()));
665                         }
666
667                         if (result > 0)
668                         {
669                                 // Process successful: Update time_processed field in FXS logbooks!
670                                 if (fFXSCalled[kDAQ])
671                                 {
672                                         if (UpdateDAQTable() == kFALSE)
673                                         returnCode = 1;
674                                         fFXSlist[kDAQ].Clear();
675                                 }
676                                 //if(fFXSCalled[kDCS]) {
677                                 //  if (UpdateDCSTable(aDetector->GetName()) == kFALSE)
678                                 //    returnCode = 1;
679                                 //  fFXSlist[kDCS].Clear();
680                                 //}
681                                 //if(fFXSCalled[kHLT]) {
682                                 //  if (UpdateHLTTable(aDetector->GetName()) == kFALSE)
683                                 //    returnCode = 1;
684                                 //      fFXSlist[kHLT].Clear();
685                                 //}
686                         }
687
688                         AliInfo(Form("Client process of %d - %s is exiting now with %d.",
689                                                         GetCurrentRun(), aDetector->GetName(), returnCode));
690
691                         // the client exits here
692                         gSystem->Exit(returnCode);
693
694                         AliError("We should never get here!!!");
695                 }
696         }
697
698         AliInfo(Form("\n\n \t\t\t^*^*^*^*^*^*^*^*^*^*^*^* run %d: FINISH ^*^*^*^*^*^*^*^*^*^*^*^* \n",
699                                                         GetCurrentRun()));
700
701         //check if shuttle is done for this run, if so update logbook
702         TObjArray checkEntryArray;
703         checkEntryArray.SetOwner(1);
704         TString whereClause = Form("where run=%d",GetCurrentRun());
705         if (QueryShuttleLogbook(whereClause.Data(), checkEntryArray)) {
706
707                 AliShuttleLogbookEntry* checkEntry = dynamic_cast<AliShuttleLogbookEntry*>
708                                                         (checkEntryArray.At(0));
709
710                 if (checkEntry)
711                 {
712                         if (checkEntry->IsDone())
713                         {
714                                 Log("SHUTTLE","Process - Shuttle is DONE. Updating logbook");
715                                 UpdateShuttleLogbook("shuttle_done");
716                         }
717                         else
718                         {
719                                 for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
720                                 {
721                                         if (checkEntry->GetDetectorStatus(iDet) == AliShuttleLogbookEntry::kUnprocessed)
722                                         {
723                                                 AliDebug(2, Form("Run %d: setting %s as \"not first time unprocessed\"",
724                                                                 checkEntry->GetRun(), GetDetName(iDet)));
725                                                 fFirstUnprocessed[iDet] = kFALSE;
726                                         }
727                                 }
728                         }
729                 }
730         }
731
732         fLogbookEntry = 0;
733
734         return hasError == kFALSE;
735 }
736
737 //______________________________________________________________________________________________
738 UInt_t AliShuttle::ProcessCurrentDetector()
739 {
740         //
741         // Makes data retrieval just for a specific detector (fCurrentDetector).
742         // Threre should be a configuration for this detector.
743
744         AliInfo(Form("Retrieving values for %s, run %d", fCurrentDetector.Data(), GetCurrentRun()));
745
746         UpdateShuttleStatus(AliShuttleStatus::kDCSStarted);
747
748         TMap dcsMap;
749         dcsMap.SetOwner(1);
750
751         Bool_t aDCSError = kFALSE;
752         fGridError = kFALSE;
753
754         // TODO Test only... I've added a flag that allows to
755         // exclude DCS archive DB query
756         if (!fgkProcessDCS)
757         {
758                 AliInfo("Skipping DCS processing!");
759                 aDCSError = kFALSE;
760         } else {
761                 TString host(fConfig->GetDCSHost(fCurrentDetector));
762                 Int_t port = fConfig->GetDCSPort(fCurrentDetector);
763
764                 // Retrieval of Aliases
765                 TObjString* anAlias = 0;
766                 TIter iterAliases(fConfig->GetDCSAliases(fCurrentDetector));
767                 while ((anAlias = (TObjString*) iterAliases.Next()))
768                 {
769                         TObjArray *valueSet = new TObjArray();
770                         valueSet->SetOwner(1);
771
772                         AliInfo("Querying DCS archive DB (Aliases)...");
773                         aDCSError = (GetValueSet(host, port, anAlias->String(), valueSet, kAlias) == 0);
774
775                         if(!aDCSError)
776                         {
777                                 dcsMap.Add(anAlias->Clone(), valueSet);
778                         } else {
779                                 Log(fCurrentDetector,
780                                         Form("ProcessCurrentDetector - Error while retrieving alias %s",
781                                                 anAlias->GetName()));
782                                 UpdateShuttleStatus(AliShuttleStatus::kDCSError);
783                                 dcsMap.DeleteAll();
784                                 return 0;
785                         }
786                 }
787
788                 // Retrieval of Data Points
789                 TObjString* aDP = 0;
790                 TIter iterDP(fConfig->GetDCSDataPoints(fCurrentDetector));
791                 while ((aDP = (TObjString*) iterDP.Next()))
792                 {
793                         TObjArray *valueSet = new TObjArray();
794                         valueSet->SetOwner(1);
795                         AliInfo("Querying DCS archive DB (Data Points)...");
796                         aDCSError = (GetValueSet(host, port, aDP->String(), valueSet, kDP) == 0);
797
798                         if(!aDCSError)
799                         {
800                                 dcsMap.Add(aDP->Clone(), valueSet);
801                         } else {
802                                 Log(fCurrentDetector,
803                                         Form("ProcessCurrentDetector - Error while retrieving data point %s",
804                                                 aDP->GetName()));
805                                 UpdateShuttleStatus(AliShuttleStatus::kDCSError);
806                                 dcsMap.DeleteAll();
807                                 return 0;
808                         }
809                 }
810         }
811
812         // DCS Archive DB processing successful. Call Preprocessor!
813         UpdateShuttleStatus(AliShuttleStatus::kPPStarted);
814
815         AliPreprocessor* aPreprocessor =
816                 dynamic_cast<AliPreprocessor*> (fPreprocessorMap.GetValue(fCurrentDetector));
817
818         aPreprocessor->Initialize(GetCurrentRun(), GetCurrentStartTime(), GetCurrentEndTime());
819         UInt_t aPPResult = aPreprocessor->Process(&dcsMap);
820
821         UInt_t returnValue = 0;
822         if (aPPResult == 0) { // Preprocessor error
823                 UpdateShuttleStatus(AliShuttleStatus::kPPError);
824                 returnValue = 0;
825         } else if (fGridError == kFALSE) { // process and Grid storage ok!
826                 UpdateShuttleStatus(AliShuttleStatus::kDone);
827                 UpdateShuttleLogbook(fCurrentDetector, "DONE");
828                 Log(fCurrentDetector.Data(),
829                         "ProcessCurrentDetector - Preprocessor and Grid storage ended successfully");
830                 returnValue = 1;
831         } else { // Grid storage error (process ok, but object put in local storage)
832                 UpdateShuttleStatus(AliShuttleStatus::kStoreFailed);
833                 returnValue = 2;
834         }
835
836         dcsMap.DeleteAll();
837
838         return returnValue;
839 }
840
841 //______________________________________________________________________________________________
842 Bool_t AliShuttle::QueryShuttleLogbook(const char* whereClause,
843                 TObjArray& entries)
844 {
845 // Query DAQ's Shuttle logbook and fills detector status object.
846 // Call QueryRunParameters to query DAQ logbook for run parameters.
847
848         entries.SetOwner(1);
849
850         // check connection, in case connect
851         if(!Connect(3)) return kFALSE;
852
853         TString sqlQuery;
854         sqlQuery = Form("select * from logbook_shuttle %s order by run", whereClause);
855
856         TSQLResult* aResult = fServer[3]->Query(sqlQuery);
857         if (!aResult) {
858                 AliError(Form("Can't execute query <%s>!", sqlQuery.Data()));
859                 return kFALSE;
860         }
861
862         AliDebug(2,Form("Query = %s", sqlQuery.Data()));
863
864         if(aResult->GetRowCount() == 0) {
865                 if(sqlQuery.EndsWith("where shuttle_done=0 order by run")){
866                         Log("SHUTTLE", "QueryShuttleLogbook - All runs in Shuttle Logbook are already DONE");
867                         delete aResult;
868                         return kTRUE;
869                 } else {
870                         AliError("No entries in Shuttle Logbook match request");
871                         delete aResult;
872                         return kFALSE;
873                 }
874         }
875
876         // TODO Check field count!
877         const UInt_t nCols = 22;
878         if (aResult->GetFieldCount() != (Int_t) nCols) {
879                 AliError("Invalid SQL result field number!");
880                 delete aResult;
881                 return kFALSE;
882         }
883
884         TSQLRow* aRow;
885         while ((aRow = aResult->Next())) {
886                 TString runString(aRow->GetField(0), aRow->GetFieldLength(0));
887                 Int_t run = runString.Atoi();
888
889                 AliShuttleLogbookEntry *entry = QueryRunParameters(run);
890                 if (!entry)
891                         continue;
892
893                 // loop on detectors
894                 for(UInt_t ii = 0; ii < nCols; ii++)
895                         entry->SetDetectorStatus(aResult->GetFieldName(ii), aRow->GetField(ii));
896
897                 entries.AddLast(entry);
898                 delete aRow;
899         }
900
901         if(sqlQuery.EndsWith("where shuttle_done=0 order by run"))
902                 Log("SHUTTLE", Form("QueryShuttleLogbook - Found %d unprocessed runs in Shuttle Logbook",
903                                                         entries.GetEntriesFast()));
904         delete aResult;
905         return kTRUE;
906 }
907
908 //______________________________________________________________________________________________
909 AliShuttleLogbookEntry* AliShuttle::QueryRunParameters(Int_t run)
910 {
911         //
912         // Retrieve run parameters written in the DAQ logbook and sets them into AliShuttleLogbookEntry object
913         //
914
915         // check connection, in case connect
916         if (!Connect(3))
917                 return 0;
918
919         TString sqlQuery;
920         sqlQuery.Form("select * from %s where run=%d", fConfig->GetDAQlbTable(), run);
921
922         TSQLResult* aResult = fServer[3]->Query(sqlQuery);
923         if (!aResult) {
924                 AliError(Form("Can't execute query <%s>!", sqlQuery.Data()));
925                 return 0;
926         }
927
928         if (aResult->GetRowCount() == 0) {
929                 Log("SHUTTLE", Form("QueryRunParameters - No entry in DAQ Logbook for run %d. Skipping", run));
930                 delete aResult;
931                 return 0;
932         }
933
934         if (aResult->GetRowCount() > 1) {
935                 AliError(Form("More than one entry in DAQ Logbook for run %d. Skipping", run));
936                 delete aResult;
937                 return 0;
938         }
939
940         TSQLRow* aRow = aResult->Next();
941         if (!aRow)
942         {
943                 AliError(Form("Could not retrieve row for run %d. Skipping", run));
944                 delete aResult;
945                 return 0;
946         }
947
948         AliShuttleLogbookEntry* entry = new AliShuttleLogbookEntry(run);
949
950         for (Int_t ii = 0; ii < aResult->GetFieldCount(); ii++)
951                 entry->SetRunParameter(aResult->GetFieldName(ii), aRow->GetField(ii));
952
953         UInt_t startTime = entry->GetStartTime();
954         UInt_t endTime = entry->GetEndTime();
955
956         if (!startTime || !endTime || startTime > endTime) {
957                 Log("SHUTTLE",
958                         Form("QueryRunParameters - Invalid parameters for Run %d: startTime = %d, endTime = %d",
959                                 run, startTime, endTime));
960                 delete entry;
961                 delete aRow;
962                 delete aResult;
963                 return 0;
964         }
965
966         delete aRow;
967         delete aResult;
968
969         return entry;
970 }
971
972 //______________________________________________________________________________________________
973 Bool_t AliShuttle::TryToStoreAgain()
974 {
975   // Called in case the detector failed to store the object in Grid OCDB
976   // It tries to store the object again, if it does not find more recent and overlapping objects
977   // Calls underlying TryToStoreAgain(const char*) function twice, for OCDB and Reference storage.
978
979         AliInfo("Trying to store OCDB data again...");
980         Bool_t resultCDB = TryToStoreAgain(fgkMainCDB);
981
982         AliInfo("Trying to store reference data again...");
983         Bool_t resultRef = TryToStoreAgain(fgkMainRefStorage);
984
985         return resultCDB && resultRef;
986 }
987
988 //______________________________________________________________________________________________
989 Bool_t AliShuttle::TryToStoreAgain(TString& gridURI)
990 {
991   // Called by TryToStoreAgain(), performs actual storage retry
992
993         TObjArray* gridIds=0;
994
995         Bool_t result = kTRUE;
996
997         const char* type = 0;
998         TString backupURI;
999         if(gridURI == fgkMainCDB) {
1000                 type = "OCDB";
1001                 backupURI = fgkLocalCDB;
1002         } else if(gridURI == fgkMainRefStorage) {
1003                 type = "reference";
1004                 backupURI = fgkLocalRefStorage;
1005         } else {
1006                 AliError(Form("Invalid storage URI: %s", gridURI.Data()));
1007                 return kFALSE;
1008         }
1009
1010         AliCDBManager* man = AliCDBManager::Instance();
1011
1012         AliCDBStorage *gridSto = man->GetStorage(gridURI);
1013         if(!gridSto) {
1014                 Log(fCurrentDetector.Data(),
1015                         Form("TryToStoreAgain - cannot activate main %s storage", type));
1016                 return kFALSE;
1017         }
1018
1019         gridIds = gridSto->GetQueryCDBList();
1020
1021         // get objects previously stored in local CDB
1022         AliCDBStorage *backupSto = man->GetStorage(backupURI);
1023         AliCDBPath aPath(GetOfflineDetName(fCurrentDetector.Data()),"*","*");
1024         // Local objects were stored with current run as Grid version!
1025         TList* localEntries = backupSto->GetAll(aPath.GetPath(), GetCurrentRun(), GetCurrentRun());
1026         localEntries->SetOwner(1);
1027
1028         // loop on local stored objects
1029         TIter localIter(localEntries);
1030         AliCDBEntry *aLocEntry = 0;
1031         while((aLocEntry = dynamic_cast<AliCDBEntry*> (localIter.Next()))){
1032                 aLocEntry->SetOwner(1);
1033                 AliCDBId aLocId = aLocEntry->GetId();
1034                 aLocEntry->SetVersion(-1);
1035                 aLocEntry->SetSubVersion(-1);
1036
1037                 // loop on Grid valid Id's
1038                 Bool_t store = kTRUE;
1039                 TIter gridIter(gridIds);
1040                 AliCDBId* aGridId = 0;
1041                 while((aGridId = dynamic_cast<AliCDBId*> (gridIter.Next()))){
1042                         // If local object is valid up to infinity we store it only if it is
1043                         // the first unprocessed run!
1044                         if (aLocId.GetLastRun() == AliCDBRunRange::Infinity())
1045                         {
1046                                 if (!fFirstUnprocessed[GetDetPos(fCurrentDetector)])
1047                                 {
1048                                         Log(fCurrentDetector.Data(),
1049                                                 ("TryToStoreAgain - This object has validity infinite but "
1050                                                  "there are previous unprocessed runs!"));
1051                                         continue;
1052                                 } else {
1053                                         break;
1054                                 }
1055                         }
1056                         if(aGridId->GetPath() != aLocId.GetPath()) continue;
1057                         // skip all objects valid up to infinity
1058                         if(aGridId->GetLastRun() == AliCDBRunRange::Infinity()) continue;
1059                         // if we get here, it means there's already some more recent object stored on Grid!
1060                         store = kFALSE;
1061                         break;
1062                 }
1063
1064                 if(!store){
1065                         Log(fCurrentDetector.Data(),
1066                                 Form("TryToStoreAgain - A more recent object already exists in %s storage: <%s>",
1067                                         type, aGridId->ToString().Data()));
1068                         // removing local filename...
1069                         // TODO maybe it's better not to remove it, it was not copied to the Grid!
1070                         TString filename;
1071                         backupSto->IdToFilename(aLocId, filename);
1072                         AliInfo(Form("Removing local file %s", filename.Data()));
1073                         gSystem->Exec(Form("rm %s",filename.Data()));
1074                         continue;
1075                 }
1076
1077                 // If we get here, the file can be stored!
1078                 Bool_t storeOk = gridSto->Put(aLocEntry);
1079                 if(storeOk){
1080                         Log(fCurrentDetector.Data(),
1081                                 Form("TryToStoreAgain - Object <%s> successfully put into %s storage",
1082                                         aLocId.ToString().Data(), type));
1083
1084                         // removing local filename...
1085                         TString filename;
1086                         backupSto->IdToFilename(aLocId, filename);
1087                         AliInfo(Form("Removing local file %s", filename.Data()));
1088                         gSystem->Exec(Form("rm %s", filename.Data()));
1089                         continue;
1090                 } else  {
1091                         Log(fCurrentDetector.Data(),
1092                                 Form("TryToStoreAgain - Grid %s storage of object <%s> failed again",
1093                                         type, aLocId.ToString().Data()));
1094                         result = kFALSE;
1095                 }
1096         }
1097         localEntries->Clear();
1098
1099         return result;
1100 }
1101
1102 //______________________________________________________________________________________________
1103 Bool_t AliShuttle::GetValueSet(const char* host, Int_t port, const char* entry,
1104                                 TObjArray* valueSet, DCSType type)
1105 {
1106 // Retrieve all "entry" data points from the DCS server
1107 // host, port: TSocket connection parameters
1108 // entry: name of the alias or data point
1109 // valueSet: array of retrieved AliDCSValue's
1110 // type: kAlias or kDP
1111
1112         AliDCSClient client(host, port, fTimeout, fRetries);
1113         if (!client.IsConnected())
1114         {
1115                 return kFALSE;
1116         }
1117
1118         Int_t result=0;
1119
1120         if (type == kAlias)
1121         {
1122                 result = client.GetAliasValues(entry,
1123                         GetCurrentStartTime(), GetCurrentEndTime(), valueSet);
1124         } else
1125         if (type == kDP)
1126         {
1127                 result = client.GetDPValues(entry,
1128                         GetCurrentStartTime(), GetCurrentEndTime(), valueSet);
1129         }
1130
1131         if (result < 0)
1132         {
1133                 Log(fCurrentDetector.Data(), Form("GetValueSet - Can't get '%s'! Reason: %s",
1134                         entry, AliDCSClient::GetErrorString(result)));
1135
1136                 if (result == AliDCSClient::fgkServerError)
1137                 {
1138                         Log(fCurrentDetector.Data(), Form("GetValueSet - Server error: %s",
1139                                 client.GetServerError().Data()));
1140                 }
1141
1142                 return kFALSE;
1143         }
1144
1145         return kTRUE;
1146 }
1147
1148 //______________________________________________________________________________________________
1149 const char* AliShuttle::GetFile(Int_t system, const char* detector,
1150                 const char* id, const char* source)
1151 {
1152 // Get calibration file from file exchange servers
1153 // calls specific getter according to system index (kDAQ, kDCS, kHLT)
1154
1155         switch(system){
1156                 case kDAQ:
1157                         return GetDAQFileName(detector, id, source);
1158                         break;
1159                 case kDCS:
1160                         return GetDCSFileName(detector, id, source);
1161                         break;
1162                 case kHLT:
1163                         return GetHLTFileName(detector, id, source);
1164                         break;
1165                 default:
1166                         AliError(Form("No valid system index: %d",system));
1167         }
1168
1169         return 0;
1170 }
1171
1172 //______________________________________________________________________________________________
1173 TList* AliShuttle::GetFileSources(Int_t system, const char* detector, const char* id)
1174 {
1175 // Get sources producing the condition file Id from file exchange servers
1176 // calls specific getter according to system index (kDAQ, kDCS, kHLT)
1177
1178         switch(system){
1179                 case kDAQ:
1180                         return GetDAQFileSources(detector, id);
1181                         break;
1182                 case kDCS:
1183                         return GetDCSFileSources(detector, id);
1184                         break;
1185                 case kHLT:
1186                         return GetHLTFileSources(detector, id);
1187                         break;
1188                 default:
1189                         AliError(Form("No valid system index: %d",system));
1190         }
1191
1192         return NULL;
1193 }
1194
1195 //______________________________________________________________________________________________
1196 Bool_t AliShuttle::Connect(Int_t system)
1197 {
1198 // Connect to MySQL Server of the system's FXS MySQL databases
1199 // DAQ Logbook, Shuttle Logbook and DAQ FXS db are on the same host
1200
1201         // check connection: if already connected return
1202         if(fServer[system] && fServer[system]->IsConnected()) return kTRUE;
1203
1204         TString dbHost, dbUser, dbPass, dbName;
1205
1206         if (system < 3) // FXS db servers
1207         {
1208                 dbHost = Form("mysql://%s:%d", fConfig->GetFXSdbHost(system), fConfig->GetFXSdbPort(system));
1209                 dbUser = fConfig->GetFXSdbUser(system);
1210                 dbPass = fConfig->GetFXSdbPass(system);
1211                 dbName =   fConfig->GetFXSdbName(system);
1212         } else { // Run & Shuttle logbook servers
1213         // TODO Will the Shuttle logbook server be the same as the Run logbook server ???
1214                 dbHost = Form("mysql://%s:%d", fConfig->GetDAQlbHost(), fConfig->GetDAQlbPort());
1215                 dbUser = fConfig->GetDAQlbUser();
1216                 dbPass = fConfig->GetDAQlbPass();
1217                 dbName =   fConfig->GetDAQlbDB();
1218         }
1219
1220         fServer[system] = TSQLServer::Connect(dbHost.Data(), dbUser.Data(), dbPass.Data());
1221         if (!fServer[system] || !fServer[system]->IsConnected()) {
1222                 if(system < 3)
1223                 {
1224                 AliError(Form("Can't establish connection to FXS database for %s",
1225                                         AliShuttleInterface::GetSystemName(system)));
1226                 } else {
1227                 AliError("Can't establish connection to Run logbook.");
1228                 }
1229                 if(fServer[system]) delete fServer[system];
1230                 return kFALSE;
1231         }
1232
1233         // Get tables
1234         // TODO in the configuration should the table name be there too?
1235         TSQLResult* aResult=0;
1236         switch(system){
1237                 case kDAQ:
1238                         aResult = fServer[kDAQ]->GetTables(dbName.Data());
1239                         break;
1240                 case kDCS:
1241                         //aResult = fServer[kDCS]->GetTables(dbName.Data());
1242                         break;
1243                 case kHLT:
1244                         //aResult = fServer[kHLT]->GetTables(dbName.Data());
1245                         break;
1246                 default:
1247                         aResult = fServer[3]->GetTables(dbName.Data());
1248                         break;
1249         }
1250
1251         delete aResult;
1252         return kTRUE;
1253 }
1254
1255 //______________________________________________________________________________________________
1256 const char* AliShuttle::GetDAQFileName(const char* detector, const char* id, const char* source)
1257 {
1258 // Retrieves a file from the DAQ FXS.
1259 // First queris the DAQ FXS database for the DAQ file name, using the run, detector, id and source info
1260 // then calls RetrieveDAQFile(DAQfilename) for actual copy to local disk
1261 // run: current run being processed (given by Logbook entry fLogbookEntry)
1262 // detector: the Preprocessor name
1263 // id: provided as a parameter by the Preprocessor
1264 // source: provided by the Preprocessor through GetFileSources function
1265
1266         // check connection, in case connect
1267         if (!Connect(kDAQ))
1268         {
1269                 Log(detector, "GetDAQFileName - Couldn't connect to DAQ FXS database");
1270                 return 0;
1271         }
1272
1273         // Query preparation
1274         TString sqlQueryStart = Form("select filePath from %s where", fConfig->GetFXSdbTable(kDAQ));
1275         TString whereClause = Form("run=%d and detector=\"%s\" and fileId=\"%s\" and DAQsource=\"%s\"",
1276                                 GetCurrentRun(), detector, id, source);
1277         TString sqlQuery = Form("%s %s", sqlQueryStart.Data(), whereClause.Data());
1278
1279         AliDebug(2, Form("SQL query: \n%s",sqlQuery.Data()));
1280
1281         // Query execution
1282         TSQLResult* aResult = 0;
1283         aResult = dynamic_cast<TSQLResult*> (fServer[kDAQ]->Query(sqlQuery));
1284         if (!aResult) {
1285                 Log(detector, Form("GetDAQFileName - Can't execute SQL query for: id = %s, source = %s",
1286                                 id, source));
1287                 return 0;
1288         }
1289
1290         if(aResult->GetRowCount() == 0)
1291         {
1292                 Log(detector,
1293                         Form("GetDAQFileName - No entry in FXS table for: id = %s, source = %s",
1294                                 id, source));
1295                 delete aResult;
1296                 return 0;
1297         }
1298
1299         if (aResult->GetRowCount() > 1) {
1300                 Log(detector,
1301                         Form("GetDAQFileName - More than one entry in FXS table for: id = %s, source = %s",
1302                                 id, source));
1303                 delete aResult;
1304                 return 0;
1305         }
1306
1307         TSQLRow* aRow = dynamic_cast<TSQLRow*> (aResult->Next());
1308
1309         if (!aRow){
1310                 Log(detector, Form("GetDAQFileName - Empty set result from query: id = %s, source = %s",
1311                                 id, source));
1312                 delete aResult;
1313                 return 0;
1314         }
1315
1316         TString filePath(aRow->GetField(0), aRow->GetFieldLength(0));
1317
1318         delete aResult;
1319         delete aRow;
1320
1321         AliDebug(2, Form("filePath = %s",filePath.Data()));
1322
1323         // retrieved file is renamed to make it unique
1324         TString localFileName = Form("%s_%d_%s_%s.shuttle",
1325                                         detector, GetCurrentRun(), id, source);
1326
1327         // file retrieval from DAQ FXS
1328         Bool_t result = RetrieveDAQFile(filePath.Data(), localFileName.Data());
1329         if(!result) {
1330                 Log(detector, Form("GetDAQFileName - Copy of file %s from DAQ FXS failed", filePath.Data()));
1331                 return 0;
1332         } else {
1333                 AliInfo(Form("File %s copied from DAQ FXS into %s/%s",
1334                         filePath.Data(), fgkShuttleTempDir, localFileName.Data()));
1335         }
1336
1337
1338         fFXSCalled[kDAQ]=kTRUE;
1339         TObjString *fileParams = new TObjString(Form("%s_!?!_%s", id, source));
1340         fFXSlist[kDAQ].Add(fileParams);
1341
1342         return localFileName.Data();
1343
1344 }
1345
1346 //______________________________________________________________________________________________
1347 Bool_t AliShuttle::RetrieveDAQFile(const char* daqFileName, const char* localFileName)
1348 {
1349
1350         // check temp directory: trying to cd to temp; if it does not exist, create it
1351         AliDebug(2, Form("Copy file %s from DAQ FXS into folder %s and rename it as %s",
1352                         daqFileName,fgkShuttleTempDir, localFileName));
1353
1354         void* dir = gSystem->OpenDirectory(fgkShuttleTempDir);
1355         if (dir == NULL) {
1356                 if (gSystem->mkdir(fgkShuttleTempDir, kTRUE)) {
1357                         AliError(Form("Can't open directory <%s>", fgkShuttleTempDir));
1358                         return kFALSE;
1359                 }
1360
1361         } else {
1362                 gSystem->FreeDirectory(dir);
1363         }
1364
1365         TString baseDAQFXSFolder = "DAQ";
1366         TString command = Form("scp -oPort=%d -2 %s@%s:%s/%s %s/%s",
1367                 fConfig->GetFXSPort(kDAQ),
1368                 fConfig->GetFXSUser(kDAQ),
1369                 fConfig->GetFXSHost(kDAQ),
1370                 baseDAQFXSFolder.Data(),
1371                 daqFileName,
1372                 fgkShuttleTempDir,
1373                 localFileName);
1374
1375         AliDebug(2, Form("%s",command.Data()));
1376
1377         UInt_t nRetries = 0;
1378         UInt_t maxRetries = 3;
1379
1380         // copy!! if successful TSystem::Exec returns 0
1381         while(nRetries++ < maxRetries) {
1382                 AliDebug(2, Form("Trying to copy file. Retry # %d", nRetries));
1383                 if(gSystem->Exec(command.Data()) == 0) return kTRUE;
1384         }
1385
1386         return kFALSE;
1387
1388 }
1389
1390 //______________________________________________________________________________________________
1391 TList* AliShuttle::GetDAQFileSources(const char* detector, const char* id)
1392 {
1393 // Retrieves a file from the DCS FXS.
1394
1395         // check connection, in case connect
1396         if(!Connect(kDAQ)){
1397                 Log(detector, "GetDAQFileSources - Couldn't connect to DAQ FXS database");
1398                 return 0;
1399         }
1400
1401         // Query preparation
1402         TString sqlQueryStart = Form("select DAQsource from %s where", fConfig->GetFXSdbTable(kDAQ));
1403         TString whereClause = Form("run=%d and detector=\"%s\" and fileId=\"%s\"",
1404                                 GetCurrentRun(), detector, id);
1405         TString sqlQuery = Form("%s %s", sqlQueryStart.Data(), whereClause.Data());
1406
1407         AliDebug(2, Form("SQL query: \n%s",sqlQuery.Data()));
1408
1409         // Query execution
1410         TSQLResult* aResult;
1411         aResult = fServer[kDAQ]->Query(sqlQuery);
1412         if (!aResult) {
1413                 Log(detector, Form("GetDAQFileSources - Can't execute SQL query for id: %s", id));
1414                 return 0;
1415         }
1416
1417         if (aResult->GetRowCount() == 0) {
1418                 Log(detector,
1419                         Form("GetDAQFileSources - No entry in FXS table for id: %s", id));
1420                 delete aResult;
1421                 return 0;
1422         }
1423
1424         TSQLRow* aRow;
1425         TList *list = new TList();
1426         list->SetOwner(1);
1427
1428         while((aRow = aResult->Next())){
1429
1430                 TString daqSource(aRow->GetField(0), aRow->GetFieldLength(0));
1431                 AliDebug(2, Form("daqSource = %s", daqSource.Data()));
1432                 list->Add(new TObjString(daqSource));
1433                 delete aRow;
1434         }
1435         delete aResult;
1436
1437         return list;
1438
1439 }
1440
1441 //______________________________________________________________________________________________
1442 const char* AliShuttle::GetDCSFileName(const char* /*detector*/, const char* /*id*/, const char* /*source*/){
1443 // Retrieves a file from the DCS FXS.
1444
1445 return "You're in DCS";
1446
1447 }
1448
1449 //______________________________________________________________________________________________
1450 TList* AliShuttle::GetDCSFileSources(const char* /*detector*/, const char* /*id*/){
1451 // Retrieves file sources from the DCS FXS.
1452
1453 return NULL;
1454
1455 }
1456
1457 //______________________________________________________________________________________________
1458 const char* AliShuttle::GetHLTFileName(const char* /*detector*/, const char* /*id*/, const char* /*source*/){
1459 // Retrieves a file from the HLT FXS.
1460
1461 return "You're in HLT";
1462
1463 }
1464
1465 //______________________________________________________________________________________________
1466 TList* AliShuttle::GetHLTFileSources(const char* /*detector*/, const char* /*id*/){
1467 // Retrieves file sources from the HLT FXS.
1468
1469 return NULL;
1470
1471 }
1472
1473 //______________________________________________________________________________________________
1474 Bool_t AliShuttle::UpdateDAQTable()
1475 {
1476 // Update DAQ table filling time_processed field in all rows corresponding to current run and detector
1477
1478         // check connection, in case connect
1479         if(!Connect(kDAQ)){
1480                 Log(fCurrentDetector, "UpdateDAQTable - Couldn't connect to DAQ FXS database");
1481                 return kFALSE;
1482         }
1483
1484         TTimeStamp now; // now
1485
1486         // Loop on FXS list entries
1487         TIter iter(&fFXSlist[kDAQ]);
1488         TObjString *aFXSentry=0;
1489         while((aFXSentry = dynamic_cast<TObjString*> (iter.Next()))){
1490                 TString aFXSentrystr = aFXSentry->String();
1491                 TObjArray *aFXSarray = aFXSentrystr.Tokenize("_!?!_");
1492                 if(!aFXSarray || aFXSarray->GetEntries() != 2 ) {
1493                         Log(fCurrentDetector, Form("UpdateDAQTable - error updating FXS entry. Check string: <%s>",
1494                                 aFXSentrystr.Data()));
1495                         if(aFXSarray) delete aFXSarray;
1496                         return kFALSE;
1497                 }
1498                 const char* fileId = ((TObjString*) aFXSarray->At(0))->GetName();
1499                 const char* daqSource = ((TObjString*) aFXSarray->At(1))->GetName();
1500                 TString whereClause = Form("where run=%d and detector=\"%s\" and fileId=\"%s\" and DAQsource=\"%s\";",
1501                         GetCurrentRun(), fCurrentDetector.Data(), fileId, daqSource);
1502
1503                 delete aFXSarray;
1504
1505                 TString sqlQuery = Form("update %s set time_processed=%d %s", fConfig->GetFXSdbTable(kDAQ),
1506                                                         now.GetSec(), whereClause.Data());
1507
1508                 AliDebug(2, Form("SQL query: \n%s",sqlQuery.Data()));
1509
1510                 // Query execution
1511                 TSQLResult* aResult;
1512                 aResult = dynamic_cast<TSQLResult*> (fServer[3]->Query(sqlQuery));
1513                 if (!aResult) {
1514                         Log(fCurrentDetector, Form("UpdateDAQTable - Can't execute SQL query <%s>", sqlQuery.Data()));
1515                         return kFALSE;
1516                 }
1517                 delete aResult;
1518         }
1519
1520         return kTRUE;
1521 }
1522
1523
1524 //______________________________________________________________________________________________
1525 Bool_t AliShuttle::UpdateShuttleLogbook(const char* detector, const char* status)
1526 {
1527 // Update Shuttle logbook filling detector or shuttle_done column
1528 // ex. of usage: UpdateShuttleLogbook("PHOS", "DONE") or UpdateShuttleLogbook("shuttle_done")
1529
1530         // check connection, in case connect
1531         if(!Connect(3)){
1532                 Log("SHUTTLE", "UpdateShuttleLogbook - Couldn't connect to DAQ Logbook.");
1533                 return kFALSE;
1534         }
1535
1536         TString detName(detector);
1537         TString setClause;
1538         if(detName == "shuttle_done") {
1539                 setClause = "set shuttle_done=1";
1540         } else {
1541                 TString statusStr(status);
1542                 if(statusStr.Contains("done", TString::kIgnoreCase) ||
1543                    statusStr.Contains("failed", TString::kIgnoreCase)){
1544                         setClause = Form("set %s=\"%s\"", detector, status);
1545                 } else {
1546                         Log("SHUTTLE",
1547                                 Form("UpdateShuttleLogbook - Invalid status <%s> for detector %s",
1548                                         status, detector));
1549                         return kFALSE;
1550                 }
1551         }
1552
1553         TString whereClause = Form("where run=%d", GetCurrentRun());
1554
1555         TString sqlQuery = Form("update logbook_shuttle %s %s",
1556                                         setClause.Data(), whereClause.Data());
1557
1558         AliDebug(2, Form("SQL query: \n%s",sqlQuery.Data()));
1559
1560         // Query execution
1561         TSQLResult* aResult;
1562         aResult = dynamic_cast<TSQLResult*> (fServer[3]->Query(sqlQuery));
1563         if (!aResult) {
1564                 Log("SHUTTLE", Form("UpdateShuttleLogbook - Can't execute query <%s>", sqlQuery.Data()));
1565                 return kFALSE;
1566         }
1567         delete aResult;
1568
1569         return kTRUE;
1570 }
1571
1572 //______________________________________________________________________________________________
1573 Int_t AliShuttle::GetCurrentRun() const
1574 {
1575 // Get current run from logbook entry
1576
1577         return fLogbookEntry ? fLogbookEntry->GetRun() : -1;
1578 }
1579
1580 //______________________________________________________________________________________________
1581 UInt_t AliShuttle::GetCurrentStartTime() const
1582 {
1583 // get current start time
1584
1585         return fLogbookEntry ? fLogbookEntry->GetStartTime() : 0;
1586 }
1587
1588 //______________________________________________________________________________________________
1589 UInt_t AliShuttle::GetCurrentEndTime() const
1590 {
1591 // get current end time from logbook entry
1592
1593         return fLogbookEntry ? fLogbookEntry->GetEndTime() : 0;
1594 }
1595
1596 //______________________________________________________________________________________________
1597 void AliShuttle::Log(const char* detector, const char* message)
1598 {
1599 // Fill log string with a message
1600
1601         void* dir = gSystem->OpenDirectory(fgkShuttleLogDir);
1602         if (dir == NULL) {
1603                 if (gSystem->mkdir(fgkShuttleLogDir, kTRUE)) {
1604                         AliError(Form("Can't open directory <%s>", fgkShuttleTempDir));
1605                         return;
1606                 }
1607
1608         } else {
1609                 gSystem->FreeDirectory(dir);
1610         }
1611
1612         TString toLog = Form("%s (%d): %s - ", TTimeStamp(time(0)).AsString("s"), getpid(), detector);
1613         if(GetCurrentRun()>=0 ) toLog += Form("run %d - ", GetCurrentRun());
1614         toLog += Form("%s", message);
1615
1616         AliInfo(toLog.Data());
1617
1618         TString fileName;
1619         fileName.Form("%s/%s.log", fgkShuttleLogDir, detector);
1620         gSystem->ExpandPathName(fileName);
1621
1622         ofstream logFile;
1623         logFile.open(fileName, ofstream::out | ofstream::app);
1624
1625         if (!logFile.is_open()) {
1626                 AliError(Form("Could not open file %s", fileName.Data()));
1627                 return;
1628         }
1629
1630         logFile << toLog.Data() << "\n";
1631
1632         logFile.close();
1633 }
1634
1635 //______________________________________________________________________________________________
1636 Bool_t AliShuttle::Collect(Int_t run)
1637 {
1638 //
1639 // Collects conditions data for all UNPROCESSED run written to DAQ LogBook in case of run = -1 (default)
1640 // If a dedicated run is given this run is processed
1641 //
1642 // In operational mode, this is the Shuttle function triggered by the EOR signal.
1643 //
1644
1645         if (run == -1)
1646                 Log("SHUTTLE","Collect - Shuttle called. Collecting conditions data for unprocessed runs");
1647         else
1648                 Log("SHUTTLE", Form("Collect - Shuttle called. Collecting conditions data for run %d", run));
1649
1650         SetLastAction("Starting");
1651
1652         TString whereClause("where shuttle_done=0");
1653         if (run != -1)
1654                 whereClause += Form(" and run=%d", run);
1655
1656         TObjArray shuttleLogbookEntries;
1657         if (!QueryShuttleLogbook(whereClause, shuttleLogbookEntries))
1658         {
1659                 Log("SHUTTLE", "Collect - Can't retrieve entries from Shuttle logbook");
1660                 return kFALSE;
1661         }
1662
1663         for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
1664                 fFirstUnprocessed[iDet] = kTRUE;
1665
1666         if (run != -1)
1667         {
1668                 // query Shuttle logbook for earlier runs, check if some detectors are unprocessed,
1669                 // flag them into fFirstUnprocessed array
1670                 TString whereClause(Form("where shuttle_done=0 and run < %d", run));
1671                 TObjArray tmpLogbookEntries;
1672                 if (!QueryShuttleLogbook(whereClause, tmpLogbookEntries))
1673                 {
1674                         Log("SHUTTLE", "Collect - Can't retrieve entries from Shuttle logbook");
1675                         return kFALSE;
1676                 }
1677
1678                 TIter iter(&tmpLogbookEntries);
1679                 AliShuttleLogbookEntry* anEntry = 0;
1680                 while ((anEntry = dynamic_cast<AliShuttleLogbookEntry*> (iter.Next())))
1681                 {
1682                         for (UInt_t iDet=0; iDet<NDetectors(); iDet++)
1683                         {
1684                                 if (anEntry->GetDetectorStatus(iDet) == AliShuttleLogbookEntry::kUnprocessed)
1685                                 {
1686                                         AliDebug(2, Form("Run %d: setting %s as \"not first time unprocessed\"",
1687                                                         anEntry->GetRun(), GetDetName(iDet)));
1688                                         fFirstUnprocessed[iDet] = kFALSE;
1689                                 }
1690                         }
1691
1692                 }
1693
1694         }
1695
1696         if (!RetrieveConditionsData(shuttleLogbookEntries))
1697         {
1698                 Log("SHUTTLE", "Collect - Process of at least one run failed");
1699                 return kFALSE;
1700         }
1701
1702         return kTRUE;
1703 }
1704
1705 //______________________________________________________________________________________________
1706 Bool_t AliShuttle::RetrieveConditionsData(const TObjArray& dateEntries)
1707 {
1708 // Retrieve conditions data for all runs that aren't processed yet
1709
1710         Bool_t hasError = kFALSE;
1711
1712         TIter iter(&dateEntries);
1713         AliShuttleLogbookEntry* anEntry;
1714
1715         while ((anEntry = (AliShuttleLogbookEntry*) iter.Next())){
1716                 if (!Process(anEntry)){
1717                         hasError = kTRUE;
1718                 }
1719         }
1720
1721         return hasError == kFALSE;
1722 }
1723
1724 //______________________________________________________________________________________________
1725 ULong_t AliShuttle::GetTimeOfLastAction() const
1726 {
1727         ULong_t tmp;
1728         
1729         fMonitoringMutex->Lock();
1730
1731         tmp = fLastActionTime;
1732         
1733         fMonitoringMutex->UnLock();
1734         
1735         return tmp;
1736 }
1737
1738 //______________________________________________________________________________________________
1739 const TString AliShuttle::GetLastAction() const
1740 {
1741         // returns a string description of the last action
1742
1743         TString tmp;
1744         
1745         fMonitoringMutex->Lock();
1746         
1747         tmp = fLastAction;
1748         
1749         fMonitoringMutex->UnLock();
1750
1751         return tmp;     
1752 }
1753
1754 //______________________________________________________________________________________________
1755 void AliShuttle::SetLastAction(const char* action)
1756 {
1757         // updates the monitoring variables
1758         
1759         fMonitoringMutex->Lock();
1760         
1761         fLastAction = action;
1762         fLastActionTime = time(0);
1763         
1764         fMonitoringMutex->UnLock();
1765 }
1766
1767 //______________________________________________________________________________________________
1768 const char* AliShuttle::GetRunParameter(const char* param)
1769 {
1770 // returns run parameter read from DAQ logbook
1771
1772         if(!fLogbookEntry) {
1773                 AliError("No logbook entry!");
1774                 return 0;
1775         }
1776
1777         return fLogbookEntry->GetRunParameter(param);
1778 }