1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
39 #include "TGridResult.h"
40 #include "TGridCollection.h"
42 #include "TGridJobStatusList.h"
43 #include "TGridJobStatus.h"
44 #include "TFileMerger.h"
45 #include "AliAnalysisManager.h"
46 #include "AliAnalysisTaskCfg.h"
47 #include "AliVEventHandler.h"
48 #include "AliAnalysisDataContainer.h"
49 #include "AliMultiInputEventHandler.h"
51 ClassImp(AliAnalysisAlien)
57 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
59 TString sl(Form("file:%s", loc));
60 TString sr(Form("alien://%s", rem));
61 Bool_t ret = TFile::Cp(sl, sr);
63 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
69 //______________________________________________________________________________
70 AliAnalysisAlien::AliAnalysisAlien()
76 fSplitMaxInputFileNumber(0),
78 fMasterResubmitThreshold(0),
91 fNproofWorkersPerSlave(0),
101 fAdditionalRootLibs(),
129 fRootVersionForProof(),
141 //______________________________________________________________________________
142 AliAnalysisAlien::AliAnalysisAlien(const char *name)
143 :AliAnalysisGrid(name),
148 fSplitMaxInputFileNumber(0),
150 fMasterResubmitThreshold(0),
163 fNproofWorkersPerSlave(0),
167 fExecutableCommand(),
173 fAdditionalRootLibs(),
201 fRootVersionForProof(),
213 //______________________________________________________________________________
214 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
215 :AliAnalysisGrid(other),
218 fPrice(other.fPrice),
220 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
221 fMaxInitFailed(other.fMaxInitFailed),
222 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
223 fNtestFiles(other.fNtestFiles),
224 fNrunsPerMaster(other.fNrunsPerMaster),
225 fMaxMergeFiles(other.fMaxMergeFiles),
226 fMaxMergeStages(other.fMaxMergeStages),
227 fNsubmitted(other.fNsubmitted),
228 fProductionMode(other.fProductionMode),
229 fOutputToRunNo(other.fOutputToRunNo),
230 fMergeViaJDL(other.fMergeViaJDL),
231 fFastReadOption(other.fFastReadOption),
232 fOverwriteMode(other.fOverwriteMode),
233 fNreplicas(other.fNreplicas),
234 fNproofWorkers(other.fNproofWorkers),
235 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
236 fProofReset(other.fProofReset),
237 fRunNumbers(other.fRunNumbers),
238 fExecutable(other.fExecutable),
239 fExecutableCommand(other.fExecutableCommand),
240 fArguments(other.fArguments),
241 fExecutableArgs(other.fExecutableArgs),
242 fAnalysisMacro(other.fAnalysisMacro),
243 fAnalysisSource(other.fAnalysisSource),
244 fValidationScript(other.fValidationScript),
245 fAdditionalRootLibs(other.fAdditionalRootLibs),
246 fAdditionalLibs(other.fAdditionalLibs),
247 fSplitMode(other.fSplitMode),
248 fAPIVersion(other.fAPIVersion),
249 fROOTVersion(other.fROOTVersion),
250 fAliROOTVersion(other.fAliROOTVersion),
251 fExternalPackages(other.fExternalPackages),
253 fGridWorkingDir(other.fGridWorkingDir),
254 fGridDataDir(other.fGridDataDir),
255 fDataPattern(other.fDataPattern),
256 fGridOutputDir(other.fGridOutputDir),
257 fOutputArchive(other.fOutputArchive),
258 fOutputFiles(other.fOutputFiles),
259 fInputFormat(other.fInputFormat),
260 fDatasetName(other.fDatasetName),
261 fJDLName(other.fJDLName),
262 fTerminateFiles(other.fTerminateFiles),
263 fMergeExcludes(other.fMergeExcludes),
264 fIncludePath(other.fIncludePath),
265 fCloseSE(other.fCloseSE),
266 fFriendChainName(other.fFriendChainName),
267 fJobTag(other.fJobTag),
268 fOutputSingle(other.fOutputSingle),
269 fRunPrefix(other.fRunPrefix),
270 fProofCluster(other.fProofCluster),
271 fProofDataSet(other.fProofDataSet),
272 fFileForTestMode(other.fFileForTestMode),
273 fRootVersionForProof(other.fRootVersionForProof),
274 fAliRootMode(other.fAliRootMode),
275 fMergeDirName(other.fMergeDirName),
282 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
283 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
284 fRunRange[0] = other.fRunRange[0];
285 fRunRange[1] = other.fRunRange[1];
286 if (other.fInputFiles) {
287 fInputFiles = new TObjArray();
288 TIter next(other.fInputFiles);
290 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
291 fInputFiles->SetOwner();
293 if (other.fPackages) {
294 fPackages = new TObjArray();
295 TIter next(other.fPackages);
297 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
298 fPackages->SetOwner();
300 if (other.fModules) {
301 fModules = new TObjArray();
302 fModules->SetOwner();
303 TIter next(other.fModules);
304 AliAnalysisTaskCfg *mod, *crt;
305 while ((crt=(AliAnalysisTaskCfg*)next())) {
306 mod = new AliAnalysisTaskCfg(*crt);
312 //______________________________________________________________________________
313 AliAnalysisAlien::~AliAnalysisAlien()
321 fProofParam.DeleteAll();
324 //______________________________________________________________________________
325 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
328 if (this != &other) {
329 AliAnalysisGrid::operator=(other);
330 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
331 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
332 fPrice = other.fPrice;
334 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
335 fMaxInitFailed = other.fMaxInitFailed;
336 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
337 fNtestFiles = other.fNtestFiles;
338 fNrunsPerMaster = other.fNrunsPerMaster;
339 fMaxMergeFiles = other.fMaxMergeFiles;
340 fMaxMergeStages = other.fMaxMergeStages;
341 fNsubmitted = other.fNsubmitted;
342 fProductionMode = other.fProductionMode;
343 fOutputToRunNo = other.fOutputToRunNo;
344 fMergeViaJDL = other.fMergeViaJDL;
345 fFastReadOption = other.fFastReadOption;
346 fOverwriteMode = other.fOverwriteMode;
347 fNreplicas = other.fNreplicas;
348 fNproofWorkers = other.fNproofWorkers;
349 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
350 fProofReset = other.fProofReset;
351 fRunNumbers = other.fRunNumbers;
352 fExecutable = other.fExecutable;
353 fExecutableCommand = other.fExecutableCommand;
354 fArguments = other.fArguments;
355 fExecutableArgs = other.fExecutableArgs;
356 fAnalysisMacro = other.fAnalysisMacro;
357 fAnalysisSource = other.fAnalysisSource;
358 fValidationScript = other.fValidationScript;
359 fAdditionalRootLibs = other.fAdditionalRootLibs;
360 fAdditionalLibs = other.fAdditionalLibs;
361 fSplitMode = other.fSplitMode;
362 fAPIVersion = other.fAPIVersion;
363 fROOTVersion = other.fROOTVersion;
364 fAliROOTVersion = other.fAliROOTVersion;
365 fExternalPackages = other.fExternalPackages;
367 fGridWorkingDir = other.fGridWorkingDir;
368 fGridDataDir = other.fGridDataDir;
369 fDataPattern = other.fDataPattern;
370 fGridOutputDir = other.fGridOutputDir;
371 fOutputArchive = other.fOutputArchive;
372 fOutputFiles = other.fOutputFiles;
373 fInputFormat = other.fInputFormat;
374 fDatasetName = other.fDatasetName;
375 fJDLName = other.fJDLName;
376 fTerminateFiles = other.fTerminateFiles;
377 fMergeExcludes = other.fMergeExcludes;
378 fIncludePath = other.fIncludePath;
379 fCloseSE = other.fCloseSE;
380 fFriendChainName = other.fFriendChainName;
381 fJobTag = other.fJobTag;
382 fOutputSingle = other.fOutputSingle;
383 fRunPrefix = other.fRunPrefix;
384 fProofCluster = other.fProofCluster;
385 fProofDataSet = other.fProofDataSet;
386 fFileForTestMode = other.fFileForTestMode;
387 fRootVersionForProof = other.fRootVersionForProof;
388 fAliRootMode = other.fAliRootMode;
389 fMergeDirName = other.fMergeDirName;
390 if (other.fInputFiles) {
391 fInputFiles = new TObjArray();
392 TIter next(other.fInputFiles);
394 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
395 fInputFiles->SetOwner();
397 if (other.fPackages) {
398 fPackages = new TObjArray();
399 TIter next(other.fPackages);
401 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
402 fPackages->SetOwner();
404 if (other.fModules) {
405 fModules = new TObjArray();
406 fModules->SetOwner();
407 TIter next(other.fModules);
408 AliAnalysisTaskCfg *mod, *crt;
409 while ((crt=(AliAnalysisTaskCfg*)next())) {
410 mod = new AliAnalysisTaskCfg(*crt);
418 //______________________________________________________________________________
419 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
421 // Adding a module. Checks if already existing. Becomes owned by this.
423 if (GetModule(module->GetName())) {
424 Error("AddModule", "A module having the same name %s already added", module->GetName());
428 fModules = new TObjArray();
429 fModules->SetOwner();
431 fModules->Add(module);
434 //______________________________________________________________________________
435 void AliAnalysisAlien::AddModules(TObjArray *list)
437 // Adding a list of modules. Checks if already existing. Becomes owned by this.
439 AliAnalysisTaskCfg *module;
440 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
443 //______________________________________________________________________________
444 Bool_t AliAnalysisAlien::CheckDependencies()
446 // Check if all dependencies are satisfied. Reorder modules if needed.
447 Int_t nmodules = GetNmodules();
449 Warning("CheckDependencies", "No modules added yet to check their dependencies");
452 AliAnalysisTaskCfg *mod = 0;
453 AliAnalysisTaskCfg *dep = 0;
456 for (i=0; i<nmodules; i++) {
457 mod = (AliAnalysisTaskCfg*) fModules->At(i);
458 Int_t ndeps = mod->GetNdeps();
460 for (j=0; j<ndeps; j++) {
461 depname = mod->GetDependency(j);
462 dep = GetModule(depname);
464 Error("CheckDependencies","Dependency %s not added for module %s",
465 depname.Data(), mod->GetName());
468 if (dep->NeedsDependency(mod->GetName())) {
469 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
470 mod->GetName(), dep->GetName());
473 Int_t idep = fModules->IndexOf(dep);
474 // The dependency task must come first
476 // Remove at idep and move all objects below up one slot
477 // down to index i included.
478 fModules->RemoveAt(idep);
479 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
480 fModules->AddAt(dep, i++);
482 //Redo from istart if dependencies were inserted
483 if (i>istart) i=istart-1;
489 //______________________________________________________________________________
490 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
492 // Create the analysis manager and optionally execute the macro in filename.
493 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
495 mgr = new AliAnalysisManager(name);
496 mgr->SetGridHandler((AliAnalysisGrid*)this);
497 if (strlen(filename)) {
498 TString line = gSystem->ExpandPathName(filename);
500 gROOT->ProcessLine(line.Data());
505 //______________________________________________________________________________
506 Int_t AliAnalysisAlien::GetNmodules() const
508 // Get number of modules.
509 if (!fModules) return 0;
510 return fModules->GetEntries();
513 //______________________________________________________________________________
514 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
516 // Get a module by name.
517 if (!fModules) return 0;
518 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
521 //______________________________________________________________________________
522 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
524 // Load a given module.
525 if (mod->IsLoaded()) return kTRUE;
526 Int_t ndeps = mod->GetNdeps();
528 for (Int_t j=0; j<ndeps; j++) {
529 depname = mod->GetDependency(j);
530 AliAnalysisTaskCfg *dep = GetModule(depname);
532 Error("LoadModule","Dependency %s not existing for module %s",
533 depname.Data(), mod->GetName());
536 if (!LoadModule(dep)) {
537 Error("LoadModule","Dependency %s for module %s could not be loaded",
538 depname.Data(), mod->GetName());
542 // Load libraries for the module
543 if (!mod->CheckLoadLibraries()) {
544 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
548 if (mod->ExecuteMacro()<0) {
549 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
550 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
553 // Configure dependencies
554 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
555 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
556 mod->GetConfigMacro()->GetTitle(), mod->GetName());
559 // Adjust extra libraries
560 Int_t nlibs = mod->GetNlibs();
562 for (Int_t i=0; i<nlibs; i++) {
563 lib = mod->GetLibrary(i);
564 if (fAdditionalLibs.Contains(lib)) continue;
565 lib = Form("lib%s.so", lib.Data());
566 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
567 fAdditionalLibs += lib;
572 //______________________________________________________________________________
573 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
575 // Generate test macros for a single module or for the full train.
576 fAdditionalLibs = "";
577 if (strlen(modname)) {
578 if (!CheckDependencies()) return kFALSE;
579 AliAnalysisTaskCfg *mod = GetModule(modname);
581 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
584 if (!LoadModule(mod)) return kFALSE;
585 } else if (!LoadModules()) return kFALSE;
586 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
587 if (!mgr->InitAnalysis()) return kFALSE;
590 Int_t productionMode = fProductionMode;
592 TString macro = fAnalysisMacro;
593 TString executable = fExecutable;
594 TString validation = fValidationScript;
595 TString execCommand = fExecutableCommand;
596 SetAnalysisMacro(Form("%s.C", name));
597 SetExecutable(Form("%s.sh", name));
598 SetExecutableCommand("aliroot -b -q ");
599 SetValidationScript(Form("%s_validation.sh", name));
601 WriteAnalysisMacro();
603 WriteValidationScript();
604 SetLocalTest(kFALSE);
605 SetProductionMode(productionMode);
606 fAnalysisMacro = macro;
607 fExecutable = executable;
608 fExecutableCommand = execCommand;
609 fValidationScript = validation;
613 //______________________________________________________________________________
614 Bool_t AliAnalysisAlien::LoadModules()
616 // Load all modules by executing the AddTask macros. Checks first the dependencies.
617 fAdditionalLibs = "";
618 Int_t nmodules = GetNmodules();
620 Warning("LoadModules", "No module to be loaded");
623 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
625 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
628 if (!CheckDependencies()) return kFALSE;
629 nmodules = GetNmodules();
630 AliAnalysisTaskCfg *mod;
631 for (Int_t imod=0; imod<nmodules; imod++) {
632 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
633 if (!LoadModule(mod)) return kFALSE;
638 //______________________________________________________________________________
639 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
641 // Set the run number format. Can be a prefix or a format like "%09d"
643 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
646 //______________________________________________________________________________
647 void AliAnalysisAlien::AddIncludePath(const char *path)
649 // Add include path in the remote analysis macro.
651 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
652 else fIncludePath += Form("-I%s ", path);
655 //______________________________________________________________________________
656 void AliAnalysisAlien::AddRunNumber(Int_t run)
658 // Add a run number to the list of runs to be processed.
659 if (fRunNumbers.Length()) fRunNumbers += " ";
660 fRunNumbers += Form(fRunPrefix.Data(), run);
663 //______________________________________________________________________________
664 void AliAnalysisAlien::AddRunList(const char* runList)
666 // Add several runs into the list of runs; they are expected to be separated by a blank character.
667 TString sList = runList;
668 TObjArray *list = sList.Tokenize(" ");
669 Int_t n = list->GetEntries();
670 for (Int_t i = 0; i < n; i++) {
671 TObjString *os = (TObjString*)list->At(i);
672 AddRunNumber(os->GetString().Atoi());
677 //______________________________________________________________________________
678 void AliAnalysisAlien::AddRunNumber(const char* run)
680 // Add a run number to the list of runs to be processed.
683 TObjArray *arr = runs.Tokenize(" ");
686 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
687 while ((os=(TObjString*)next())){
688 if (fRunNumbers.Length()) fRunNumbers += " ";
689 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
694 //______________________________________________________________________________
695 void AliAnalysisAlien::AddDataFile(const char *lfn)
697 // Adds a data file to the input to be analysed. The file should be a valid LFN
698 // or point to an existing file in the alien workdir.
699 if (!fInputFiles) fInputFiles = new TObjArray();
700 fInputFiles->Add(new TObjString(lfn));
703 //______________________________________________________________________________
704 void AliAnalysisAlien::AddExternalPackage(const char *package)
706 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
707 if (fExternalPackages) fExternalPackages += " ";
708 fExternalPackages += package;
711 //______________________________________________________________________________
712 Bool_t AliAnalysisAlien::Connect()
714 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
715 if (gGrid && gGrid->IsConnected()) return kTRUE;
716 if (fProductionMode) return kTRUE;
718 Info("Connect", "Trying to connect to AliEn ...");
719 TGrid::Connect("alien://");
721 if (!gGrid || !gGrid->IsConnected()) {
722 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
725 fUser = gGrid->GetUser();
726 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
730 //______________________________________________________________________________
731 void AliAnalysisAlien::CdWork()
733 // Check validity of alien workspace. Create directory if possible.
735 Error("CdWork", "Alien connection required");
738 TString homedir = gGrid->GetHomeDirectory();
739 TString workdir = homedir + fGridWorkingDir;
740 if (DirectoryExists(workdir)) {
744 // Work directory not existing - create it
746 if (gGrid->Mkdir(workdir, "-p")) {
747 gGrid->Cd(fGridWorkingDir);
748 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
750 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
751 workdir.Data(), homedir.Data());
752 fGridWorkingDir = "";
756 //______________________________________________________________________________
757 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
759 // Check if file copying is possible.
760 if (fProductionMode) return kTRUE;
762 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
765 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
766 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
767 // Check if alien_CLOSE_SE is defined
768 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
769 if (!closeSE.IsNull()) {
770 Info("CheckFileCopy", "Your current close storage is pointing to: \
771 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
773 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
775 // Check if grid directory exists.
776 if (!DirectoryExists(alienpath)) {
777 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
780 TFile f("plugin_test_copy", "RECREATE");
781 // User may not have write permissions to current directory
783 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
784 gSystem->WorkingDirectory());
788 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
789 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
790 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
791 \n# 1. Make sure you have write permissions there. If this is the case: \
792 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
793 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
794 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
795 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
796 gSystem->Unlink(f.GetName());
799 gSystem->Unlink(f.GetName());
800 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
801 Info("CheckFileCopy", "### ...SUCCESS ###");
805 //______________________________________________________________________________
806 Bool_t AliAnalysisAlien::CheckInputData()
808 // Check validity of input data. If necessary, create xml files.
809 if (fProductionMode) return kTRUE;
810 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
811 if (!fGridDataDir.Length()) {
812 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
816 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
819 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
820 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
821 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
824 // Process declared files
825 Bool_t isCollection = kFALSE;
826 Bool_t isXml = kFALSE;
827 Bool_t useTags = kFALSE;
828 Bool_t checked = kFALSE;
829 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
831 TString workdir = gGrid->GetHomeDirectory();
832 workdir += fGridWorkingDir;
835 TIter next(fInputFiles);
836 while ((objstr=(TObjString*)next())) {
839 file += objstr->GetString();
840 // Store full lfn path
841 if (FileExists(file)) objstr->SetString(file);
843 file = objstr->GetName();
844 if (!FileExists(objstr->GetName())) {
845 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
846 objstr->GetName(), workdir.Data());
850 Bool_t iscoll, isxml, usetags;
851 CheckDataType(file, iscoll, isxml, usetags);
854 isCollection = iscoll;
857 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
859 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
860 Error("CheckInputData", "Some conflict was found in the types of inputs");
866 // Process requested run numbers
867 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
868 // Check validity of alien data directory
869 if (!fGridDataDir.Length()) {
870 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
873 if (!DirectoryExists(fGridDataDir)) {
874 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
878 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
882 if (checked && !isXml) {
883 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
886 // Check validity of run number(s)
891 TString schunk, schunk2;
895 useTags = fDataPattern.Contains("tag");
896 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
898 if (useTags != fDataPattern.Contains("tag")) {
899 Error("CheckInputData", "Cannot mix input files using/not using tags");
902 if (fRunNumbers.Length()) {
903 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
904 arr = fRunNumbers.Tokenize(" ");
906 while ((os=(TObjString*)next())) {
907 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
908 if (!DirectoryExists(path)) {
909 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
912 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
913 TString msg = "\n##### file: ";
915 msg += " type: xml_collection;";
916 if (useTags) msg += " using_tags: Yes";
917 else msg += " using_tags: No";
918 Info("CheckDataType", "%s", msg.Data());
919 if (fNrunsPerMaster<2) {
920 AddDataFile(Form("%s.xml", os->GetString().Data()));
923 if (((nruns-1)%fNrunsPerMaster) == 0) {
924 schunk = os->GetString();
926 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
927 schunk += Form("_%s.xml", os->GetString().Data());
933 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
934 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
935 format = Form("%%s/%s ", fRunPrefix.Data());
936 path = Form(format.Data(), fGridDataDir.Data(), irun);
937 if (!DirectoryExists(path)) {
940 format = Form("%%s/%s.xml", fRunPrefix.Data());
941 path = Form(format.Data(), workdir.Data(),irun);
942 TString msg = "\n##### file: ";
944 msg += " type: xml_collection;";
945 if (useTags) msg += " using_tags: Yes";
946 else msg += " using_tags: No";
947 Info("CheckDataType", "%s", msg.Data());
948 if (fNrunsPerMaster<2) {
949 format = Form("%s.xml", fRunPrefix.Data());
950 AddDataFile(Form(format.Data(),irun));
953 if (((nruns-1)%fNrunsPerMaster) == 0) {
954 schunk = Form(fRunPrefix.Data(),irun);
956 format = Form("_%s.xml", fRunPrefix.Data());
957 schunk2 = Form(format.Data(), irun);
958 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
971 //______________________________________________________________________________
972 Bool_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *anchorfile)
974 // Copy data from the given grid directory according a pattern and make a local
977 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
980 if (!DirectoryExists(griddir)) {
981 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
984 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
985 printf("Running command: %s\n", command.Data());
986 TGridResult *res = gGrid->Command(command);
987 Int_t nfound = res->GetEntries();
989 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
992 printf("... found %d files. Copying locally ...\n", nfound);
993 // Copy files locally
995 out.open(output, ios::out);
997 TString turl, dirname, filename, temp;
998 TString cdir = gSystem->WorkingDirectory();
999 gSystem->MakeDirectory("data");
1000 gSystem->ChangeDirectory("data");
1001 for (Int_t i=0; i<nfound; i++) {
1002 map = (TMap*)res->At(i);
1003 turl = map->GetValue("turl")->GetName();
1004 filename = gSystem->BaseName(turl.Data());
1005 dirname = gSystem->DirName(turl.Data());
1006 dirname = gSystem->BaseName(dirname.Data());
1007 gSystem->MakeDirectory(dirname);
1008 if (TFile::Cp(turl, Form("file:./%s/%s", dirname.Data(), filename.Data()))) {
1009 if (strlen(anchorfile)) filename = Form("%s#%s", filename.Data(), anchorfile);
1010 out << cdir << "/data/" << Form("%s/%s", dirname.Data(), filename.Data()) << endl;
1013 gSystem->ChangeDirectory(cdir);
1018 //______________________________________________________________________________
1019 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1021 // Create dataset for the grid data directory + run number.
1022 const Int_t gMaxEntries = 15000;
1023 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1025 Error("CreateDataset", "Cannot create dataset with no grid connection");
1030 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
1031 TString workdir = gGrid->GetHomeDirectory();
1032 workdir += fGridWorkingDir;
1034 // Compose the 'find' command arguments
1037 TString options = "-x collection ";
1038 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1039 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1040 TString conditions = "";
1047 TString schunk, schunk2;
1048 TGridCollection *cbase=0, *cadd=0;
1049 if (!fRunNumbers.Length() && !fRunRange[0]) {
1050 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1051 // Make a single data collection from data directory.
1052 path = fGridDataDir;
1053 if (!DirectoryExists(path)) {
1054 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
1058 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1059 else file = Form("%s.xml", gSystem->BaseName(path));
1063 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1065 command += Form("%s -o %d ",options.Data(), nstart);
1069 command += conditions;
1070 printf("command: %s\n", command.Data());
1071 TGridResult *res = gGrid->Command(command);
1072 if (res) delete res;
1073 // Write standard output to file
1074 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1075 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1076 Bool_t nullFile = kFALSE;
1078 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1080 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1082 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1083 gSystem->Exec("rm -f __tmp*");
1091 gSystem->Exec("rm -f __tmp__");
1092 ncount = line.Atoi();
1095 if (ncount == gMaxEntries) {
1096 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1097 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1098 if (!cbase) cbase = cadd;
1106 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1107 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1110 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1111 delete cbase; cbase = 0;
1113 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1115 gSystem->Exec("rm -f __tmp*");
1116 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1120 Bool_t fileExists = FileExists(file);
1121 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1122 // Copy xml file to alien space
1123 if (fileExists) gGrid->Rm(file);
1124 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1125 if (!FileExists(file)) {
1126 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1129 // Update list of files to be processed.
1131 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
1135 Bool_t nullResult = kTRUE;
1136 if (fRunNumbers.Length()) {
1137 TObjArray *arr = fRunNumbers.Tokenize(" ");
1140 while ((os=(TObjString*)next())) {
1143 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
1144 if (!DirectoryExists(path)) continue;
1146 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1147 else file = Form("%s.xml", os->GetString().Data());
1148 // If local collection file does not exist, create it via 'find' command.
1152 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1154 command += Form("%s -o %d ",options.Data(), nstart);
1157 command += conditions;
1158 TGridResult *res = gGrid->Command(command);
1159 if (res) delete res;
1160 // Write standard output to file
1161 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1162 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1163 Bool_t nullFile = kFALSE;
1165 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1167 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1169 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1170 gSystem->Exec("rm -f __tmp*");
1171 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1179 gSystem->Exec("rm -f __tmp__");
1180 ncount = line.Atoi();
1182 nullResult = kFALSE;
1184 if (ncount == gMaxEntries) {
1185 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1186 if (fNrunsPerMaster > 1) {
1187 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1188 file.Data(),gMaxEntries);
1191 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1192 if (!cbase) cbase = cadd;
1199 if (cbase && fNrunsPerMaster<2) {
1200 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1201 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1204 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1205 delete cbase; cbase = 0;
1207 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1209 gSystem->Exec("rm -f __tmp*");
1210 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1214 if (TestBit(AliAnalysisGrid::kTest)) break;
1215 // Check if there is one run per master job.
1216 if (fNrunsPerMaster<2) {
1217 if (FileExists(file)) {
1218 if (fOverwriteMode) gGrid->Rm(file);
1220 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1224 // Copy xml file to alien space
1225 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1226 if (!FileExists(file)) {
1227 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1233 if (((nruns-1)%fNrunsPerMaster) == 0) {
1234 schunk = os->GetString();
1235 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1237 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1238 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1242 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
1245 schunk += Form("_%s.xml", os->GetString().Data());
1246 if (FileExists(schunk)) {
1247 if (fOverwriteMode) gGrid->Rm(file);
1249 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1253 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1254 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1255 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1256 if (!FileExists(schunk)) {
1257 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1265 Error("CreateDataset", "No valid dataset corresponding to the query!");
1269 // Process a full run range.
1270 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1271 format = Form("%%s/%s ", fRunPrefix.Data());
1274 path = Form(format.Data(), fGridDataDir.Data(), irun);
1275 if (!DirectoryExists(path)) continue;
1277 format = Form("%s.xml", fRunPrefix.Data());
1278 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1279 else file = Form(format.Data(), irun);
1280 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1281 if (fOverwriteMode) gGrid->Rm(file);
1283 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1287 // If local collection file does not exist, create it via 'find' command.
1291 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1293 command += Form("%s -o %d ",options.Data(), nstart);
1296 command += conditions;
1297 TGridResult *res = gGrid->Command(command);
1298 if (res) delete res;
1299 // Write standard output to file
1300 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1301 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1302 Bool_t nullFile = kFALSE;
1304 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1306 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1308 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1309 gSystem->Exec("rm -f __tmp*");
1317 gSystem->Exec("rm -f __tmp__");
1318 ncount = line.Atoi();
1320 nullResult = kFALSE;
1322 if (ncount == gMaxEntries) {
1323 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1324 if (fNrunsPerMaster > 1) {
1325 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1326 file.Data(),gMaxEntries);
1329 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1330 if (!cbase) cbase = cadd;
1337 if (cbase && fNrunsPerMaster<2) {
1338 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1339 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1342 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1343 delete cbase; cbase = 0;
1345 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1347 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1351 if (TestBit(AliAnalysisGrid::kTest)) break;
1352 // Check if there is one run per master job.
1353 if (fNrunsPerMaster<2) {
1354 if (FileExists(file)) {
1355 if (fOverwriteMode) gGrid->Rm(file);
1357 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1361 // Copy xml file to alien space
1362 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1363 if (!FileExists(file)) {
1364 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1369 // Check if the collection for the chunk exist locally.
1370 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1371 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1372 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1375 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1376 if (((nruns-1)%fNrunsPerMaster) == 0) {
1377 schunk = Form(fRunPrefix.Data(), irun);
1378 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1380 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1384 format = Form("%%s_%s.xml", fRunPrefix.Data());
1385 schunk2 = Form(format.Data(), schunk.Data(), irun);
1386 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1390 if (FileExists(schunk)) {
1391 if (fOverwriteMode) gGrid->Rm(schunk);
1393 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1397 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1398 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1399 if (FileExists(schunk)) {
1400 if (fOverwriteMode) gGrid->Rm(schunk);
1402 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1406 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1407 if (!FileExists(schunk)) {
1408 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1414 Error("CreateDataset", "No valid dataset corresponding to the query!");
1421 //______________________________________________________________________________
1422 Bool_t AliAnalysisAlien::CreateJDL()
1424 // Generate a JDL file according to current settings. The name of the file is
1425 // specified by fJDLName.
1426 Bool_t error = kFALSE;
1428 Bool_t copy = kTRUE;
1429 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1430 Bool_t generate = kTRUE;
1431 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1433 Error("CreateJDL", "Alien connection required");
1436 // Check validity of alien workspace
1438 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1439 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1440 workdir += fGridWorkingDir;
1444 Error("CreateJDL()", "Define some input files for your analysis.");
1447 // Compose list of input files
1448 // Check if output files were defined
1449 if (!fOutputFiles.Length()) {
1450 Error("CreateJDL", "You must define at least one output file");
1453 // Check if an output directory was defined and valid
1454 if (!fGridOutputDir.Length()) {
1455 Error("CreateJDL", "You must define AliEn output directory");
1458 if (!fProductionMode) {
1459 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1460 if (!DirectoryExists(fGridOutputDir)) {
1461 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1462 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1464 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1468 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1473 // Exit if any error up to now
1474 if (error) return kFALSE;
1476 if (!fUser.IsNull()) {
1477 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1478 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1480 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
1481 TString mergeExec = fExecutable;
1482 mergeExec.ReplaceAll(".sh", "_merge.sh");
1483 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1484 mergeExec.ReplaceAll(".sh", ".C");
1485 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1486 if (!fArguments.IsNull())
1487 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1488 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1490 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1491 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
1494 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1495 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1496 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1497 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1499 if (fMaxInitFailed > 0) {
1500 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1501 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1503 if (fSplitMaxInputFileNumber > 0) {
1504 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1505 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1507 if (!IsOneStageMerging()) {
1508 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1509 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1511 if (fSplitMode.Length()) {
1512 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1513 fGridJDL->SetDescription("Split", "We split per SE or file");
1515 fMergingJDL->SetValue("Split", "\"se\"");
1516 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
1517 if (!fAliROOTVersion.IsNull()) {
1518 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1519 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1521 if (!fROOTVersion.IsNull()) {
1522 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1523 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1525 if (!fAPIVersion.IsNull()) {
1526 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1527 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1529 if (!fExternalPackages.IsNull()) {
1530 arr = fExternalPackages.Tokenize(" ");
1532 while ((os=(TObjString*)next())) {
1533 TString pkgname = os->GetString();
1534 Int_t index = pkgname.Index("::");
1535 TString pkgversion = pkgname(index+2, pkgname.Length());
1536 pkgname.Remove(index);
1537 fGridJDL->AddToPackages(pkgname, pkgversion);
1538 fMergingJDL->AddToPackages(pkgname, pkgversion);
1542 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1543 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1544 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1545 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1546 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1547 TString analysisFile = fExecutable;
1548 analysisFile.ReplaceAll(".sh", ".root");
1549 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1550 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1551 if (fAdditionalLibs.Length()) {
1552 arr = fAdditionalLibs.Tokenize(" ");
1554 while ((os=(TObjString*)next())) {
1555 if (os->GetString().Contains(".so")) continue;
1556 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1557 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1562 TIter next(fPackages);
1564 while ((obj=next())) {
1565 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1566 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1569 if (fOutputArchive.Length()) {
1570 arr = fOutputArchive.Tokenize(" ");
1572 Bool_t first = kTRUE;
1573 const char *comment = "Files to be archived";
1574 const char *comment1 = comment;
1575 while ((os=(TObjString*)next())) {
1576 if (!first) comment = NULL;
1577 if (!os->GetString().Contains("@") && fCloseSE.Length())
1578 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1580 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1584 // Output archive for the merging jdl
1585 TString outputArchive;
1586 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1587 outputArchive = "log_archive.zip:std*@disk=1 ";
1588 // Add normal output files, extra files + terminate files
1589 TString files = GetListOfFiles("outextter");
1590 // Do not register merge excludes
1591 if (!fMergeExcludes.IsNull()) {
1592 arr = fMergeExcludes.Tokenize(" ");
1594 while ((os=(TObjString*)next1())) {
1595 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1596 files.ReplaceAll(os->GetString(),"");
1600 files.ReplaceAll(".root", "*.root");
1601 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1603 TString files = fOutputArchive;
1604 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already atttached by use
1605 outputArchive = files;
1607 arr = outputArchive.Tokenize(" ");
1611 while ((os=(TObjString*)next2())) {
1612 if (!first) comment = NULL;
1613 TString currentfile = os->GetString();
1614 if (!currentfile.Contains("@") && fCloseSE.Length())
1615 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1617 fMergingJDL->AddToOutputArchive(currentfile, comment);
1622 arr = fOutputFiles.Tokenize(",");
1624 Bool_t first = kTRUE;
1625 const char *comment = "Files to be saved";
1626 while ((os=(TObjString*)next())) {
1627 // Ignore ouputs in jdl that are also in outputarchive
1628 TString sout = os->GetString();
1629 sout.ReplaceAll("*", "");
1630 sout.ReplaceAll(".root", "");
1631 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1632 if (fOutputArchive.Contains(sout)) continue;
1633 if (!first) comment = NULL;
1634 if (!os->GetString().Contains("@") && fCloseSE.Length())
1635 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1637 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1639 if (fMergeExcludes.Contains(sout)) continue;
1640 if (!os->GetString().Contains("@") && fCloseSE.Length())
1641 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1643 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1646 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1647 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1648 TString validationScript = fValidationScript;
1649 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1650 validationScript.ReplaceAll(".sh", "_merge.sh");
1651 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1652 if (fMasterResubmitThreshold) {
1653 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1654 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1656 // Write a jdl with 2 input parameters: collection name and output dir name.
1659 // Copy jdl to grid workspace
1661 // Check if an output directory was defined and valid
1662 if (!fGridOutputDir.Length()) {
1663 Error("CreateJDL", "You must define AliEn output directory");
1666 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1667 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1668 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1669 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1671 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1677 if (TestBit(AliAnalysisGrid::kSubmit)) {
1678 TString mergeJDLName = fExecutable;
1679 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1680 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1681 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1682 if (fProductionMode) {
1683 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1684 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1686 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1687 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1688 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1689 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1690 Fatal("","Terminating");
1691 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1693 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1694 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1695 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1696 Fatal("","Terminating");
1699 if (fAdditionalLibs.Length()) {
1700 arr = fAdditionalLibs.Tokenize(" ");
1703 while ((os=(TObjString*)next())) {
1704 if (os->GetString().Contains(".so")) continue;
1705 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1706 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1707 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1708 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1709 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1710 Fatal("","Terminating");
1715 TIter next(fPackages);
1717 while ((obj=next())) {
1718 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1719 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1720 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1721 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1722 Form("%s/%s", workdir.Data(), obj->GetName())))
1723 Fatal("","Terminating");
1730 //______________________________________________________________________________
1731 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1733 // Writes one or more JDL's corresponding to findex. If findex is negative,
1734 // all run numbers are considered in one go (jdl). For non-negative indices
1735 // they correspond to the indices in the array fInputFiles.
1736 if (!fInputFiles) return kFALSE;
1739 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1740 workdir += fGridWorkingDir;
1741 TString stageName = "$2";
1742 if (fProductionMode) stageName = "$4";
1743 if (!fMergeDirName.IsNull()) {
1744 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1745 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1747 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1748 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1750 if (fProductionMode) {
1751 TIter next(fInputFiles);
1752 while ((os=next())) {
1753 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1755 if (!fOutputToRunNo)
1756 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1758 fGridJDL->SetOutputDirectory(fGridOutputDir);
1760 if (!fRunNumbers.Length() && !fRunRange[0]) {
1761 // One jdl with no parameters in case input data is specified by name.
1762 TIter next(fInputFiles);
1764 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1765 if (!fOutputSingle.IsNull())
1766 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1768 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1769 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1772 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1773 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1774 if (!fOutputSingle.IsNull()) {
1775 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1776 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1778 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1783 // Generate the JDL as a string
1784 TString sjdl = fGridJDL->Generate();
1785 TString sjdl1 = fMergingJDL->Generate();
1787 if (!fMergeDirName.IsNull()) {
1788 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1789 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1791 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1792 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1794 TString sjdl2 = fMergingJDL->Generate();
1795 Int_t index, index1;
1796 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1797 sjdl.ReplaceAll("(member", "\n (member");
1798 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1799 sjdl.ReplaceAll("{", "{\n ");
1800 sjdl.ReplaceAll("};", "\n};");
1801 sjdl.ReplaceAll("{\n \n", "{\n");
1802 sjdl.ReplaceAll("\n\n", "\n");
1803 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1804 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1805 sjdl1.ReplaceAll("(member", "\n (member");
1806 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1807 sjdl1.ReplaceAll("{", "{\n ");
1808 sjdl1.ReplaceAll("};", "\n};");
1809 sjdl1.ReplaceAll("{\n \n", "{\n");
1810 sjdl1.ReplaceAll("\n\n", "\n");
1811 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1812 sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
1813 sjdl2.ReplaceAll("(member", "\n (member");
1814 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1815 sjdl2.ReplaceAll("{", "{\n ");
1816 sjdl2.ReplaceAll("};", "\n};");
1817 sjdl2.ReplaceAll("{\n \n", "{\n");
1818 sjdl2.ReplaceAll("\n\n", "\n");
1819 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
1820 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1821 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1822 index = sjdl.Index("JDLVariables");
1823 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1824 sjdl += "Workdirectorysize = {\"5000MB\"};";
1825 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1826 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1827 index = fJobTag.Index(":");
1828 if (index < 0) index = fJobTag.Length();
1829 TString jobTag = fJobTag;
1830 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1831 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1832 if (fProductionMode) {
1833 sjdl1.Prepend("# Generated merging jdl (production mode) \
1834 \n# $1 = full alien path to output directory to be merged \
1835 \n# $2 = train number \
1836 \n# $3 = production (like LHC10b) \
1837 \n# $4 = merging stage \
1838 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1839 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1840 sjdl2.Prepend("# Generated merging jdl \
1841 \n# $1 = full alien path to output directory to be merged \
1842 \n# $2 = train number \
1843 \n# $3 = production (like LHC10b) \
1844 \n# $4 = merging stage \
1845 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1847 sjdl1.Prepend("# Generated merging jdl \
1848 \n# $1 = full alien path to output directory to be merged \
1849 \n# $2 = merging stage \
1850 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1851 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1852 sjdl2.Prepend("# Generated merging jdl \
1853 \n# $1 = full alien path to output directory to be merged \
1854 \n# $2 = merging stage \
1855 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1857 index = sjdl1.Index("JDLVariables");
1858 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1859 index = sjdl2.Index("JDLVariables");
1860 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
1861 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1862 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
1863 index = sjdl2.Index("Split =");
1865 index1 = sjdl2.Index("\n", index);
1866 sjdl2.Remove(index, index1-index+1);
1868 index = sjdl2.Index("SplitMaxInputFileNumber");
1870 index1 = sjdl2.Index("\n", index);
1871 sjdl2.Remove(index, index1-index+1);
1873 index = sjdl2.Index("InputDataCollection");
1875 index1 = sjdl2.Index(";", index);
1876 sjdl2.Remove(index, index1-index+1);
1878 index = sjdl2.Index("InputDataListFormat");
1880 index1 = sjdl2.Index("\n", index);
1881 sjdl2.Remove(index, index1-index+1);
1883 index = sjdl2.Index("InputDataList");
1885 index1 = sjdl2.Index("\n", index);
1886 sjdl2.Remove(index, index1-index+1);
1888 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
1889 // Write jdl to file
1891 out.open(fJDLName.Data(), ios::out);
1893 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1896 out << sjdl << endl;
1898 TString mergeJDLName = fExecutable;
1899 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1902 out1.open(mergeJDLName.Data(), ios::out);
1904 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1907 out1 << sjdl1 << endl;
1910 TString finalJDL = mergeJDLName;
1911 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1912 out2.open(finalJDL.Data(), ios::out);
1914 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
1917 out2 << sjdl2 << endl;
1921 // Copy jdl to grid workspace
1923 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
1925 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1926 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1927 TString finalJDL = mergeJDLName;
1928 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1929 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
1930 if (fProductionMode) {
1931 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1932 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1933 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
1935 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1936 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1937 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
1938 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1939 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1940 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
1941 Fatal("","Terminating");
1943 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
1944 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1945 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
1946 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
1947 Fatal("","Terminating");
1948 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
1949 Fatal("","Terminating");
1955 //______________________________________________________________________________
1956 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1958 // Returns true if file exists.
1959 if (!gGrid) return kFALSE;
1961 slfn.ReplaceAll("alien://","");
1962 TGridResult *res = gGrid->Ls(slfn);
1963 if (!res) return kFALSE;
1964 TMap *map = dynamic_cast<TMap*>(res->At(0));
1969 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1970 if (!objs || !objs->GetString().Length()) {
1978 //______________________________________________________________________________
1979 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1981 // Returns true if directory exists. Can be also a path.
1982 if (!gGrid) return kFALSE;
1983 // Check if dirname is a path
1984 TString dirstripped = dirname;
1985 dirstripped = dirstripped.Strip();
1986 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1987 TString dir = gSystem->BaseName(dirstripped);
1989 TString path = gSystem->DirName(dirstripped);
1990 TGridResult *res = gGrid->Ls(path, "-F");
1991 if (!res) return kFALSE;
1995 while ((map=dynamic_cast<TMap*>(next()))) {
1996 obj = map->GetValue("name");
1998 if (dir == obj->GetName()) {
2007 //______________________________________________________________________________
2008 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2010 // Check input data type.
2011 isCollection = kFALSE;
2015 Error("CheckDataType", "No connection to grid");
2018 isCollection = IsCollection(lfn);
2019 TString msg = "\n##### file: ";
2022 msg += " type: raw_collection;";
2023 // special treatment for collections
2025 // check for tag files in the collection
2026 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2028 msg += " using_tags: No (unknown)";
2029 Info("CheckDataType", "%s", msg.Data());
2032 const char* typeStr = res->GetKey(0, "origLFN");
2033 if (!typeStr || !strlen(typeStr)) {
2034 msg += " using_tags: No (unknown)";
2035 Info("CheckDataType", "%s", msg.Data());
2038 TString file = typeStr;
2039 useTags = file.Contains(".tag");
2040 if (useTags) msg += " using_tags: Yes";
2041 else msg += " using_tags: No";
2042 Info("CheckDataType", "%s", msg.Data());
2047 isXml = slfn.Contains(".xml");
2049 // Open xml collection and check if there are tag files inside
2050 msg += " type: xml_collection;";
2051 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2053 msg += " using_tags: No (unknown)";
2054 Info("CheckDataType", "%s", msg.Data());
2057 TMap *map = coll->Next();
2059 msg += " using_tags: No (unknown)";
2060 Info("CheckDataType", "%s", msg.Data());
2063 map = (TMap*)map->GetValue("");
2065 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2066 useTags = file.Contains(".tag");
2068 if (useTags) msg += " using_tags: Yes";
2069 else msg += " using_tags: No";
2070 Info("CheckDataType", "%s", msg.Data());
2073 useTags = slfn.Contains(".tag");
2074 if (slfn.Contains(".root")) msg += " type: root file;";
2075 else msg += " type: unknown file;";
2076 if (useTags) msg += " using_tags: Yes";
2077 else msg += " using_tags: No";
2078 Info("CheckDataType", "%s", msg.Data());
2081 //______________________________________________________________________________
2082 void AliAnalysisAlien::EnablePackage(const char *package)
2084 // Enables a par file supposed to exist in the current directory.
2085 TString pkg(package);
2086 pkg.ReplaceAll(".par", "");
2088 if (gSystem->AccessPathName(pkg)) {
2089 Fatal("EnablePackage", "Package %s not found", pkg.Data());
2092 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2093 Info("EnablePackage", "AliEn plugin will use .par packages");
2094 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
2096 fPackages = new TObjArray();
2097 fPackages->SetOwner();
2099 fPackages->Add(new TObjString(pkg));
2102 //______________________________________________________________________________
2103 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2105 // Make a tree from files having the location specified in fFileForTestMode.
2106 // Inspired from JF's CreateESDChain.
2107 if (fFileForTestMode.IsNull()) {
2108 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2111 if (gSystem->AccessPathName(fFileForTestMode)) {
2112 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2117 in.open(fFileForTestMode);
2119 // Read the input list of files and add them to the chain
2121 TChain *chain = new TChain(treeName);
2125 if (line.IsNull()) continue;
2126 if (count++ == fNtestFiles) break;
2127 TString esdFile(line);
2128 TFile *file = TFile::Open(esdFile);
2130 if (!file->IsZombie()) chain->Add(esdFile);
2133 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2137 if (!chain->GetListOfFiles()->GetEntries()) {
2138 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2146 //______________________________________________________________________________
2147 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2149 // Get job status for all jobs with jobid>jobidstart.
2150 static char mstatus[20];
2156 TGridJobStatusList *list = gGrid->Ps("");
2157 if (!list) return mstatus;
2158 Int_t nentries = list->GetSize();
2159 TGridJobStatus *status;
2161 for (Int_t ijob=0; ijob<nentries; ijob++) {
2162 status = (TGridJobStatus *)list->At(ijob);
2163 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2164 if (pid<jobidstart) continue;
2165 if (pid == lastid) {
2166 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2168 switch (status->GetStatus()) {
2169 case TGridJobStatus::kWAITING:
2171 case TGridJobStatus::kRUNNING:
2173 case TGridJobStatus::kABORTED:
2174 case TGridJobStatus::kFAIL:
2175 case TGridJobStatus::kUNKNOWN:
2177 case TGridJobStatus::kDONE:
2186 //______________________________________________________________________________
2187 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2189 // Returns true if file is a collection. Functionality duplicated from
2190 // TAlien::Type() because we don't want to directly depend on TAlien.
2192 Error("IsCollection", "No connection to grid");
2195 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2196 if (!res) return kFALSE;
2197 const char* typeStr = res->GetKey(0, "type");
2198 if (!typeStr || !strlen(typeStr)) return kFALSE;
2199 if (!strcmp(typeStr, "collection")) return kTRUE;
2204 //______________________________________________________________________________
2205 Bool_t AliAnalysisAlien::IsSingleOutput() const
2207 // Check if single-ouput option is on.
2208 return (!fOutputSingle.IsNull());
2211 //______________________________________________________________________________
2212 void AliAnalysisAlien::Print(Option_t *) const
2214 // Print current plugin settings.
2215 printf("### AliEn analysis plugin current settings ###\n");
2216 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2217 if (mgr && mgr->IsProofMode()) {
2218 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2219 if (TestBit(AliAnalysisGrid::kTest))
2220 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2221 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2222 if (!fProofDataSet.IsNull())
2223 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2225 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2227 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2228 if (!fRootVersionForProof.IsNull())
2229 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
2231 printf("= ROOT version requested________________________ default\n");
2232 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2233 if (!fAliRootMode.IsNull())
2234 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2236 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2237 if (fNproofWorkersPerSlave)
2238 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2239 if (TestSpecialBit(kClearPackages))
2240 printf("= ClearPackages requested...\n");
2241 if (fIncludePath.Data())
2242 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2243 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2244 if (fPackages && fPackages->GetEntries()) {
2245 TIter next(fPackages);
2248 while ((obj=next())) list += obj->GetName();
2249 printf("= Par files to be used: ________________________ %s\n", list.Data());
2251 if (TestSpecialBit(kProofConnectGrid))
2252 printf("= Requested PROOF connection to grid\n");
2255 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2256 if (fOverwriteMode) {
2257 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2258 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2260 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2261 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
2262 printf("= Production mode:______________________________ %d\n", fProductionMode);
2263 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2264 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2265 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2267 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2268 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2269 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2270 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
2271 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
2272 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2273 if (fRunNumbers.Length())
2274 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2276 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
2277 if (!fRunRange[0] && !fRunNumbers.Length()) {
2278 TIter next(fInputFiles);
2281 while ((obj=next())) list += obj->GetName();
2282 printf("= Input files to be processed: _________________ %s\n", list.Data());
2284 if (TestBit(AliAnalysisGrid::kTest))
2285 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2286 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2287 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2288 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2289 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2290 printf("=====================================================================\n");
2291 printf("= Job price: ___________________________________ %d\n", fPrice);
2292 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2293 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2294 if (fMaxInitFailed>0)
2295 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2296 if (fMasterResubmitThreshold>0)
2297 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2298 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2299 if (fNrunsPerMaster>0)
2300 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2301 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2302 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2303 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2304 if (fArguments.Length())
2305 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2306 if (fExecutableArgs.Length())
2307 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2308 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2309 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2310 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2311 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2313 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2314 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
2315 if (fIncludePath.Data())
2316 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2317 if (fCloseSE.Length())
2318 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2319 if (fFriendChainName.Length())
2320 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2321 if (fPackages && fPackages->GetEntries()) {
2322 TIter next(fPackages);
2325 while ((obj=next())) list += obj->GetName();
2326 printf("= Par files to be used: ________________________ %s\n", list.Data());
2330 //______________________________________________________________________________
2331 void AliAnalysisAlien::SetDefaults()
2333 // Set default values for everything. What cannot be filled will be left empty.
2334 if (fGridJDL) delete fGridJDL;
2335 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2336 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2339 fSplitMaxInputFileNumber = 100;
2341 fMasterResubmitThreshold = 0;
2347 fNrunsPerMaster = 1;
2348 fMaxMergeFiles = 100;
2350 fExecutable = "analysis.sh";
2351 fExecutableCommand = "root -b -q";
2353 fExecutableArgs = "";
2354 fAnalysisMacro = "myAnalysis.C";
2355 fAnalysisSource = "";
2356 fAdditionalLibs = "";
2360 fAliROOTVersion = "";
2361 fUser = ""; // Your alien user name
2362 fGridWorkingDir = "";
2363 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2364 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2365 fFriendChainName = "";
2366 fGridOutputDir = "output";
2367 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2368 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2369 fInputFormat = "xml-single";
2370 fJDLName = "analysis.jdl";
2371 fJobTag = "Automatically generated analysis JDL";
2372 fMergeExcludes = "";
2375 SetCheckCopy(kTRUE);
2376 SetDefaultOutputs(kTRUE);
2380 //______________________________________________________________________________
2381 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2383 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2384 // First check if the result is already in the output directory.
2385 if (FileExists(Form("%s/%s",aliendir,filename))) {
2386 printf("Final merged results found. Not merging again.\n");
2389 // Now check the last stage done.
2392 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2395 // Next stage of merging
2397 TString pattern = "*root_archive.zip";
2398 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2399 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2400 if (res) delete res;
2401 // Write standard output to file
2402 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2403 // Count the number of files inside
2405 ifile.open(Form("Stage_%d.xml",stage));
2406 if (!ifile.good()) {
2407 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2412 while (!ifile.eof()) {
2414 if (line.Contains("/event")) nfiles++;
2418 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2421 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2423 // Copy the file in the output directory
2424 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2425 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2426 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2427 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2428 // Check if this is the last stage to be done.
2429 Bool_t laststage = (nfiles<nperchunk);
2430 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2432 printf("### Submiting final merging stage %d\n", stage);
2433 TString finalJDL = jdl;
2434 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2435 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2436 Int_t jobId = SubmitSingleJob(query);
2437 if (!jobId) return kFALSE;
2439 printf("### Submiting merging stage %d\n", stage);
2440 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2441 Int_t jobId = SubmitSingleJob(query);
2442 if (!jobId) return kFALSE;
2447 //______________________________________________________________________________
2448 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2450 // Loat the analysis manager from a file.
2451 TFile *file = TFile::Open(fname);
2453 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2456 TIter nextkey(file->GetListOfKeys());
2457 AliAnalysisManager *mgr = 0;
2459 while ((key=(TKey*)nextkey())) {
2460 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2461 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2464 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2468 //______________________________________________________________________________
2469 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2471 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2472 if (!gGrid) return 0;
2473 printf("=> %s ------> ",query);
2474 TGridResult *res = gGrid->Command(query);
2476 TString jobId = res->GetKey(0,"jobId");
2478 if (jobId.IsNull()) {
2479 printf("submission failed. Reason:\n");
2482 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2485 printf(" Job id: %s\n", jobId.Data());
2489 //______________________________________________________________________________
2490 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2492 // Merge given output files from basedir. Basedir can be an alien output directory
2493 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2494 // files in a group (ignored for xml input). Merging can be done in stages:
2495 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2496 // stage=1 : works with an xml of all root_archive.zip in the output directory
2497 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2498 TString outputFile = output;
2500 TString outputChunk;
2501 TString previousChunk = "";
2502 TObjArray *listoffiles = new TObjArray();
2503 // listoffiles->SetOwner();
2504 Int_t countChunk = 0;
2505 Int_t countZero = nmaxmerge;
2506 Bool_t merged = kTRUE;
2507 Int_t index = outputFile.Index("@");
2508 if (index > 0) outputFile.Remove(index);
2509 TString inputFile = outputFile;
2510 TString sbasedir = basedir;
2511 if (sbasedir.Contains(".xml")) {
2512 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2513 nmaxmerge = 9999999;
2514 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2516 ::Error("MergeOutput", "Input XML collection empty.");
2519 // Iterate grid collection
2520 while (coll->Next()) {
2521 TString fname = gSystem->DirName(coll->GetTURL());
2524 listoffiles->Add(new TNamed(fname.Data(),""));
2527 command = Form("find %s/ *%s", basedir, inputFile.Data());
2528 printf("command: %s\n", command.Data());
2529 TGridResult *res = gGrid->Command(command);
2531 ::Error("MergeOutput","No result for the find command\n");
2537 while ((map=(TMap*)nextmap())) {
2538 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2539 if (!objs || !objs->GetString().Length()) {
2540 // Nothing found - skip this output
2545 listoffiles->Add(new TNamed(objs->GetName(),""));
2549 if (!listoffiles->GetEntries()) {
2550 ::Error("MergeOutput","No result for the find command\n");
2555 TFileMerger *fm = 0;
2556 TIter next0(listoffiles);
2557 TObjArray *listoffilestmp = new TObjArray();
2558 listoffilestmp->SetOwner();
2561 // Keep only the files at upper level
2562 Int_t countChar = 0;
2563 while ((nextfile=next0())) {
2564 snextfile = nextfile->GetName();
2565 Int_t crtCount = snextfile.CountChar('/');
2566 if (nextfile == listoffiles->First()) countChar = crtCount;
2567 if (crtCount < countChar) countChar = crtCount;
2570 while ((nextfile=next0())) {
2571 snextfile = nextfile->GetName();
2572 Int_t crtCount = snextfile.CountChar('/');
2573 if (crtCount > countChar) {
2577 listoffilestmp->Add(nextfile);
2580 listoffiles = listoffilestmp; // Now contains 'good' files
2581 listoffiles->Print();
2582 TIter next(listoffiles);
2583 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2584 outputChunk = outputFile;
2585 outputChunk.ReplaceAll(".root", "_*.root");
2586 // Check for existent temporary merge files
2587 // Check overwrite mode and remove previous partial results if needed
2588 // Preserve old merging functionality for stage 0.
2590 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2592 // Skip as many input files as in a chunk
2593 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2596 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2600 snextfile = nextfile->GetName();
2602 outputChunk = outputFile;
2603 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2605 if (gSystem->AccessPathName(outputChunk)) continue;
2606 // Merged file with chunks up to <countChunk> found
2607 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2608 previousChunk = outputChunk;
2612 countZero = nmaxmerge;
2614 while ((nextfile=next())) {
2615 snextfile = nextfile->GetName();
2616 // Loop 'find' results and get next LFN
2617 if (countZero == nmaxmerge) {
2618 // First file in chunk - create file merger and add previous chunk if any.
2619 fm = new TFileMerger(kTRUE);
2620 fm->SetFastMethod(kTRUE);
2621 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2622 outputChunk = outputFile;
2623 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2625 // If last file found, put merged results in the output file
2626 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2627 // Add file to be merged and decrement chunk counter.
2628 fm->AddFile(snextfile);
2630 if (countZero==0 || nextfile == listoffiles->Last()) {
2631 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2632 // Nothing found - skip this output
2633 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2637 fm->OutputFile(outputChunk);
2638 // Merge the outputs, then go to next chunk
2640 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2644 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2645 gSystem->Unlink(previousChunk);
2647 if (nextfile == listoffiles->Last()) break;
2649 countZero = nmaxmerge;
2650 previousChunk = outputChunk;
2657 // Merging stage different than 0.
2658 // Move to the begining of the requested chunk.
2659 fm = new TFileMerger(kTRUE);
2660 fm->SetFastMethod(kTRUE);
2661 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2663 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2664 // Nothing found - skip this output
2665 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2669 fm->OutputFile(outputFile);
2670 // Merge the outputs
2672 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2676 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2682 //______________________________________________________________________________
2683 Bool_t AliAnalysisAlien::MergeOutputs()
2685 // Merge analysis outputs existing in the AliEn space.
2686 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2687 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2689 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2693 if (!TestBit(AliAnalysisGrid::kMerge)) {
2694 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2697 if (fProductionMode) {
2698 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2701 Info("MergeOutputs", "Submitting merging JDL");
2702 if (!SubmitMerging()) return kFALSE;
2703 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2704 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2707 // Get the output path
2708 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2709 if (!DirectoryExists(fGridOutputDir)) {
2710 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2713 if (!fOutputFiles.Length()) {
2714 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2717 // Check if fast read option was requested
2718 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2719 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2720 if (fFastReadOption) {
2721 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2722 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2723 gEnv->SetValue("XNet.ConnectTimeout",50);
2724 gEnv->SetValue("XNet.RequestTimeout",50);
2725 gEnv->SetValue("XNet.MaxRedirectCount",2);
2726 gEnv->SetValue("XNet.ReconnectTimeout",50);
2727 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2729 // Make sure we change the temporary directory
2730 gSystem->Setenv("TMPDIR", gSystem->pwd());
2731 // Set temporary compilation directory to current one
2732 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
2733 TObjArray *list = fOutputFiles.Tokenize(",");
2737 Bool_t merged = kTRUE;
2738 while((str=(TObjString*)next())) {
2739 outputFile = str->GetString();
2740 Int_t index = outputFile.Index("@");
2741 if (index > 0) outputFile.Remove(index);
2742 TString outputChunk = outputFile;
2743 outputChunk.ReplaceAll(".root", "_*.root");
2744 // Skip already merged outputs
2745 if (!gSystem->AccessPathName(outputFile)) {
2746 if (fOverwriteMode) {
2747 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2748 gSystem->Unlink(outputFile);
2749 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2750 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2751 outputChunk.Data());
2752 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2755 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2759 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2760 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2761 outputChunk.Data());
2762 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2765 if (fMergeExcludes.Length() &&
2766 fMergeExcludes.Contains(outputFile.Data())) continue;
2767 // Perform a 'find' command in the output directory, looking for registered outputs
2768 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2770 Error("MergeOutputs", "Terminate() will NOT be executed");
2773 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2774 if (fileOpened) fileOpened->Close();
2779 //______________________________________________________________________________
2780 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2782 // Use the output files connected to output containers from the analysis manager
2783 // rather than the files defined by SetOutputFiles
2784 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2785 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2786 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2789 //______________________________________________________________________________
2790 void AliAnalysisAlien::SetOutputFiles(const char *list)
2792 // Manually set the output files list.
2793 // Removes duplicates. Not allowed if default outputs are not disabled.
2794 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2795 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2798 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2800 TString slist = list;
2801 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2802 TObjArray *arr = slist.Tokenize(" ");
2806 while ((os=(TObjString*)next())) {
2807 sout = os->GetString();
2808 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
2809 if (fOutputFiles.Contains(sout)) continue;
2810 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2811 fOutputFiles += sout;
2816 //______________________________________________________________________________
2817 void AliAnalysisAlien::SetOutputArchive(const char *list)
2819 // Manually set the output archive list. Free text - you are on your own...
2820 // Not allowed if default outputs are not disabled.
2821 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2822 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2825 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2826 fOutputArchive = list;
2829 //______________________________________________________________________________
2830 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2832 // Setting a prefered output SE is not allowed anymore.
2833 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2836 //______________________________________________________________________________
2837 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
2839 // Set some PROOF special parameter.
2840 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2842 TObject *old = pair->Key();
2843 TObject *val = pair->Value();
2844 fProofParam.Remove(old);
2848 fProofParam.Add(new TObjString(pname), new TObjString(value));
2851 //______________________________________________________________________________
2852 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
2854 // Returns a special PROOF parameter.
2855 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2856 if (!pair) return 0;
2857 return pair->Value()->GetName();
2860 //______________________________________________________________________________
2861 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2863 // Start remote grid analysis.
2864 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2865 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2866 if (!mgr || !mgr->IsInitialized()) {
2867 Error("StartAnalysis", "You need an initialized analysis manager for this");
2870 // Are we in PROOF mode ?
2871 if (mgr->IsProofMode()) {
2872 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
2873 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2874 if (fProofCluster.IsNull()) {
2875 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2878 if (fProofDataSet.IsNull() && !testMode) {
2879 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2882 // Set the needed environment
2883 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2884 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2885 if (fProofReset && !testMode) {
2886 if (fProofReset==1) {
2887 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2888 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2890 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2891 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2893 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2898 // Check if there is an old active session
2899 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
2901 Error("StartAnalysis","You have to reset your old session first\n");
2905 // Do we need to change the ROOT version ? The success of this cannot be checked.
2906 if (!fRootVersionForProof.IsNull() && !testMode) {
2907 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2908 fProofCluster.Data(), fRootVersionForProof.Data()));
2910 // Connect to PROOF and check the status
2913 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2914 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2916 if (!sworkers.IsNull())
2917 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2919 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2921 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2923 Error("StartAnalysis", "Could not start PROOF in test mode");
2928 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2931 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2932 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2933 // Set proof special parameters if any
2934 TIter nextpp(&fProofParam);
2935 TObject *proofparam;
2936 while ((proofparam=nextpp())) {
2937 TString svalue = GetProofParameter(proofparam->GetName());
2938 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
2940 // Is dataset existing ?
2942 TString dataset = fProofDataSet;
2943 Int_t index = dataset.Index("#");
2944 if (index>=0) dataset.Remove(index);
2945 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2946 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2949 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2951 // Is ClearPackages() needed ?
2952 if (TestSpecialBit(kClearPackages)) {
2953 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2954 gROOT->ProcessLine("gProof->ClearPackages();");
2956 // Is a given aliroot mode requested ?
2959 if (!fAliRootMode.IsNull()) {
2960 TString alirootMode = fAliRootMode;
2961 if (alirootMode == "default") alirootMode = "";
2962 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2963 optionsList.SetOwner();
2964 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2965 // Check the additional libs to be loaded
2967 Bool_t parMode = kFALSE;
2968 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
2969 // Parse the extra libs for .so
2970 if (fAdditionalLibs.Length()) {
2971 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2974 while((str=(TObjString*)next())) {
2975 if (str->GetString().Contains(".so")) {
2977 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
2980 TString stmp = str->GetName();
2981 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2982 stmp.ReplaceAll(".so","");
2983 if (!extraLibs.IsNull()) extraLibs += ":";
2987 if (str->GetString().Contains(".par")) {
2988 // The first par file found in the list will not allow any further .so
2990 if (!parLibs.IsNull()) parLibs += ":";
2991 parLibs += str->GetName();
2995 if (list) delete list;
2997 if (!extraLibs.IsNull()) {
2998 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
2999 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3001 // Check extra includes
3002 if (!fIncludePath.IsNull()) {
3003 TString includePath = fIncludePath;
3004 includePath.ReplaceAll(" ",":");
3005 includePath.ReplaceAll("$ALICE_ROOT/","");
3006 includePath.ReplaceAll("${ALICE_ROOT}/","");
3007 includePath.ReplaceAll("-I","");
3008 includePath.Remove(TString::kTrailing, ':');
3009 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3010 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3012 // Check if connection to grid is requested
3013 if (TestSpecialBit(kProofConnectGrid))
3014 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3015 // Enable AliRoot par
3017 // Enable proof lite package
3018 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
3019 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3020 TNamed *obj = (TNamed*)optionsList.At(i);
3021 printf("%s %s\n", obj->GetName(), obj->GetTitle());
3023 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3024 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3025 Info("StartAnalysis", "AliRootProofLite enabled");
3027 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3031 if ( ! fAliROOTVersion.IsNull() ) {
3032 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3033 fAliROOTVersion.Data(), &optionsList))) {
3034 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3039 // Enable first par files from fAdditionalLibs
3040 if (!parLibs.IsNull()) {
3041 TObjArray *list = parLibs.Tokenize(":");
3043 TObjString *package;
3044 while((package=(TObjString*)next())) {
3045 TString spkg = package->GetName();
3046 spkg.ReplaceAll(".par", "");
3047 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3048 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3049 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3050 if (gROOT->ProcessLine(enablePackage)) {
3051 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3055 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3059 if (list) delete list;
3062 if (fAdditionalLibs.Contains(".so") && !testMode) {
3063 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3064 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3068 // Enable par files if requested
3069 if (fPackages && fPackages->GetEntries()) {
3070 TIter next(fPackages);
3072 while ((package=next())) {
3073 // Skip packages already enabled
3074 if (parLibs.Contains(package->GetName())) continue;
3075 TString spkg = package->GetName();
3076 spkg.ReplaceAll(".par", "");
3077 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3078 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3079 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3080 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3084 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3089 // Do we need to load analysis source files ?
3090 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3091 if (fAnalysisSource.Length()) {
3092 TObjArray *list = fAnalysisSource.Tokenize(" ");
3095 while((str=(TObjString*)next())) {
3096 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3098 if (list) delete list;
3101 // Register dataset to proof lite.
3102 if (fFileForTestMode.IsNull()) {
3103 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3106 if (gSystem->AccessPathName(fFileForTestMode)) {
3107 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
3110 TFileCollection *coll = new TFileCollection();
3111 coll->AddFromFile(fFileForTestMode);
3112 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3113 gROOT->ProcessLine("gProof->ShowDataSets()");
3118 // Check if output files have to be taken from the analysis manager
3119 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3120 // Add output files and AOD files
3121 fOutputFiles = GetListOfFiles("outaod");
3122 // Add extra files registered to the analysis manager
3123 TString extra = GetListOfFiles("ext");
3124 if (!extra.IsNull()) {
3125 extra.ReplaceAll(".root", "*.root");
3126 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3127 fOutputFiles += extra;
3129 // Compose the output archive.
3130 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3131 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3133 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
3134 if (TestBit(AliAnalysisGrid::kOffline)) {
3135 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3136 \n there nor any job run. You can revise the JDL and analysis \
3137 \n macro then run the same in \"submit\" mode.");
3138 } else if (TestBit(AliAnalysisGrid::kTest)) {
3139 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3141 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3142 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3143 \n space and job submitted.");
3144 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3145 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3146 if (fMergeViaJDL) CheckInputData();
3149 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3154 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3157 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3158 if (!CheckInputData()) {
3159 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3162 if (!CreateDataset(fDataPattern)) {
3164 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3165 if (fRunNumbers.Length()) serror = "run numbers";
3166 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3167 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3168 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
3171 WriteAnalysisFile();
3172 WriteAnalysisMacro();
3174 WriteValidationScript();
3176 WriteMergingMacro();
3177 WriteMergeExecutable();
3178 WriteValidationScript(kTRUE);
3180 if (!CreateJDL()) return kFALSE;
3181 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3183 // Locally testing the analysis
3184 Info("StartAnalysis", "\n_______________________________________________________________________ \
3185 \n Running analysis script in a daughter shell as on a worker node \
3186 \n_______________________________________________________________________");
3187 TObjArray *list = fOutputFiles.Tokenize(",");
3191 while((str=(TObjString*)next())) {
3192 outputFile = str->GetString();
3193 Int_t index = outputFile.Index("@");
3194 if (index > 0) outputFile.Remove(index);
3195 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
3198 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3199 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3200 // gSystem->Exec("cat stdout");
3203 // Check if submitting is managed by LPM manager
3204 if (fProductionMode) {
3205 TString prodfile = fJDLName;
3206 prodfile.ReplaceAll(".jdl", ".prod");
3207 WriteProductionFile(prodfile);
3208 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3211 // Submit AliEn job(s)
3212 gGrid->Cd(fGridOutputDir);
3215 if (!fRunNumbers.Length() && !fRunRange[0]) {
3216 // Submit a given xml or a set of runs
3217 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3218 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
3220 const char *cjobId = res->GetKey(0,"jobId");
3224 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3227 Info("StartAnalysis", "\n_______________________________________________________________________ \
3228 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3229 \n_______________________________________________________________________",
3230 fJDLName.Data(), cjobId);
3235 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3239 // Submit for a range of enumeration of runs.
3240 if (!Submit()) return kFALSE;
3243 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3244 \n You may exit at any time and terminate the job later using the option <terminate> \
3245 \n ##################################################################################", jobID.Data());
3246 gSystem->Exec("aliensh");
3250 //______________________________________________________________________________
3251 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3253 // Get a comma-separated list of output files of the requested type.
3254 // Type can be (case unsensitive):
3255 // aod - list of aod files (std, extensions and filters)
3256 // out - list of output files connected to containers (but not aod's or extras)
3257 // ext - list of extra files registered to the manager
3258 // ter - list of files produced in terminate
3259 static TString files;
3261 TString stype = type;
3263 TString aodfiles, extra;
3264 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3266 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3267 return files.Data();
3269 if (mgr->GetOutputEventHandler()) {
3270 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3271 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3272 if (!extraaod.IsNull()) {
3274 aodfiles += extraaod;
3277 if (stype.Contains("aod")) {
3279 if (stype == "aod") return files.Data();
3281 // Add output files that are not in the list of AOD files
3282 TString outputfiles = "";
3283 TIter next(mgr->GetOutputs());
3284 AliAnalysisDataContainer *output;
3285 const char *filename = 0;
3286 while ((output=(AliAnalysisDataContainer*)next())) {
3287 filename = output->GetFileName();
3288 if (!(strcmp(filename, "default"))) continue;
3289 if (outputfiles.Contains(filename)) continue;
3290 if (aodfiles.Contains(filename)) continue;
3291 if (!outputfiles.IsNull()) outputfiles += ",";
3292 outputfiles += filename;
3294 if (stype.Contains("out")) {
3295 if (!files.IsNull()) files += ",";
3296 files += outputfiles;
3297 if (stype == "out") return files.Data();
3299 // Add extra files registered to the analysis manager
3301 extra = mgr->GetExtraFiles();
3302 if (!extra.IsNull()) {
3304 extra.ReplaceAll(" ", ",");
3305 TObjArray *fextra = extra.Tokenize(",");
3306 TIter nextx(fextra);
3308 while ((obj=nextx())) {
3309 if (aodfiles.Contains(obj->GetName())) continue;
3310 if (outputfiles.Contains(obj->GetName())) continue;
3311 if (sextra.Contains(obj->GetName())) continue;
3312 if (!sextra.IsNull()) sextra += ",";
3313 sextra += obj->GetName();
3316 if (stype.Contains("ext")) {
3317 if (!files.IsNull()) files += ",";
3321 if (stype == "ext") return files.Data();
3323 if (!fTerminateFiles.IsNull()) {
3324 fTerminateFiles.Strip();
3325 fTerminateFiles.ReplaceAll(" ",",");
3326 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3327 TIter nextx(fextra);
3329 while ((obj=nextx())) {
3330 if (aodfiles.Contains(obj->GetName())) continue;
3331 if (outputfiles.Contains(obj->GetName())) continue;
3332 if (termfiles.Contains(obj->GetName())) continue;
3333 if (sextra.Contains(obj->GetName())) continue;
3334 if (!termfiles.IsNull()) termfiles += ",";
3335 termfiles += obj->GetName();
3339 if (stype.Contains("ter")) {
3340 if (!files.IsNull() && !termfiles.IsNull()) {
3345 return files.Data();
3348 //______________________________________________________________________________
3349 Bool_t AliAnalysisAlien::Submit()
3351 // Submit all master jobs.
3352 Int_t nmasterjobs = fInputFiles->GetEntries();
3353 Long_t tshoot = gSystem->Now();
3354 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3355 while (fNsubmitted < nmasterjobs) {
3356 Long_t now = gSystem->Now();
3357 if ((now-tshoot)>30000) {
3359 if (!SubmitNext()) return kFALSE;
3365 //______________________________________________________________________________
3366 Bool_t AliAnalysisAlien::SubmitMerging()
3368 // Submit all merging jobs.
3369 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3370 gGrid->Cd(fGridOutputDir);
3371 TString mergeJDLName = fExecutable;
3372 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3374 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3377 Int_t ntosubmit = fInputFiles->GetEntries();
3378 for (Int_t i=0; i<ntosubmit; i++) {
3379 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3380 runOutDir.ReplaceAll(".xml", "");
3381 if (fOutputToRunNo) {
3382 // The output directory is the run number
3383 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3384 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3386 if (!fRunNumbers.Length() && !fRunRange[0]) {
3387 // The output directory is the grid outdir
3388 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3389 runOutDir = fGridOutputDir;
3391 // The output directory is the master number in 3 digits format
3392 printf("### Submitting merging job for master <%03d>\n", i);
3393 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3396 // Check now the number of merging stages.
3397 TObjArray *list = fOutputFiles.Tokenize(",");
3401 while((str=(TObjString*)next())) {
3402 outputFile = str->GetString();
3403 Int_t index = outputFile.Index("@");
3404 if (index > 0) outputFile.Remove(index);
3405 if (!fMergeExcludes.Contains(outputFile)) break;
3408 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
3409 if (!done && (i==ntosubmit-1)) return kFALSE;
3410 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3412 if (!ntosubmit) return kTRUE;
3413 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3414 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3415 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3416 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3417 \n ################################################################################################################");
3418 gSystem->Exec("aliensh");
3422 //______________________________________________________________________________
3423 Bool_t AliAnalysisAlien::SubmitNext()
3425 // Submit next bunch of master jobs if the queue is free. The first master job is
3426 // submitted right away, while the next will not be unless the previous was split.
3427 // The plugin will not submit new master jobs if there are more that 500 jobs in
3429 static Bool_t iscalled = kFALSE;
3430 static Int_t firstmaster = 0;
3431 static Int_t lastmaster = 0;
3432 static Int_t npermaster = 0;
3433 if (iscalled) return kTRUE;
3435 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3436 Int_t ntosubmit = 0;
3439 Int_t nmasterjobs = fInputFiles->GetEntries();
3442 if (!IsUseSubmitPolicy()) {
3444 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3445 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3446 ntosubmit = nmasterjobs;
3449 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3450 printf("=== master %d: %s\n", lastmaster, status.Data());
3451 // If last master not split, just return
3452 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3453 // No more than 100 waiting jobs
3454 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
3455 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3456 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3457 if (!ntosubmit) ntosubmit = 1;
3458 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3459 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3461 for (Int_t i=0; i<ntosubmit; i++) {
3462 // Submit for a range of enumeration of runs.
3463 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
3465 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3466 runOutDir.ReplaceAll(".xml", "");
3468 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3470 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3471 printf("********* %s\n",query.Data());
3472 res = gGrid->Command(query);
3474 TString cjobId1 = res->GetKey(0,"jobId");
3475 if (!cjobId1.Length()) {
3479 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3482 Info("StartAnalysis", "\n_______________________________________________________________________ \
3483 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3484 \n_______________________________________________________________________",
3485 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
3488 lastmaster = cjobId1.Atoi();
3489 if (!firstmaster) firstmaster = lastmaster;
3494 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3502 //______________________________________________________________________________
3503 void AliAnalysisAlien::WriteAnalysisFile()
3505 // Write current analysis manager into the file <analysisFile>
3506 TString analysisFile = fExecutable;
3507 analysisFile.ReplaceAll(".sh", ".root");
3508 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3509 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3510 if (!mgr || !mgr->IsInitialized()) {
3511 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3514 // Check analysis type
3516 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3517 handler = (TObject*)mgr->GetInputEventHandler();
3519 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3520 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3521 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3522 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3524 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3525 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3528 TDirectory *cdir = gDirectory;
3529 TFile *file = TFile::Open(analysisFile, "RECREATE");
3531 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3532 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3533 // Unless merging makes no sense
3534 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3537 // Enable termination for local jobs
3538 mgr->SetSkipTerminate(kFALSE);
3540 if (cdir) cdir->cd();
3541 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
3543 Bool_t copy = kTRUE;
3544 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3547 TString workdir = gGrid->GetHomeDirectory();
3548 workdir += fGridWorkingDir;
3549 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
3550 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3551 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3552 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3556 //______________________________________________________________________________
3557 void AliAnalysisAlien::WriteAnalysisMacro()
3559 // Write the analysis macro that will steer the analysis in grid mode.
3560 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3562 out.open(fAnalysisMacro.Data(), ios::out);
3564 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3567 Bool_t hasSTEERBase = kFALSE;
3568 Bool_t hasESD = kFALSE;
3569 Bool_t hasAOD = kFALSE;
3570 Bool_t hasANALYSIS = kFALSE;
3571 Bool_t hasOADB = kFALSE;
3572 Bool_t hasANALYSISalice = kFALSE;
3573 Bool_t hasCORRFW = kFALSE;
3574 TString func = fAnalysisMacro;
3575 TString type = "ESD";
3576 TString comment = "// Analysis using ";
3577 if (IsUseMCchain()) {
3581 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3582 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3587 if (type!="AOD" && fFriendChainName!="") {
3588 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3591 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3592 else comment += " data";
3593 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3594 func.ReplaceAll(".C", "");
3595 out << "void " << func.Data() << "()" << endl;
3597 out << comment.Data() << endl;
3598 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3599 out << " TStopwatch timer;" << endl;
3600 out << " timer.Start();" << endl << endl;
3601 // Change temp directory to current one
3602 out << "// Set temporary merging directory to current one" << endl;
3603 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3604 out << "// Set temporary compilation directory to current one" << endl;
3605 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3606 // Reset existing include path
3607 out << "// Reset existing include path and add current directory first in the search" << endl;
3608 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3609 if (!fExecutableCommand.Contains("aliroot")) {
3610 out << "// load base root libraries" << endl;
3611 out << " gSystem->Load(\"libTree\");" << endl;
3612 out << " gSystem->Load(\"libGeom\");" << endl;
3613 out << " gSystem->Load(\"libVMC\");" << endl;
3614 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3615 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3617 if (fAdditionalRootLibs.Length()) {
3618 // in principle libtree /lib geom libvmc etc. can go into this list, too
3619 out << "// Add aditional libraries" << endl;
3620 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3623 while((str=(TObjString*)next())) {
3624 if (str->GetString().Contains(".so"))
3625 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3627 if (list) delete list;
3629 out << "// Load analysis framework libraries" << endl;
3630 TString setupPar = "AliAnalysisAlien::SetupPar";
3632 if (!fExecutableCommand.Contains("aliroot")) {
3633 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3634 out << " gSystem->Load(\"libESD\");" << endl;
3635 out << " gSystem->Load(\"libAOD\");" << endl;
3637 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3638 out << " gSystem->Load(\"libOADB\");" << endl;
3639 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3640 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3642 TIter next(fPackages);
3645 while ((obj=next())) {
3646 pkgname = obj->GetName();
3647 if (pkgname == "STEERBase" ||
3648 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3649 if (pkgname == "ESD" ||
3650 pkgname == "ESD.par") hasESD = kTRUE;
3651 if (pkgname == "AOD" ||
3652 pkgname == "AOD.par") hasAOD = kTRUE;
3653 if (pkgname == "ANALYSIS" ||
3654 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3655 if (pkgname == "OADB" ||
3656 pkgname == "OADB.par") hasOADB = kTRUE;
3657 if (pkgname == "ANALYSISalice" ||
3658 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3659 if (pkgname == "CORRFW" ||
3660 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3662 if (hasANALYSISalice) setupPar = "SetupPar";
3663 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3664 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3665 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3666 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3667 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3668 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3669 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3670 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3671 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3672 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3673 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3674 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3675 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3676 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3677 out << "// Compile other par packages" << endl;
3679 while ((obj=next())) {
3680 pkgname = obj->GetName();
3681 if (pkgname == "STEERBase" ||
3682 pkgname == "STEERBase.par" ||
3684 pkgname == "ESD.par" ||
3686 pkgname == "AOD.par" ||
3687 pkgname == "ANALYSIS" ||
3688 pkgname == "ANALYSIS.par" ||
3689 pkgname == "OADB" ||
3690 pkgname == "OADB.par" ||
3691 pkgname == "ANALYSISalice" ||
3692 pkgname == "ANALYSISalice.par" ||
3693 pkgname == "CORRFW" ||
3694 pkgname == "CORRFW.par") continue;
3695 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3698 out << "// include path" << endl;
3699 // Get the include path from the interpreter and remove entries pointing to AliRoot
3700 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3701 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3702 out << " TIter nextpath(listpaths);" << endl;
3703 out << " TObjString *pname;" << endl;
3704 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3705 out << " TString current = pname->GetName();" << endl;
3706 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3707 out << " gSystem->AddIncludePath(current);" << endl;
3708 out << " }" << endl;
3709 out << " if (listpaths) delete listpaths;" << endl;
3710 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3711 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3712 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3713 if (fAdditionalLibs.Length()) {
3714 out << "// Add aditional AliRoot libraries" << endl;
3715 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3718 while((str=(TObjString*)next())) {
3719 if (str->GetString().Contains(".so"))
3720 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3721 if (str->GetString().Contains(".par"))
3722 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3724 if (list) delete list;
3727 out << "// analysis source to be compiled at runtime (if any)" << endl;
3728 if (fAnalysisSource.Length()) {
3729 TObjArray *list = fAnalysisSource.Tokenize(" ");
3732 while((str=(TObjString*)next())) {
3733 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3735 if (list) delete list;
3738 // out << " printf(\"Currently load libraries:\\n\");" << endl;
3739 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
3740 if (fFastReadOption) {
3741 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3742 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3743 out << "// fast xrootd reading enabled" << endl;
3744 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3745 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3746 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3747 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3748 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3749 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3751 if (!IsLocalTest()) {
3752 out << "// connect to AliEn and make the chain" << endl;
3753 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3755 out << "// read the analysis manager from file" << endl;
3756 TString analysisFile = fExecutable;
3757 analysisFile.ReplaceAll(".sh", ".root");
3758 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
3759 << analysisFile << "\");" << endl;
3760 out << " if (!mgr) return;" << endl;
3761 if (IsLocalTest()) {
3762 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
3763 out << " plugin->SetRunMode(\"test\");" << endl;
3764 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
3765 out << " mgr->SetGridHandler(plugin);" << endl;
3766 out << " mgr->SetDebugLevel(10);" << endl;
3767 out << " mgr->SetNSysInfo(100);" << endl;
3769 out << " mgr->PrintStatus();" << endl;
3770 if (AliAnalysisManager::GetAnalysisManager()) {
3771 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3772 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3774 if (TestBit(AliAnalysisGrid::kTest))
3775 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3777 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3780 if (!IsLocalTest()) {
3781 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3782 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3784 out << " mgr->StartAnalysis(\"localfile\");" << endl;
3786 out << " timer.Stop();" << endl;
3787 out << " timer.Print();" << endl;
3788 out << "}" << endl << endl;
3789 if (!IsLocalTest()) {
3790 out <<"//________________________________________________________________________________" << endl;
3791 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3793 out << "// Create a chain using url's from xml file" << endl;
3794 out << " TString filename;" << endl;
3795 out << " Int_t run = 0;" << endl;
3796 if (IsUseMCchain()) {
3797 out << " TString treename = \"TE\";" << endl;
3799 out << " TString treename = type;" << endl;
3800 out << " treename.ToLower();" << endl;
3801 out << " treename += \"Tree\";" << endl;
3803 out << " printf(\"***************************************\\n\");" << endl;
3804 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3805 out << " printf(\"***************************************\\n\");" << endl;
3806 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3807 out << " if (!coll) {" << endl;
3808 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3809 out << " return NULL;" << endl;
3810 out << " }" << endl;
3811 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3812 out << " TChain *chain = new TChain(treename);" << endl;
3813 if(fFriendChainName!="") {
3814 out << " TChain *chainFriend = new TChain(treename);" << endl;
3816 out << " coll->Reset();" << endl;
3817 out << " while (coll->Next()) {" << endl;
3818 out << " filename = coll->GetTURL("");" << endl;
3819 out << " if (mgr) {" << endl;
3820 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3821 out << " if (nrun && nrun != run) {" << endl;
3822 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3823 out << " mgr->SetRunFromPath(nrun);" << endl;
3824 out << " run = nrun;" << endl;
3825 out << " }" << endl;
3826 out << " }" << endl;
3827 out << " chain->Add(filename);" << endl;
3828 if(fFriendChainName!="") {
3829 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3830 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3831 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3832 out << " chainFriend->Add(fileFriend.Data());" << endl;
3834 out << " }" << endl;
3835 out << " if (!chain->GetNtrees()) {" << endl;
3836 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3837 out << " return NULL;" << endl;
3838 out << " }" << endl;
3839 if(fFriendChainName!="") {
3840 out << " chain->AddFriend(chainFriend);" << endl;
3842 out << " return chain;" << endl;
3843 out << "}" << endl << endl;
3845 if (hasANALYSISalice) {
3846 out <<"//________________________________________________________________________________" << endl;
3847 out << "Bool_t SetupPar(const char *package) {" << endl;
3848 out << "// Compile the package and set it up." << endl;
3849 out << " TString pkgdir = package;" << endl;
3850 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3851 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3852 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3853 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3854 out << " // Check for BUILD.sh and execute" << endl;
3855 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3856 out << " printf(\"*******************************\\n\");" << endl;
3857 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3858 out << " printf(\"*******************************\\n\");" << endl;
3859 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3860 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3861 out << " gSystem->ChangeDirectory(cdir);" << endl;
3862 out << " return kFALSE;" << endl;
3863 out << " }" << endl;
3864 out << " } else {" << endl;
3865 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3866 out << " gSystem->ChangeDirectory(cdir);" << endl;
3867 out << " return kFALSE;" << endl;
3868 out << " }" << endl;
3869 out << " // Check for SETUP.C and execute" << endl;
3870 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3871 out << " printf(\"*******************************\\n\");" << endl;
3872 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3873 out << " printf(\"*******************************\\n\");" << endl;
3874 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3875 out << " } else {" << endl;
3876 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3877 out << " gSystem->ChangeDirectory(cdir);" << endl;
3878 out << " return kFALSE;" << endl;
3879 out << " }" << endl;
3880 out << " // Restore original workdir" << endl;
3881 out << " gSystem->ChangeDirectory(cdir);" << endl;
3882 out << " return kTRUE;" << endl;
3885 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
3887 Bool_t copy = kTRUE;
3888 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3891 TString workdir = gGrid->GetHomeDirectory();
3892 workdir += fGridWorkingDir;
3893 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3894 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3895 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3896 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
3897 Form("alien://%s/%s", workdir.Data(),
3898 fAnalysisMacro.Data()))) Fatal("","Terminating");
3902 //______________________________________________________________________________
3903 void AliAnalysisAlien::WriteMergingMacro()
3905 // Write a macro to merge the outputs per master job.
3906 if (!fMergeViaJDL) return;
3907 if (!fOutputFiles.Length()) {
3908 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3911 TString mergingMacro = fExecutable;
3912 mergingMacro.ReplaceAll(".sh","_merge.C");
3913 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3914 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3916 out.open(mergingMacro.Data(), ios::out);
3918 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3921 Bool_t hasSTEERBase = kFALSE;
3922 Bool_t hasESD = kFALSE;
3923 Bool_t hasAOD = kFALSE;
3924 Bool_t hasANALYSIS = kFALSE;
3925 Bool_t hasOADB = kFALSE;
3926 Bool_t hasANALYSISalice = kFALSE;
3927 Bool_t hasCORRFW = kFALSE;
3928 TString func = mergingMacro;
3930 func.ReplaceAll(".C", "");
3931 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
3933 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3934 out << " TStopwatch timer;" << endl;
3935 out << " timer.Start();" << endl << endl;
3936 // Reset existing include path
3937 out << "// Reset existing include path and add current directory first in the search" << endl;
3938 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3939 if (!fExecutableCommand.Contains("aliroot")) {
3940 out << "// load base root libraries" << endl;
3941 out << " gSystem->Load(\"libTree\");" << endl;
3942 out << " gSystem->Load(\"libGeom\");" << endl;
3943 out << " gSystem->Load(\"libVMC\");" << endl;
3944 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3945 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3947 if (fAdditionalRootLibs.Length()) {
3948 // in principle libtree /lib geom libvmc etc. can go into this list, too
3949 out << "// Add aditional libraries" << endl;
3950 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3953 while((str=(TObjString*)next())) {
3954 if (str->GetString().Contains(".so"))
3955 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3957 if (list) delete list;
3959 out << "// Load analysis framework libraries" << endl;
3961 if (!fExecutableCommand.Contains("aliroot")) {
3962 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3963 out << " gSystem->Load(\"libESD\");" << endl;
3964 out << " gSystem->Load(\"libAOD\");" << endl;
3966 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3967 out << " gSystem->Load(\"libOADB\");" << endl;
3968 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3969 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3971 TIter next(fPackages);
3974 TString setupPar = "AliAnalysisAlien::SetupPar";
3975 while ((obj=next())) {
3976 pkgname = obj->GetName();
3977 if (pkgname == "STEERBase" ||
3978 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3979 if (pkgname == "ESD" ||
3980 pkgname == "ESD.par") hasESD = kTRUE;
3981 if (pkgname == "AOD" ||
3982 pkgname == "AOD.par") hasAOD = kTRUE;
3983 if (pkgname == "ANALYSIS" ||
3984 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3985 if (pkgname == "OADB" ||
3986 pkgname == "OADB.par") hasOADB = kTRUE;
3987 if (pkgname == "ANALYSISalice" ||
3988 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3989 if (pkgname == "CORRFW" ||
3990 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3992 if (hasANALYSISalice) setupPar = "SetupPar";
3993 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3994 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3995 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3996 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3997 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3998 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3999 out << " gSystem->Load(\"libOADB\");" << endl;
4000 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4001 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4002 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4003 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4004 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4005 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4006 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4007 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4008 out << "// Compile other par packages" << endl;
4010 while ((obj=next())) {
4011 pkgname = obj->GetName();
4012 if (pkgname == "STEERBase" ||
4013 pkgname == "STEERBase.par" ||
4015 pkgname == "ESD.par" ||
4017 pkgname == "AOD.par" ||
4018 pkgname == "ANALYSIS" ||
4019 pkgname == "ANALYSIS.par" ||
4020 pkgname == "OADB" ||
4021 pkgname == "OADB.par" ||
4022 pkgname == "ANALYSISalice" ||
4023 pkgname == "ANALYSISalice.par" ||
4024 pkgname == "CORRFW" ||
4025 pkgname == "CORRFW.par") continue;
4026 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4029 out << "// include path" << endl;
4030 // Get the include path from the interpreter and remove entries pointing to AliRoot
4031 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4032 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4033 out << " TIter nextpath(listpaths);" << endl;
4034 out << " TObjString *pname;" << endl;
4035 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4036 out << " TString current = pname->GetName();" << endl;
4037 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4038 out << " gSystem->AddIncludePath(current);" << endl;
4039 out << " }" << endl;
4040 out << " if (listpaths) delete listpaths;" << endl;
4041 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4042 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4043 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4044 if (fAdditionalLibs.Length()) {
4045 out << "// Add aditional AliRoot libraries" << endl;
4046 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4049 while((str=(TObjString*)next())) {
4050 if (str->GetString().Contains(".so"))
4051 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4053 if (list) delete list;
4056 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4057 if (fAnalysisSource.Length()) {
4058 TObjArray *list = fAnalysisSource.Tokenize(" ");
4061 while((str=(TObjString*)next())) {
4062 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4064 if (list) delete list;
4068 if (fFastReadOption) {
4069 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4070 out << "// fast xrootd reading enabled" << endl;
4071 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4072 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4073 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4074 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4075 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4076 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4078 // Change temp directory to current one
4079 out << "// Set temporary merging directory to current one" << endl;
4080 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4081 out << "// Set temporary compilation directory to current one" << endl;
4082 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4083 out << "// Connect to AliEn" << endl;
4084 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4085 out << " TString outputDir = dir;" << endl;
4086 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4087 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
4088 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4089 out << " TIter *iter = new TIter(list);" << endl;
4090 out << " TObjString *str;" << endl;
4091 out << " TString outputFile;" << endl;
4092 out << " Bool_t merged = kTRUE;" << endl;
4093 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4094 out << " outputFile = str->GetString();" << endl;
4095 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4096 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4097 out << " if (index > 0) outputFile.Remove(index);" << endl;
4098 out << " // Skip already merged outputs" << endl;
4099 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4100 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
4101 out << " continue;" << endl;
4102 out << " }" << endl;
4103 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4104 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4105 out << " if (!merged) {" << endl;
4106 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4107 out << " return;" << endl;
4108 out << " }" << endl;
4109 out << " }" << endl;
4110 out << " // all outputs merged, validate" << endl;
4111 out << " ofstream out;" << endl;
4112 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4113 out << " out.close();" << endl;
4114 out << " // read the analysis manager from file" << endl;
4115 TString analysisFile = fExecutable;
4116 analysisFile.ReplaceAll(".sh", ".root");
4117 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4118 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4119 << analysisFile << "\");" << endl;
4120 out << " if (!mgr) return;" << endl;
4121 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4122 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4123 out << " mgr->PrintStatus();" << endl;
4124 if (AliAnalysisManager::GetAnalysisManager()) {
4125 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4126 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4128 if (TestBit(AliAnalysisGrid::kTest))
4129 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4131 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4134 out << " TTree *tree = NULL;" << endl;
4135 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4136 out << "}" << endl << endl;
4137 if (hasANALYSISalice) {
4138 out <<"//________________________________________________________________________________" << endl;
4139 out << "Bool_t SetupPar(const char *package) {" << endl;
4140 out << "// Compile the package and set it up." << endl;
4141 out << " TString pkgdir = package;" << endl;
4142 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4143 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4144 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4145 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4146 out << " // Check for BUILD.sh and execute" << endl;
4147 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4148 out << " printf(\"*******************************\\n\");" << endl;
4149 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4150 out << " printf(\"*******************************\\n\");" << endl;
4151 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4152 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4153 out << " gSystem->ChangeDirectory(cdir);" << endl;
4154 out << " return kFALSE;" << endl;
4155 out << " }" << endl;
4156 out << " } else {" << endl;
4157 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4158 out << " gSystem->ChangeDirectory(cdir);" << endl;
4159 out << " return kFALSE;" << endl;
4160 out << " }" << endl;
4161 out << " // Check for SETUP.C and execute" << endl;
4162 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4163 out << " printf(\"*******************************\\n\");" << endl;
4164 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4165 out << " printf(\"*******************************\\n\");" << endl;
4166 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4167 out << " } else {" << endl;
4168 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4169 out << " gSystem->ChangeDirectory(cdir);" << endl;
4170 out << " return kFALSE;" << endl;
4171 out << " }" << endl;
4172 out << " // Restore original workdir" << endl;
4173 out << " gSystem->ChangeDirectory(cdir);" << endl;
4174 out << " return kTRUE;" << endl;
4178 Bool_t copy = kTRUE;
4179 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4182 TString workdir = gGrid->GetHomeDirectory();
4183 workdir += fGridWorkingDir;
4184 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4185 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4186 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4187 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4188 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4192 //______________________________________________________________________________
4193 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4195 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4196 // Note that for loading the compiled library. The current directory should have precedence in
4198 TString pkgdir = package;
4199 pkgdir.ReplaceAll(".par","");
4200 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4201 TString cdir = gSystem->WorkingDirectory();
4202 gSystem->ChangeDirectory(pkgdir);
4203 // Check for BUILD.sh and execute
4204 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4205 printf("**************************************************\n");
4206 printf("*** Building PAR archive %s\n", package);
4207 printf("**************************************************\n");
4208 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4209 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4210 gSystem->ChangeDirectory(cdir);
4214 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4215 gSystem->ChangeDirectory(cdir);
4218 // Check for SETUP.C and execute
4219 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4220 printf("**************************************************\n");
4221 printf("*** Setup PAR archive %s\n", package);
4222 printf("**************************************************\n");
4223 gROOT->Macro("PROOF-INF/SETUP.C");
4224 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4226 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4227 gSystem->ChangeDirectory(cdir);
4230 // Restore original workdir
4231 gSystem->ChangeDirectory(cdir);
4235 //______________________________________________________________________________
4236 void AliAnalysisAlien::WriteExecutable()
4238 // Generate the alien executable script.
4239 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4241 out.open(fExecutable.Data(), ios::out);
4243 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4246 out << "#!/bin/bash" << endl;
4247 // Make sure we can properly compile par files
4248 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4249 out << "echo \"=========================================\"" << endl;
4250 out << "echo \"############## PATH : ##############\"" << endl;
4251 out << "echo $PATH" << endl;
4252 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4253 out << "echo $LD_LIBRARY_PATH" << endl;
4254 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4255 out << "echo $ROOTSYS" << endl;
4256 out << "echo \"############## which root : ##############\"" << endl;
4257 out << "which root" << endl;
4258 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4259 out << "echo $ALICE_ROOT" << endl;
4260 out << "echo \"############## which aliroot : ##############\"" << endl;
4261 out << "which aliroot" << endl;
4262 out << "echo \"############## system limits : ##############\"" << endl;
4263 out << "ulimit -a" << endl;
4264 out << "echo \"############## memory : ##############\"" << endl;
4265 out << "free -m" << endl;
4266 out << "echo \"=========================================\"" << endl << endl;
4267 out << fExecutableCommand << " ";
4268 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
4269 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
4270 out << "echo \"############## memory after: ##############\"" << endl;
4271 out << "free -m" << endl;
4273 Bool_t copy = kTRUE;
4274 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4277 TString workdir = gGrid->GetHomeDirectory();
4278 TString bindir = Form("%s/bin", workdir.Data());
4279 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4280 workdir += fGridWorkingDir;
4281 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
4282 if (FileExists(executable)) gGrid->Rm(executable);
4283 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4284 // TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
4285 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4286 executable.Data())) Fatal("","Terminating");
4290 //______________________________________________________________________________
4291 void AliAnalysisAlien::WriteMergeExecutable()
4293 // Generate the alien executable script for the merging job.
4294 if (!fMergeViaJDL) return;
4295 TString mergeExec = fExecutable;
4296 mergeExec.ReplaceAll(".sh", "_merge.sh");
4297 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4299 out.open(mergeExec.Data(), ios::out);
4301 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4304 out << "#!/bin/bash" << endl;
4305 // Make sure we can properly compile par files
4306 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4307 out << "echo \"=========================================\"" << endl;
4308 out << "echo \"############## PATH : ##############\"" << endl;
4309 out << "echo $PATH" << endl;
4310 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4311 out << "echo $LD_LIBRARY_PATH" << endl;
4312 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4313 out << "echo $ROOTSYS" << endl;
4314 out << "echo \"############## which root : ##############\"" << endl;
4315 out << "which root" << endl;
4316 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4317 out << "echo $ALICE_ROOT" << endl;
4318 out << "echo \"############## which aliroot : ##############\"" << endl;
4319 out << "which aliroot" << endl;
4320 out << "echo \"############## system limits : ##############\"" << endl;
4321 out << "ulimit -a" << endl;
4322 out << "echo \"############## memory : ##############\"" << endl;
4323 out << "free -m" << endl;
4324 out << "echo \"=========================================\"" << endl << endl;
4325 TString mergeMacro = fExecutable;
4326 mergeMacro.ReplaceAll(".sh", "_merge.C");
4327 if (IsOneStageMerging())
4328 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4330 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4331 out << fExecutableCommand << " " << "$ARG" << endl;
4332 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4333 out << "echo \"############## memory after: ##############\"" << endl;
4334 out << "free -m" << endl;
4336 Bool_t copy = kTRUE;
4337 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4340 TString workdir = gGrid->GetHomeDirectory();
4341 TString bindir = Form("%s/bin", workdir.Data());
4342 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4343 workdir += fGridWorkingDir;
4344 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4345 if (FileExists(executable)) gGrid->Rm(executable);
4346 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4347 // TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4348 if (!copyLocal2Alien("WriteMergeExecutable",
4349 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4353 //______________________________________________________________________________
4354 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4356 // Write the production file to be submitted by LPM manager. The format is:
4357 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4358 // Next lines: full_path_to_dataset XXX (XXX is a string)
4359 // To submit, one has to: submit jdl XXX for all lines
4361 out.open(filename, ios::out);
4363 Error("WriteProductionFile", "Bad file name: %s", filename);
4367 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4368 workdir = gGrid->GetHomeDirectory();
4369 workdir += fGridWorkingDir;
4370 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4371 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4372 out << locjdl << " " << njobspermaster << endl;
4373 Int_t nmasterjobs = fInputFiles->GetEntries();
4374 for (Int_t i=0; i<nmasterjobs; i++) {
4375 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4376 runOutDir.ReplaceAll(".xml", "");
4378 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4380 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4383 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4384 if (FileExists(filename)) gGrid->Rm(filename);
4385 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4386 if (!copyLocal2Alien("WriteProductionFile", filename,
4387 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4391 //______________________________________________________________________________
4392 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4394 // Generate the alien validation script.
4395 // Generate the validation script
4397 if (fValidationScript.IsNull()) {
4398 fValidationScript = fExecutable;
4399 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4401 TString validationScript = fValidationScript;
4402 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4404 Error("WriteValidationScript", "Alien connection required");
4407 if (!fTerminateFiles.IsNull()) {
4408 fTerminateFiles.Strip();
4409 fTerminateFiles.ReplaceAll(" ",",");
4411 TString outStream = "";
4412 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4413 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4415 out.open(validationScript, ios::out);
4416 out << "#!/bin/bash" << endl;
4417 out << "##################################################" << endl;
4418 out << "validateout=`dirname $0`" << endl;
4419 out << "validatetime=`date`" << endl;
4420 out << "validated=\"0\";" << endl;
4421 out << "error=0" << endl;
4422 out << "if [ -z $validateout ]" << endl;
4423 out << "then" << endl;
4424 out << " validateout=\".\"" << endl;
4425 out << "fi" << endl << endl;
4426 out << "cd $validateout;" << endl;
4427 out << "validateworkdir=`pwd`;" << endl << endl;
4428 out << "echo \"*******************************************************\"" << outStream << endl;
4429 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4431 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4432 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4433 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4434 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4435 out << "ls -la ./" << outStream << endl;
4436 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4437 out << "##################################################" << endl;
4440 out << "if [ ! -f stderr ] ; then" << endl;
4441 out << " error=1" << endl;
4442 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4443 out << " echo \"Error = $error\" " << outStream << endl;
4444 out << "fi" << endl;
4446 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4447 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4448 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4449 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4452 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4453 out << " error=1" << endl;
4454 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4455 out << " echo \"$parArch\" " << outStream << endl;
4456 out << " echo \"Error = $error\" " << outStream << endl;
4457 out << "fi" << endl;
4459 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4460 out << " error=1" << endl;
4461 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4462 out << " echo \"$segViol\" " << outStream << endl;
4463 out << " echo \"Error = $error\" " << outStream << endl;
4464 out << "fi" << endl;
4466 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4467 out << " error=1" << endl;
4468 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4469 out << " echo \"$segFault\" " << outStream << endl;
4470 out << " echo \"Error = $error\" " << outStream << endl;
4471 out << "fi" << endl;
4473 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4474 out << " error=1" << endl;
4475 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4476 out << " echo \"$glibcErr\" " << outStream << endl;
4477 out << " echo \"Error = $error\" " << outStream << endl;
4478 out << "fi" << endl;
4480 // Part dedicated to the specific analyses running into the train
4482 TString outputFiles = fOutputFiles;
4483 if (merge && !fTerminateFiles.IsNull()) {
4485 outputFiles += fTerminateFiles;
4487 TObjArray *arr = outputFiles.Tokenize(",");
4490 while (!merge && (os=(TObjString*)next1())) {
4491 // No need to validate outputs produced by merging since the merging macro does this
4492 outputFile = os->GetString();
4493 Int_t index = outputFile.Index("@");
4494 if (index > 0) outputFile.Remove(index);
4495 if (fTerminateFiles.Contains(outputFile)) continue;
4496 if (outputFile.Contains("*")) continue;
4497 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4498 out << " error=1" << endl;
4499 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4500 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4501 out << "fi" << endl;
4504 out << "if ! [ -f outputs_valid ] ; then" << endl;
4505 out << " error=1" << endl;
4506 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4507 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4508 out << "fi" << endl;
4510 out << "if [ $error = 0 ] ; then" << endl;
4511 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4512 if (!IsKeepLogs()) {
4513 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4515 out << " rm -f std*" << endl;
4517 out << "fi" << endl;
4519 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4520 out << "echo \"*******************************************************\"" << outStream << endl;
4521 out << "cd -" << endl;
4522 out << "exit $error" << endl;
4524 Bool_t copy = kTRUE;
4525 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4528 TString workdir = gGrid->GetHomeDirectory();
4529 workdir += fGridWorkingDir;
4530 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4531 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4532 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
4533 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
4534 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");