1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
39 #include "TGridResult.h"
40 #include "TGridCollection.h"
42 #include "TGridJobStatusList.h"
43 #include "TGridJobStatus.h"
44 #include "TFileMerger.h"
45 #include "AliAnalysisManager.h"
46 #include "AliAnalysisTaskCfg.h"
47 #include "AliVEventHandler.h"
48 #include "AliAnalysisDataContainer.h"
49 #include "AliMultiInputEventHandler.h"
55 ClassImp(AliAnalysisAlien)
// Helper: copy the local file `loc` to the AliEn location `rem` with TFile::Cp.
// The source is addressed as "file:<loc>" and the target as "alien://<rem>".
// `where` is forwarded to Warning() as the reporting location.
// NOTE(review): the condition guarding the warning and the return statement are
// not visible in this excerpt; presumably the warning fires when the copy
// fails and `ret` is returned — confirm against the full file.
61 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
63 TString sl(Form("file:%s", loc));
64 TString sr(Form("alien://%s", rem));
65 Bool_t ret = TFile::Cp(sl, sr);
67 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
73 //______________________________________________________________________________
// Default constructor.
// NOTE(review): only a few entries of the member-initializer list are visible
// here. The visible ones initialise three numeric settings to 0 and
// default-construct fAdditionalRootLibs.
74 AliAnalysisAlien::AliAnalysisAlien()
80 fSplitMaxInputFileNumber(0),
82 fMasterResubmitThreshold(0),
95 fNproofWorkersPerSlave(0),
105 fAdditionalRootLibs(),
149 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// NOTE(review): only a few entries of the member-initializer list are visible
// here; the visible ones mirror the defaults of the default constructor.
150 AliAnalysisAlien::AliAnalysisAlien(const char *name)
151 :AliAnalysisGrid(name),
156 fSplitMaxInputFileNumber(0),
158 fMasterResubmitThreshold(0),
171 fNproofWorkersPerSlave(0),
175 fExecutableCommand(),
181 fAdditionalRootLibs(),
225 //______________________________________________________________________________
// Copy constructor.
// Scalar and string settings are copied member by member in the initializer
// list. In the body, the two JDL objects are created anew (not copied from
// `other`), the run range is copied, and fInputFiles, fPackages and fModules
// are rebuilt as owning arrays holding copies of the entries of `other`.
226 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
227 :AliAnalysisGrid(other),
230 fPrice(other.fPrice),
232 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
233 fMaxInitFailed(other.fMaxInitFailed),
234 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
235 fNtestFiles(other.fNtestFiles),
236 fNrunsPerMaster(other.fNrunsPerMaster),
237 fMaxMergeFiles(other.fMaxMergeFiles),
238 fMaxMergeStages(other.fMaxMergeStages),
239 fNsubmitted(other.fNsubmitted),
240 fProductionMode(other.fProductionMode),
241 fOutputToRunNo(other.fOutputToRunNo),
242 fMergeViaJDL(other.fMergeViaJDL),
243 fFastReadOption(other.fFastReadOption),
244 fOverwriteMode(other.fOverwriteMode),
245 fNreplicas(other.fNreplicas),
246 fNproofWorkers(other.fNproofWorkers),
247 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
248 fProofReset(other.fProofReset),
249 fRunNumbers(other.fRunNumbers),
250 fExecutable(other.fExecutable),
251 fExecutableCommand(other.fExecutableCommand),
252 fArguments(other.fArguments),
253 fExecutableArgs(other.fExecutableArgs),
254 fAnalysisMacro(other.fAnalysisMacro),
255 fAnalysisSource(other.fAnalysisSource),
256 fValidationScript(other.fValidationScript),
257 fAdditionalRootLibs(other.fAdditionalRootLibs),
258 fAdditionalLibs(other.fAdditionalLibs),
259 fSplitMode(other.fSplitMode),
260 fAPIVersion(other.fAPIVersion),
261 fROOTVersion(other.fROOTVersion),
262 fAliROOTVersion(other.fAliROOTVersion),
263 fExternalPackages(other.fExternalPackages),
265 fGridWorkingDir(other.fGridWorkingDir),
266 fGridDataDir(other.fGridDataDir),
267 fDataPattern(other.fDataPattern),
268 fGridOutputDir(other.fGridOutputDir),
269 fOutputArchive(other.fOutputArchive),
270 fOutputFiles(other.fOutputFiles),
271 fInputFormat(other.fInputFormat),
272 fDatasetName(other.fDatasetName),
273 fJDLName(other.fJDLName),
274 fTerminateFiles(other.fTerminateFiles),
275 fMergeExcludes(other.fMergeExcludes),
276 fRegisterExcludes(other.fRegisterExcludes),
277 fIncludePath(other.fIncludePath),
278 fCloseSE(other.fCloseSE),
279 fFriendChainName(other.fFriendChainName),
280 fJobTag(other.fJobTag),
281 fOutputSingle(other.fOutputSingle),
282 fRunPrefix(other.fRunPrefix),
283 fProofCluster(other.fProofCluster),
284 fProofDataSet(other.fProofDataSet),
285 fFileForTestMode(other.fFileForTestMode),
286 fAliRootMode(other.fAliRootMode),
287 fProofProcessOpt(other.fProofProcessOpt),
288 fMergeDirName(other.fMergeDirName),
293 fDropToShell(other.fDropToShell),
294 fGridJobIDs(other.fGridJobIDs),
295 fGridStages(other.fGridStages)
// The JDL objects are instantiated through the interpreter by class name
// (presumably to avoid a link-time dependency on the AliEn library — confirm).
298 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
300 fRunRange[0] = other.fRunRange[0];
301 fRunRange[1] = other.fRunRange[1];
// Input files: each entry is re-created as a TObjString carrying the same name.
302 if (other.fInputFiles) {
303 fInputFiles = new TObjArray();
304 TIter next(other.fInputFiles);
306 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
307 fInputFiles->SetOwner();
// Packages: copied the same way as the input files.
309 if (other.fPackages) {
310 fPackages = new TObjArray();
311 TIter next(other.fPackages);
313 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
314 fPackages->SetOwner();
// Modules: each AliAnalysisTaskCfg is duplicated with its copy constructor.
316 if (other.fModules) {
317 fModules = new TObjArray();
318 fModules->SetOwner();
319 TIter next(other.fModules);
320 AliAnalysisTaskCfg *mod, *crt;
321 while ((crt=(AliAnalysisTaskCfg*)next())) {
322 mod = new AliAnalysisTaskCfg(*crt);
328 //______________________________________________________________________________
// Destructor.
// NOTE(review): most of the body is not visible in this excerpt; the only
// visible statement deletes the contents of fProofParam.
329 AliAnalysisAlien::~AliAnalysisAlien()
337 fProofParam.DeleteAll();
340 //______________________________________________________________________________
// Assignment operator: copies the configuration of `other` into this object.
// Self-assignment is a no-op. Scalar and string settings are assigned member by
// member; the two JDL objects are created anew through the interpreter;
// fInputFiles, fPackages and fModules are rebuilt as owning arrays holding
// copies of the entries of `other`.
// NOTE(review): fGridJDL, fMergingJDL, fInputFiles, fPackages and fModules are
// overwritten without a visible release of what they held before — confirm
// whether the previous objects leak when assigning to an initialised instance.
341 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
344 if (this != &other) {
345 AliAnalysisGrid::operator=(other);
346 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
347 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
348 fPrice = other.fPrice;
350 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
351 fMaxInitFailed = other.fMaxInitFailed;
352 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
353 fNtestFiles = other.fNtestFiles;
354 fNrunsPerMaster = other.fNrunsPerMaster;
355 fMaxMergeFiles = other.fMaxMergeFiles;
356 fMaxMergeStages = other.fMaxMergeStages;
357 fNsubmitted = other.fNsubmitted;
358 fProductionMode = other.fProductionMode;
359 fOutputToRunNo = other.fOutputToRunNo;
360 fMergeViaJDL = other.fMergeViaJDL;
361 fFastReadOption = other.fFastReadOption;
362 fOverwriteMode = other.fOverwriteMode;
363 fNreplicas = other.fNreplicas;
364 fNproofWorkers = other.fNproofWorkers;
365 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
366 fProofReset = other.fProofReset;
367 fRunNumbers = other.fRunNumbers;
368 fExecutable = other.fExecutable;
369 fExecutableCommand = other.fExecutableCommand;
370 fArguments = other.fArguments;
371 fExecutableArgs = other.fExecutableArgs;
372 fAnalysisMacro = other.fAnalysisMacro;
373 fAnalysisSource = other.fAnalysisSource;
374 fValidationScript = other.fValidationScript;
375 fAdditionalRootLibs = other.fAdditionalRootLibs;
376 fAdditionalLibs = other.fAdditionalLibs;
377 fSplitMode = other.fSplitMode;
378 fAPIVersion = other.fAPIVersion;
379 fROOTVersion = other.fROOTVersion;
380 fAliROOTVersion = other.fAliROOTVersion;
381 fExternalPackages = other.fExternalPackages;
383 fGridWorkingDir = other.fGridWorkingDir;
384 fGridDataDir = other.fGridDataDir;
385 fDataPattern = other.fDataPattern;
386 fGridOutputDir = other.fGridOutputDir;
387 fOutputArchive = other.fOutputArchive;
388 fOutputFiles = other.fOutputFiles;
389 fInputFormat = other.fInputFormat;
390 fDatasetName = other.fDatasetName;
391 fJDLName = other.fJDLName;
392 fTerminateFiles = other.fTerminateFiles;
393 fMergeExcludes = other.fMergeExcludes;
394 fRegisterExcludes = other.fRegisterExcludes;
395 fIncludePath = other.fIncludePath;
396 fCloseSE = other.fCloseSE;
397 fFriendChainName = other.fFriendChainName;
398 fJobTag = other.fJobTag;
399 fOutputSingle = other.fOutputSingle;
400 fRunPrefix = other.fRunPrefix;
401 fProofCluster = other.fProofCluster;
402 fProofDataSet = other.fProofDataSet;
403 fFileForTestMode = other.fFileForTestMode;
404 fAliRootMode = other.fAliRootMode;
405 fProofProcessOpt = other.fProofProcessOpt;
406 fMergeDirName = other.fMergeDirName;
407 fDropToShell = other.fDropToShell;
408 fGridJobIDs = other.fGridJobIDs;
409 fGridStages = other.fGridStages;
// The run range is part of the state copied by the copy constructor; assign it
// here as well so that an assigned object does not keep its old range.
fRunRange[0] = other.fRunRange[0];
fRunRange[1] = other.fRunRange[1];
// Input files: each entry is re-created as a TObjString carrying the same name.
410 if (other.fInputFiles) {
411 fInputFiles = new TObjArray();
412 TIter next(other.fInputFiles);
414 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
415 fInputFiles->SetOwner();
// Packages: copied the same way as the input files.
417 if (other.fPackages) {
418 fPackages = new TObjArray();
419 TIter next(other.fPackages);
421 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
422 fPackages->SetOwner();
// Modules: each AliAnalysisTaskCfg is duplicated with its copy constructor.
424 if (other.fModules) {
425 fModules = new TObjArray();
426 fModules->SetOwner();
427 TIter next(other.fModules);
428 AliAnalysisTaskCfg *mod, *crt;
429 while ((crt=(AliAnalysisTaskCfg*)next())) {
430 mod = new AliAnalysisTaskCfg(*crt);
438 //______________________________________________________________________________
// Append one library name to the blank-separated list fAdditionalLibs.
// A name without a '.' is reported as an error (extension missing); a name
// already found in the list is reported with a warning.
// NOTE(review): the duplicate check uses Contains(name) on the whole list, so
// it presumably also matches when `name` is a substring of another entry.
439 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
441 // Add a single additional library to be loaded. Extension must be present.
443 if (!lib.Contains(".")) {
444 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
447 if (fAdditionalLibs.Contains(name)) {
448 Warning("AddAdditionalLibrary", "Library %s already added.", name);
// Separate entries with a single blank.
451 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
452 fAdditionalLibs += lib;
455 //______________________________________________________________________________
// Register an analysis module. A module whose name is already registered is
// reported as an error. The module array is created as an owning array, so
// modules added here are owned by this object.
// NOTE(review): `module` is dereferenced without a visible null check.
456 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
458 // Adding a module. Checks if already existing. Becomes owned by this.
460 if (GetModule(module->GetName())) {
461 Error("AddModule", "A module having the same name %s already added", module->GetName());
// Owning container (presumably created lazily on first use — the guarding
// condition is not visible in this excerpt).
465 fModules = new TObjArray();
466 fModules->SetOwner();
468 fModules->Add(module);
471 //______________________________________________________________________________
// Register every module found in `list` by calling AddModule() on each entry.
// Entries are cast to AliAnalysisTaskCfg without a type check.
472 void AliAnalysisAlien::AddModules(TObjArray *list)
474 // Adding a list of modules. Checks if already existing. Becomes owned by this.
476 AliAnalysisTaskCfg *module;
477 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
480 //______________________________________________________________________________
// Verify that every dependency named by a registered module is itself
// registered, and reorder fModules so that a dependency precedes the module
// that needs it. An error is reported for a dependency that is not registered
// and for two modules that depend on each other.
// NOTE(review): the return statements and several conditions are not visible
// in this excerpt.
481 Bool_t AliAnalysisAlien::CheckDependencies()
483 // Check if all dependencies are satisfied. Reorder modules if needed.
484 Int_t nmodules = GetNmodules();
486 Warning("CheckDependencies", "No modules added yet to check their dependencies");
489 AliAnalysisTaskCfg *mod = 0;
490 AliAnalysisTaskCfg *dep = 0;
// Walk the module list; for each module inspect all of its dependencies.
493 for (i=0; i<nmodules; i++) {
494 mod = (AliAnalysisTaskCfg*) fModules->At(i);
495 Int_t ndeps = mod->GetNdeps();
497 for (j=0; j<ndeps; j++) {
498 depname = mod->GetDependency(j);
499 dep = GetModule(depname);
501 Error("CheckDependencies","Dependency %s not added for module %s",
502 depname.Data(), mod->GetName());
// A dependency that in turn needs this module is a two-module cycle.
505 if (dep->NeedsDependency(mod->GetName())) {
506 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
507 mod->GetName(), dep->GetName());
510 Int_t idep = fModules->IndexOf(dep);
511 // The dependency task must come first
513 // Remove at idep and move all objects below up one slot
514 // down to index i included.
515 fModules->RemoveAt(idep);
516 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
// Put the dependency in the freed slot; the current module moved one slot on.
517 fModules->AddAt(dep, i++);
519 //Redo from istart if dependencies were inserted
520 if (i>istart) i=istart-1;
526 //______________________________________________________________________________
// Obtain the analysis manager (a new one named `name` is created here; the
// condition guarding the creation is not visible in this excerpt), register
// this object as its grid handler and, when `filename` is not empty, expand
// the path and hand the resulting line to the interpreter.
// NOTE(review): `filename` is passed to strlen() without a null check.
527 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
529 // Create the analysis manager and optionally execute the macro in filename.
530 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
532 mgr = new AliAnalysisManager(name);
533 mgr->SetGridHandler((AliAnalysisGrid*)this);
534 if (strlen(filename)) {
535 TString line = gSystem->ExpandPathName(filename);
537 gROOT->ProcessLine(line.Data());
542 //______________________________________________________________________________
// Return the number of registered modules: 0 when no module array exists yet,
// otherwise the entry count of fModules.
543 Int_t AliAnalysisAlien::GetNmodules() const
545 // Get number of modules.
546 if (!fModules) return 0;
547 return fModules->GetEntries();
550 //______________________________________________________________________________
// Look up a registered module by name. Returns 0 when no module array exists;
// otherwise returns whatever fModules->FindObject(name) yields, cast to
// AliAnalysisTaskCfg*.
551 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
553 // Get a module by name.
554 if (!fModules) return 0;
555 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
558 //______________________________________________________________________________
// Load one module: recursively load its dependencies, load its libraries,
// apply its custom output/terminate file names, execute its macro and its
// optional dependency-configuration macro, and add its libraries (as
// "lib<name>.so") to fAdditionalLibs. A module that reports IsLoaded() is
// accepted immediately. Each failure is reported through Error().
// NOTE(review): the return statements on the error paths are not visible in
// this excerpt.
559 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
561 // Load a given module.
562 if (mod->IsLoaded()) return kTRUE;
563 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
565 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
// Dependencies first: each one must be registered and loadable.
568 Int_t ndeps = mod->GetNdeps();
570 for (Int_t j=0; j<ndeps; j++) {
571 depname = mod->GetDependency(j);
572 AliAnalysisTaskCfg *dep = GetModule(depname);
574 Error("LoadModule","Dependency %s not existing for module %s",
575 depname.Data(), mod->GetName());
578 if (!LoadModule(dep)) {
579 Error("LoadModule","Dependency %s for module %s could not be loaded",
580 depname.Data(), mod->GetName());
584 // Load libraries for the module
585 if (!mod->CheckLoadLibraries()) {
586 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
589 // Check if a custom file name was requested
590 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
592 // Check if a custom terminate file name was requested
593 if (strlen(mod->GetTerminateFileName())) {
// fTerminateFiles is a comma-separated list.
594 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
595 fTerminateFiles += mod->GetTerminateFileName();
// A negative return value from the module macro is treated as failure.
599 if (mod->ExecuteMacro()<0) {
600 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
601 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
604 // Configure dependencies
605 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
606 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
607 mod->GetConfigMacro()->GetTitle(), mod->GetName());
610 // Adjust extra libraries
611 Int_t nlibs = mod->GetNlibs();
613 for (Int_t i=0; i<nlibs; i++) {
614 lib = mod->GetLibrary(i);
615 lib = Form("lib%s.so", lib.Data());
// Skip libraries already present in the blank-separated list.
616 if (fAdditionalLibs.Contains(lib)) continue;
617 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
618 fAdditionalLibs += lib;
623 //______________________________________________________________________________
// Generate the full train under the base name `name`.
// Clears fAdditionalLibs, loads all modules and initialises the analysis
// manager (returning kFALSE if either step fails). The production mode and the
// macro/executable/validation/command settings are saved, replaced by names
// derived from `name` ("<name>.C", "<name>.sh", "<name>_validation.sh"), and
// restored afterwards.
// NOTE(review): the statement executed between setting and restoring these
// values is not visible in this excerpt.
624 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
626 // Generate the full train.
627 fAdditionalLibs = "";
628 if (!LoadModules()) return kFALSE;
629 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
630 if (!mgr->InitAnalysis()) return kFALSE;
// Remember the current settings so they can be restored at the end.
633 Int_t productionMode = fProductionMode;
635 TString macro = fAnalysisMacro;
636 TString executable = fExecutable;
637 TString validation = fValidationScript;
638 TString execCommand = fExecutableCommand;
639 SetAnalysisMacro(Form("%s.C", name));
640 SetExecutable(Form("%s.sh", name));
641 // SetExecutableCommand("aliroot -b -q ");
642 SetValidationScript(Form("%s_validation.sh", name));
// Restore the saved settings.
644 SetProductionMode(productionMode);
645 fAnalysisMacro = macro;
646 fExecutable = executable;
647 fExecutableCommand = execCommand;
648 fValidationScript = validation;
652 //______________________________________________________________________________
// Generate the test files under the base name `name`, either for the single
// module `modname` (when not empty) or for all registered modules.
// After loading the module(s) and initialising the analysis manager, the
// current macro/executable/validation/command settings and production mode are
// saved, the output file list is rebuilt, the analysis macro, validation
// scripts and merge executable are written, and the saved settings are
// restored.
// NOTE(review): `modname` is passed to strlen() without a null check; some
// statements of the body are not visible in this excerpt.
653 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
655 // Generate test macros for a single module or for the full train.
656 fAdditionalLibs = "";
657 if (strlen(modname)) {
658 if (!CheckDependencies()) return kFALSE;
659 AliAnalysisTaskCfg *mod = GetModule(modname);
661 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
664 if (!LoadModule(mod)) return kFALSE;
665 } else if (!LoadModules()) return kFALSE;
666 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
667 if (!mgr->InitAnalysis()) return kFALSE;
// Remember the current settings so they can be restored at the end.
671 Int_t productionMode = fProductionMode;
673 TString macro = fAnalysisMacro;
674 TString executable = fExecutable;
675 TString validation = fValidationScript;
676 TString execCommand = fExecutableCommand;
677 SetAnalysisMacro(Form("%s.C", name));
678 SetExecutable(Form("%s.sh", name));
679 fOutputFiles = GetListOfFiles("outaod");
680 // Add extra files registered to the analysis manager
681 TString extra = GetListOfFiles("ext");
682 if (!extra.IsNull()) {
// Turn each "<x>.root" into the wildcard "<x>*.root" before appending.
683 extra.ReplaceAll(".root", "*.root");
684 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
685 fOutputFiles += extra;
687 // SetExecutableCommand("aliroot -b -q ");
688 SetValidationScript(Form("%s_validation.sh", name));
690 WriteAnalysisMacro();
692 WriteValidationScript();
694 WriteMergeExecutable();
695 WriteValidationScript(kTRUE);
696 SetLocalTest(kFALSE);
// Restore the saved settings.
697 SetProductionMode(productionMode);
698 fAnalysisMacro = macro;
699 fExecutable = executable;
700 fExecutableCommand = execCommand;
701 fValidationScript = validation;
705 //______________________________________________________________________________
// Load every registered module in list order, after CheckDependencies() has
// validated (and possibly reordered) the list. Clears fAdditionalLibs first.
// Returns kFALSE as soon as the dependency check or one module load fails.
// NOTE(review): the conditions guarding the warning/error below and the final
// return are not visible in this excerpt.
706 Bool_t AliAnalysisAlien::LoadModules()
708 // Load all modules by executing the AddTask macros. Checks first the dependencies.
709 fAdditionalLibs = "";
710 Int_t nmodules = GetNmodules();
712 Warning("LoadModules", "No module to be loaded");
715 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
717 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
720 if (!CheckDependencies()) return kFALSE;
// Re-read the count after the dependency check.
721 nmodules = GetNmodules();
722 AliAnalysisTaskCfg *mod;
723 for (Int_t imod=0; imod<nmodules; imod++) {
724 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
725 if (!LoadModule(mod)) return kFALSE;
730 //______________________________________________________________________________
// Set the printf-style format used to render run numbers. When the stored
// value contains no '%', "%d" is appended so that it can be used as a format.
// NOTE(review): the assignment of `prefix` to fRunPrefix is not visible in
// this excerpt.
731 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
733 // Set the run number format. Can be a prefix or a format like "%09d"
735 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
738 //______________________________________________________________________________
// Append `path` (followed by a blank) to fIncludePath. When the checked
// string contains "-I" the path is appended as given; otherwise it is
// prefixed with "-I".
// NOTE(review): the declaration of `p` is not visible here (presumably a
// TString built from `path`); the check matches "-I" anywhere in it.
739 void AliAnalysisAlien::AddIncludePath(const char *path)
741 // Add include path in the remote analysis macro.
743 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
744 else fIncludePath += Form("-I%s ", path);
747 //______________________________________________________________________________
// Append one run to the blank-separated list fRunNumbers. The number is
// rendered with fRunPrefix used as the printf-style format.
748 void AliAnalysisAlien::AddRunNumber(Int_t run)
750 // Add a run number to the list of runs to be processed.
751 if (fRunNumbers.Length()) fRunNumbers += " ";
752 fRunNumbers += Form(fRunPrefix.Data(), run);
755 //______________________________________________________________________________
// Add several runs at once. `runList` is split on blanks; each token is
// converted with Atoi() and passed to AddRunNumber(Int_t).
// NOTE(review): the release of the array returned by Tokenize() is not visible
// in this excerpt — confirm it is deleted.
756 void AliAnalysisAlien::AddRunList(const char* runList)
758 // Add several runs into the list of runs; they are expected to be separated by a blank character.
759 TString sList = runList;
760 TObjArray *list = sList.Tokenize(" ");
761 Int_t n = list->GetEntries();
762 for (Int_t i = 0; i < n; i++) {
763 TObjString *os = (TObjString*)list->At(i);
764 AddRunNumber(os->GetString().Atoi());
769 //______________________________________________________________________________
// Add one or more runs given as text. The input is split on blanks and each
// token is appended to the blank-separated list fRunNumbers, preceded by the
// part of fRunPrefix that comes before "%d".
// NOTE(review): Index("%d") is used as the length to append; its value when
// fRunPrefix holds a different conversion (e.g. "%09d") should be checked.
770 void AliAnalysisAlien::AddRunNumber(const char* run)
772 // Add a run number to the list of runs to be processed.
775 TObjArray *arr = runs.Tokenize(" ");
// Keep only the literal text in front of the "%d" conversion.
778 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
779 while ((os=(TObjString*)next())){
780 if (fRunNumbers.Length()) fRunNumbers += " ";
781 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
786 //______________________________________________________________________________
// Append `lfn` to the list of input files, creating the list on first use.
// The name is stored as a new TObjString.
// NOTE(review): the array created here is not marked as owner in the visible
// lines.
787 void AliAnalysisAlien::AddDataFile(const char *lfn)
789 // Adds a data file to the input to be analysed. The file should be a valid LFN
790 // or point to an existing file in the alien workdir.
791 if (!fInputFiles) fInputFiles = new TObjArray();
792 fInputFiles->Add(new TObjString(lfn));
795 //______________________________________________________________________________
// Append `package` to the blank-separated list fExternalPackages.
796 void AliAnalysisAlien::AddExternalPackage(const char *package)
798 // Adds external packages w.r.t. the default ones (root, aliroot and gapi)
// Add the separator only when the list already has content. Testing the
// TString object itself is always true (it converts to a non-null
// const char*), which would put a blank in front of the first package.
799 if (!fExternalPackages.IsNull()) fExternalPackages += " ";
800 fExternalPackages += package;
803 //______________________________________________________________________________
// Ensure there is a connection to AliEn.
// Returns kTRUE immediately when gGrid is already connected or when running
// in production mode. Otherwise calls TGrid::Connect("alien://"); if gGrid is
// still missing or not connected an error is reported. On success the grid
// user name is stored in fUser.
// NOTE(review): the return statements after the error and at the end are not
// visible in this excerpt.
804 Bool_t AliAnalysisAlien::Connect()
806 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
807 if (gGrid && gGrid->IsConnected()) return kTRUE;
808 if (fProductionMode) return kTRUE;
810 Info("Connect", "Trying to connect to AliEn ...");
811 TGrid::Connect("alien://");
813 if (!gGrid || !gGrid->IsConnected()) {
814 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
817 fUser = gGrid->GetUser();
818 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
822 //______________________________________________________________________________
// Make the AliEn working directory (<home directory> + fGridWorkingDir)
// available. If it does not exist it is created with Mkdir "-p"; when that
// fails a warning is issued and fGridWorkingDir is reset to "" so that the
// home directory is used instead.
// NOTE(review): the connection check guarding the first error, and what is
// done when the directory already exists, are not visible in this excerpt.
823 void AliAnalysisAlien::CdWork()
825 // Check validity of alien workspace. Create directory if possible.
827 Error("CdWork", "Alien connection required");
830 TString homedir = gGrid->GetHomeDirectory();
831 TString workdir = homedir + fGridWorkingDir;
832 if (DirectoryExists(workdir)) {
836 // Work directory not existing - create it
838 if (gGrid->Mkdir(workdir, "-p")) {
839 gGrid->Cd(fGridWorkingDir);
840 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
842 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
843 workdir.Data(), homedir.Data());
844 fGridWorkingDir = "";
848 //______________________________________________________________________________
// Test whether files can be copied to the AliEn directory `alienpath`.
// Skipped (returns kTRUE) in production mode. The path must not contain
// blanks and must exist. A local file named "plugin_test_copy" is created in
// the current directory, copied to <alienpath>/plugin_test_copy with
// TFile::Cp, and removed again both locally and on the grid. Each failure is
// reported through Error().
// NOTE(review): the return statements on the error paths and the connection
// check are not visible in this excerpt. The two gGrid->Rm() calls below pass
// the path with and without the "alien://" prefix — confirm both forms are
// accepted.
849 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
851 // Check if file copying is possible.
852 if (fProductionMode) return kTRUE;
853 TString salienpath(alienpath);
854 if (salienpath.Contains(" ")) {
855 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
859 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
862 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
863 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
864 // Check if alien_CLOSE_SE is defined
865 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
866 if (!closeSE.IsNull()) {
867 Info("CheckFileCopy", "Your current close storage is pointing to: \
868 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
870 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
872 // Check if grid directory exists.
873 if (!DirectoryExists(alienpath)) {
874 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Local scratch file used as the copy source.
877 TString stest = "plugin_test_copy";
878 TFile f(stest, "RECREATE");
879 // User may not have write permissions to current directory
881 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
882 gSystem->WorkingDirectory());
// Remove a leftover remote copy from an earlier run before copying again.
886 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
887 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
888 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
889 \n# 1. Make sure you have write permissions there. If this is the case: \
890 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
891 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
892 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
893 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
894 gSystem->Unlink(stest.Data());
// Success: clean up the local file and the remote copy.
897 gSystem->Unlink(stest.Data());
898 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
899 Info("CheckFileCopy", "### ...SUCCESS ###");
903 //______________________________________________________________________________
// Validate the configured input and register the xml collections to process.
// Skipped (returns kTRUE) in production mode. Three kinds of input are
// handled:
//  - no files, run numbers or run range: fGridDataDir must be set and a single
//    xml is made for it;
//  - declared input files: each must exist (as given or under the AliEn
//    working directory) and all must agree in type (collection / xml / tags);
//  - run numbers or a run range: fGridDataDir must exist; for each run found a
//    "<run>.xml" data file is registered, or one file per group of
//    fNrunsPerMaster runs when that value is 2 or more.
// Problems are reported through Error()/Warning() under the location
// "CheckInputData".
// NOTE(review): several Info() calls below report the location
// "CheckDataType" although they are issued from this method — presumably
// copy-paste; left unchanged. Return statements and some conditions are not
// visible in this excerpt.
904 Bool_t AliAnalysisAlien::CheckInputData()
906 // Check validity of input data. If necessary, create xml files.
907 if (fProductionMode) return kTRUE;
908 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
909 if (!fGridDataDir.Length()) {
910 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
914 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
917 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
918 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
919 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
922 // Process declared files
923 Bool_t isCollection = kFALSE;
924 Bool_t isXml = kFALSE;
925 Bool_t useTags = kFALSE;
926 Bool_t checked = kFALSE;
927 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
929 TString workdir = gGrid->GetHomeDirectory();
930 workdir += fGridWorkingDir;
933 TIter next(fInputFiles);
934 while ((objstr=(TObjString*)next())) {
937 file += objstr->GetString();
938 // Store full lfn path
939 if (FileExists(file)) objstr->SetString(file);
941 file = objstr->GetName();
942 if (!FileExists(objstr->GetName())) {
943 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
944 objstr->GetName(), workdir.Data());
948 Bool_t iscoll, isxml, usetags;
949 CheckDataType(file, iscoll, isxml, usetags);
952 isCollection = iscoll;
955 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
// All declared files must share the same type as the first one checked.
957 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
958 Error("CheckInputData", "Some conflict was found in the types of inputs");
964 // Process requested run numbers
965 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
966 // Check validity of alien data directory
967 if (!fGridDataDir.Length()) {
968 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
971 if (!DirectoryExists(fGridDataDir)) {
972 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
976 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
980 if (checked && !isXml) {
981 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
984 // Check validity of run number(s)
989 TString schunk, schunk2;
993 useTags = fDataPattern.Contains("tag");
994 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
996 if (useTags != fDataPattern.Contains("tag")) {
997 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over the run range.
1000 if (fRunNumbers.Length()) {
1001 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1002 arr = fRunNumbers.Tokenize(" ");
1004 while ((os=(TObjString*)next())) {
1005 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1006 if (!DirectoryExists(path)) {
1007 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1010 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1011 TString msg = "\n##### file: ";
1013 msg += " type: xml_collection;";
1014 if (useTags) msg += " using_tags: Yes";
1015 else msg += " using_tags: No";
1016 Info("CheckDataType", "%s", msg.Data());
// One xml per run unless runs are grouped per master job.
1017 if (fNrunsPerMaster<2) {
1018 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Grouped: the chunk name starts with the first run of the group ...
1021 if (((nruns-1)%fNrunsPerMaster) == 0) {
1022 schunk = os->GetString();
// ... and is completed with the last run of the group (or of the list).
1024 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1025 schunk += Form("_%s.xml", os->GetString().Data());
1026 AddDataFile(schunk);
1031 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1032 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is itself a format, so the outer format is built in two steps.
1033 format = Form("%%s/%s ", fRunPrefix.Data());
1034 path = Form(format.Data(), fGridDataDir.Data(), irun);
1035 if (!DirectoryExists(path)) {
1038 format = Form("%%s/%s.xml", fRunPrefix.Data());
1039 path = Form(format.Data(), workdir.Data(),irun);
1040 TString msg = "\n##### file: ";
1042 msg += " type: xml_collection;";
1043 if (useTags) msg += " using_tags: Yes";
1044 else msg += " using_tags: No";
1045 Info("CheckDataType", "%s", msg.Data());
1046 if (fNrunsPerMaster<2) {
1047 format = Form("%s.xml", fRunPrefix.Data());
1048 AddDataFile(Form(format.Data(),irun));
1051 if (((nruns-1)%fNrunsPerMaster) == 0) {
1052 schunk = Form(fRunPrefix.Data(),irun);
1054 format = Form("_%s.xml", fRunPrefix.Data());
1055 schunk2 = Form(format.Data(), irun);
1056 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1058 AddDataFile(schunk);
1063 AddDataFile(schunk);
1069 //______________________________________________________________________________
// Copy up to `nfiles` files matching `pattern` below the grid directory
// `griddir` into the local directory `outputdir`, and write the local paths
// of the copied files to the text file `output`.
// The files are located with the grid command "find -z -l <nfiles> <griddir>
// <pattern>". Each file is stored under a sub-directory named after the last
// directory component of its turl. When `archivefile` is not empty, the
// archive of that name in the same grid directory is copied instead and the
// listed path takes the form "<archive>#<file>"; `archivefile` may hold
// several names separated by ';', in which case the first is treated as the
// main archive and the others are copied alongside it.
// The working directory is changed to `outputdir` during the copy and
// restored afterwards.
// NOTE(review): the return values and the connection check are not visible in
// this excerpt; `res` is used without a visible null check; `archivefile` is
// passed to strlen() without a null check.
1070 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1072 // Copy data from the given grid directory according a pattern and make a local
1074 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
1076 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1079 if (!DirectoryExists(griddir)) {
1080 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
1083 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1084 printf("Running command: %s\n", command.Data());
1085 TGridResult *res = gGrid->Command(command);
1086 Int_t nfound = res->GetEntries();
1088 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1091 printf("... found %d files. Copying locally ...\n", nfound);
// Split a ';'-separated archive list: the first name becomes `archivefile`,
// the remaining names stay in additionalArchives.
1094 TObjArray* additionalArchives = 0;
1095 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1096 additionalArchives = TString(archivefile).Tokenize(";");
1097 archivefile = additionalArchives->At(0)->GetName();
// NOTE(review): the removed entry is still referenced through `archivefile`;
// confirm who releases it.
1098 additionalArchives->RemoveAt(0);
1099 additionalArchives->Compress();
1102 // Copy files locally
1104 out.open(output, ios::out);
1106 TString turl, dirname, filename, temp;
// Remember where we started so the working directory can be restored.
1107 TString cdir = gSystem->WorkingDirectory();
1108 gSystem->MakeDirectory(outputdir);
1109 gSystem->ChangeDirectory(outputdir);
1111 for (Int_t i=0; i<nfound; i++) {
1112 map = (TMap*)res->At(i);
1113 turl = map->GetValue("turl")->GetName();
1114 filename = gSystem->BaseName(turl.Data());
// Local sub-directory: last directory component of the turl.
1115 dirname = gSystem->DirName(turl.Data());
1116 dirname = gSystem->BaseName(dirname.Data());
1117 gSystem->MakeDirectory(dirname);
1119 TString source(turl);
1120 TString targetFileName(filename);
1122 if (strlen(archivefile) > 0) {
1123 // TODO here the archive in which the file resides should be determined
1124 // however whereis returns only a guid, and guid2lfn does not work
1125 // Therefore we use the one provided as argument for now
1126 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1127 targetFileName = archivefile;
1129 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1130 Bool_t success = kTRUE;
// Copy the additional archives from the same grid directory.
1131 if (additionalArchives) {
1132 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1134 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1135 gSystem->MakeDirectory(gSystem->DirName(target));
1136 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
// For archives, list the entry as "<archive>#<file inside the archive>".
1141 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1142 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1147 gSystem->ChangeDirectory(cdir);
1149 delete additionalArchives;
1153 //______________________________________________________________________________
1154 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1156 // Create dataset for the grid data directory + run number.
// Three input selections are handled below:
//   - neither fRunNumbers nor fRunRange[0] set: a single collection for fGridDataDir;
//   - fRunNumbers set: one collection per space-separated run number;
//   - otherwise: one collection per run in [fRunRange[0], fRunRange[1]].
// Collections are produced with the grid 'find' command and, outside test mode,
// copied to the AliEn working directory.
// pattern: presumably the file pattern used in the 'find' query - TODO confirm
// against the place where the command is composed.
// Returns kTRUE immediately in production or offline mode.
//
// gMaxEntries is the limit given to 'find -l' outside test mode. A result with
// exactly this many entries is taken to mean that more entries remain (see the
// "Trying to merge" branches).
1157 const Int_t gMaxEntries = 15000;
1158 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1160 Error("CreateDataset", "Cannot create dataset with no grid connection");
1165 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// workdir = grid home directory + fGridWorkingDir; used as upload destination.
1166 TString workdir = gGrid->GetHomeDirectory();
1167 workdir += fGridWorkingDir;
1169 // Compose the 'find' command arguments
1172 TString options = "-x collection ";
// Test mode limits the query to fNtestFiles entries, otherwise to gMaxEntries.
1173 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1174 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1175 TString conditions = "";
1182 TString schunk, schunk2;
// cbase accumulates a merged collection; cadd is the collection being added.
1183 TGridCollection *cbase=0, *cadd=0;
// ---- Case 1: no run numbers and no run range ----
1184 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Input files were already provided: nothing to create.
1185 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1186 // Make a single data collection from data directory.
1187 path = fGridDataDir;
1188 if (!DirectoryExists(path)) {
1189 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// Collection name: "wn.xml" in test mode, else "<basename of data dir>.xml".
1193 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1194 else file = Form("%s.xml", gSystem->BaseName(path));
// TSystem::AccessPathName() returns true when the path is NOT accessible, so
// the query runs if the local file is missing, in test mode or when overwriting.
1198 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
// '-o nstart' is appended after the options; nstart is presumably the offset
// of the current stage - TODO confirm.
1200 command += Form("%s -o %d ",options.Data(), nstart);
1204 command += conditions;
1205 printf("command: %s\n", command.Data());
1206 TGridResult *res = gGrid->Command(command);
1207 if (res) delete res;
1208 // Write standard output to file
1209 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
// Validation relies on the external 'grep' tool; probe for it first.
1210 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1211 Bool_t nullFile = kFALSE;
1213 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// Count lines containing "/event" into __tmp__; a non-zero exit status of
// grep (no match) flags the dataset as empty.
1215 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1217 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1218 gSystem->Exec("rm -f __tmp*");
1226 gSystem->Exec("rm -f __tmp__");
// ncount: number converted from 'line' (presumably the grep count read back
// from __tmp__ - TODO confirm).
1227 ncount = line.Atoi();
// Exactly gMaxEntries entries: load this stage and keep it for merging.
1230 if (ncount == gMaxEntries) {
1231 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1232 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
// The first loaded stage becomes the base collection.
1233 if (!cbase) cbase = cadd;
1241 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1242 printf("... please wait - TAlienCollection::Add() scales badly...\n");
// Write the merged collection to the local xml file and release it.
1245 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1246 delete cbase; cbase = 0;
// Copy the temporary stage file to the final collection name.
1248 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1250 gSystem->Exec("rm -f __tmp*");
1251 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// FileExists() is followed by gGrid->Rm(), so it presumably tests the grid
// catalogue rather than the local disk - TODO confirm.
1255 Bool_t fileExists = FileExists(file);
1256 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1257 // Copy xml file to alien space
1258 if (fileExists) gGrid->Rm(file);
1259 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
// The copy is verified by checking that the file now exists.
1260 if (!FileExists(file)) {
1261 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1264 // Update list of files to be processed.
1266 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
// nullResult stays kTRUE until at least one run yields a non-empty dataset.
1270 Bool_t nullResult = kTRUE;
// ---- Case 2: explicit list of run numbers ----
1271 if (fRunNumbers.Length()) {
1272 TObjArray *arr = fRunNumbers.Tokenize(" ");
1275 while ((os=(TObjString*)next())) {
// Data path of this run; the trailing blank presumably separates it from the
// arguments appended to the find command later - TODO confirm.
1278 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
// Runs without a data directory are skipped.
1279 if (!DirectoryExists(path)) continue;
// Collection name: "wn.xml" in test mode, else "<run>.xml".
1281 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1282 else file = Form("%s.xml", os->GetString().Data());
1283 // If local collection file does not exist, create it via 'find' command.
1287 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1289 command += Form("%s -o %d ",options.Data(), nstart);
1292 command += conditions;
1293 TGridResult *res = gGrid->Command(command);
1294 if (res) delete res;
1295 // Write standard output to file
1296 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1297 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1298 Bool_t nullFile = kFALSE;
1300 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// Same emptiness test as above: grep exits non-zero when "/event" never occurs.
1302 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1304 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1305 gSystem->Exec("rm -f __tmp*");
// Drop the empty run from the run list.
// NOTE(review): ReplaceAll() works on substrings, so a run number contained in
// a longer one would be altered as well - confirm this cannot happen.
1306 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1314 gSystem->Exec("rm -f __tmp__");
1315 ncount = line.Atoi();
1317 nullResult = kFALSE;
1319 if (ncount == gMaxEntries) {
1320 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
// Collections that hit the limit are rejected when several runs share a master.
1321 if (fNrunsPerMaster > 1) {
1322 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1323 file.Data(),gMaxEntries);
1326 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1327 if (!cbase) cbase = cadd;
// A base collection exists from earlier stages: load the last stage and
// export the merged result (one run per master only).
1334 if (cbase && fNrunsPerMaster<2) {
1335 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1336 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1339 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1340 delete cbase; cbase = 0;
1342 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1344 gSystem->Exec("rm -f __tmp*");
1345 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode only the first usable run is processed.
1349 if (TestBit(AliAnalysisGrid::kTest)) break;
1350 // Check if there is one run per master job.
1351 if (fNrunsPerMaster<2) {
1352 if (FileExists(file)) {
1353 if (fOverwriteMode) gGrid->Rm(file);
1355 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1359 // Copy xml file to alien space
1360 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1361 if (!FileExists(file)) {
1362 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master: the first run of each chunk opens a new base
// collection and starts the chunk name with its run string.
1368 if (((nruns-1)%fNrunsPerMaster) == 0) {
1369 schunk = os->GetString();
1370 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1372 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1373 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
// True while the chunk is not full and this is not the last run of the list.
1377 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// Chunk collection name: "<first run>_<last run>.xml".
1380 schunk += Form("_%s.xml", os->GetString().Data());
1381 if (FileExists(schunk)) {
// NOTE(review): the existence test is on schunk but 'file' is removed; the
// run-range branch below removes schunk in the same situation - confirm intent.
1382 if (fOverwriteMode) gGrid->Rm(file);
1384 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1388 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1389 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1390 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1391 if (!FileExists(schunk)) {
1392 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1400 Error("CreateDataset", "No valid dataset corresponding to the query!");
// ---- Case 3: run range ----
1404 // Process a full run range.
1405 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is spliced into a format string that is then applied to
// (fGridDataDir, irun); it presumably carries the integer conversion for the
// run number - TODO confirm.
1406 format = Form("%%s/%s ", fRunPrefix.Data());
1409 path = Form(format.Data(), fGridDataDir.Data(), irun);
1410 if (!DirectoryExists(path)) continue;
// Collection name: "wn.xml" in test mode, else fRunPrefix applied to irun + ".xml".
1412 format = Form("%s.xml", fRunPrefix.Data());
1413 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1414 else file = Form(format.Data(), irun);
// One run per master, not test mode, collection already present: remove it
// when overwriting, otherwise report that creation is skipped.
1415 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1416 if (fOverwriteMode) gGrid->Rm(file);
1418 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1422 // If local collection file does not exist, create it via 'find' command.
1426 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1428 command += Form("%s -o %d ",options.Data(), nstart);
1431 command += conditions;
1432 TGridResult *res = gGrid->Command(command);
1433 if (res) delete res;
1434 // Write standard output to file
1435 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1436 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1437 Bool_t nullFile = kFALSE;
1439 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1441 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1443 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1444 gSystem->Exec("rm -f __tmp*");
1452 gSystem->Exec("rm -f __tmp__");
1453 ncount = line.Atoi();
1455 nullResult = kFALSE;
1457 if (ncount == gMaxEntries) {
1458 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1459 if (fNrunsPerMaster > 1) {
1460 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1461 file.Data(),gMaxEntries);
1464 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1465 if (!cbase) cbase = cadd;
1472 if (cbase && fNrunsPerMaster<2) {
1473 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1474 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1477 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1478 delete cbase; cbase = 0;
1480 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1482 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1486 if (TestBit(AliAnalysisGrid::kTest)) break;
1487 // Check if there is one run per master job.
1488 if (fNrunsPerMaster<2) {
1489 if (FileExists(file)) {
1490 if (fOverwriteMode) gGrid->Rm(file);
1492 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1496 // Copy xml file to alien space
1497 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1498 if (!FileExists(file)) {
1499 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1504 // Check if the collection for the chunk exist locally.
// nchunk: index of the chunk this run belongs to (nruns is presumably the
// 1-based count of runs processed so far - TODO confirm).
1505 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1506 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1507 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1510 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
// First run of a chunk: start the chunk name and a new base collection.
1511 if (((nruns-1)%fNrunsPerMaster) == 0) {
1512 schunk = Form(fRunPrefix.Data(), irun);
1513 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1515 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
// schunk2 = "<chunk start>_<current run>.xml", the candidate chunk file name.
1519 format = Form("%%s_%s.xml", fRunPrefix.Data());
1520 schunk2 = Form(format.Data(), schunk.Data(), irun);
// True while the chunk is not full, the range end is not reached and the
// candidate name differs from the last entry of fInputFiles.
1521 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1525 if (FileExists(schunk)) {
1526 if (fOverwriteMode) gGrid->Rm(schunk);
1528 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1532 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1533 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
// Existence is tested again after the export, before the upload.
1534 if (FileExists(schunk)) {
1535 if (fOverwriteMode) gGrid->Rm(schunk);
1537 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1541 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1542 if (!FileExists(schunk)) {
1543 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1549 Error("CreateDataset", "No valid dataset corresponding to the query!");
1556 //______________________________________________________________________________
1557 Bool_t AliAnalysisAlien::CreateJDL()
1559 // Generate a JDL file according to current settings. The name of the file is
1560 // specified by fJDLName.
// Two JDL objects are filled in parallel: fGridJDL for the analysis jobs and
// fMergingJDL for the merging jobs. In submit mode the JDL files, the
// additional (non ".so") libraries and the packages are copied to AliEn.
// Returns kFALSE if an error was flagged during the validity checks.
1561 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1562 Bool_t error = kFALSE;
// copy: cleared in production, offline and test mode.
1564 Bool_t copy = kTRUE;
1565 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// generate: cleared in test and submit mode.
1566 Bool_t generate = kTRUE;
1567 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1569 Error("CreateJDL", "Alien connection required");
1572 // Check validity of alien workspace
// The grid home directory is used as prefix unless in production mode or when
// fGridWorkingDir already starts with "/alice".
1574 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1575 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1576 workdir += fGridWorkingDir;
1580 Error("CreateJDL()", "Define some input files for your analysis.");
1583 // Compose list of input files
1584 // Check if output files were defined
1585 if (!fOutputFiles.Length()) {
1586 Error("CreateJDL", "You must define at least one output file");
1589 // Check if an output directory was defined and valid
1590 if (!fGridOutputDir.Length()) {
1591 Error("CreateJDL", "You must define AliEn output directory");
1594 if (!fProductionMode) {
// An output directory name without any '/' is placed under workdir.
1595 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1596 if (!DirectoryExists(fGridOutputDir)) {
1597 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1598 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1600 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1604 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1609 // Exit if any error up to now
1610 if (error) return kFALSE;
1612 if (!fUser.IsNull()) {
1613 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1614 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1616 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
// Merging script name: fExecutable with ".sh" replaced by "_merge.sh"; the
// merging macro has the same stem with extension ".C".
1617 TString mergeExec = fExecutable;
1618 mergeExec.ReplaceAll(".sh", "_merge.sh");
1619 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1620 mergeExec.ReplaceAll(".sh", ".C");
1621 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1622 if (!fArguments.IsNull())
1623 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1624 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
// The merging stage is passed as JDL parameter $4 in production mode and as
// $2 otherwise.
1626 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1627 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// The description prints fTTL/60 as minutes, i.e. fTTL is handled as seconds.
1630 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1631 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1632 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1633 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1635 if (fMaxInitFailed > 0) {
1636 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1637 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1639 if (fSplitMaxInputFileNumber > 0) {
1640 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1641 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
// Merging in stages: limit the number of files merged per job to fMaxMergeFiles.
1643 if (!IsOneStageMerging()) {
1644 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1645 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1647 if (fSplitMode.Length()) {
1648 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1649 fGridJDL->SetDescription("Split", "We split per SE or file");
1651 fMergingJDL->SetValue("Split", "\"se\"");
1652 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested software packages, added to both JDLs.
1653 if (!fAliROOTVersion.IsNull()) {
1654 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1655 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1657 if (!fROOTVersion.IsNull()) {
1658 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1659 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1661 if (!fAPIVersion.IsNull()) {
1662 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1663 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: space-separated entries of the form "name::version".
1665 if (!fExternalPackages.IsNull()) {
1666 arr = fExternalPackages.Tokenize(" ");
1668 while ((os=(TObjString*)next())) {
1669 TString pkgname = os->GetString();
// NOTE(review): nothing here checks that "::" was found before the index is
// used to split the string - confirm entries are always well formed.
1670 Int_t index = pkgname.Index("::");
1671 TString pkgversion = pkgname(index+2, pkgname.Length());
1672 pkgname.Remove(index);
1673 fGridJDL->AddToPackages(pkgname, pkgversion);
1674 fMergingJDL->AddToPackages(pkgname, pkgversion);
1678 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1679 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1680 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1681 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1682 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
// A file named after fExecutable with ".sh" replaced by ".root" is added to
// the input sandbox of both JDLs.
1683 TString analysisFile = fExecutable;
1684 analysisFile.ReplaceAll(".sh", ".root");
1685 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1686 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1687 if (fAdditionalLibs.Length()) {
1688 arr = fAdditionalLibs.Tokenize(" ");
1690 while ((os=(TObjString*)next())) {
// Entries containing ".so" are not added to the input sandbox.
1691 if (os->GetString().Contains(".so")) continue;
1692 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1693 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages is added to the input sandbox.
1698 TIter next(fPackages);
1700 while ((obj=next())) {
1701 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1702 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
// Output archive of the analysis JDL.
1705 if (fOutputArchive.Length()) {
1706 TString outputArchive = fOutputArchive;
1707 if (!fRegisterExcludes.IsNull()) {
1708 arr = fRegisterExcludes.Tokenize(" ");
1710 while ((os=(TObjString*)next1())) {
// Remove the excluded name, first together with a trailing comma, then alone.
1711 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1712 outputArchive.ReplaceAll(os->GetString(),"");
1716 arr = outputArchive.Tokenize(" ");
1718 Bool_t first = kTRUE;
1719 const char *comment = "Files to be archived";
1720 const char *comment1 = comment;
1721 while ((os=(TObjString*)next())) {
// Only the first entry carries the descriptive comment.
1722 if (!first) comment = NULL;
// Entries without an explicit "@..." part get "@<fCloseSE>" appended when
// fCloseSE is set.
1723 if (!os->GetString().Contains("@") && fCloseSE.Length())
1724 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1726 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1730 // Output archive for the merging jdl
1731 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1732 outputArchive = "log_archive.zip:std*@disk=1 ";
1733 // Add normal output files, extra files + terminate files
1734 TString files = GetListOfFiles("outextter");
1735 // Do not register files in fRegisterExcludes
1736 if (!fRegisterExcludes.IsNull()) {
1737 arr = fRegisterExcludes.Tokenize(" ");
1739 while ((os=(TObjString*)next1())) {
1740 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1741 files.ReplaceAll(os->GetString(),"");
// Turn every "<name>.root" into the wildcard "<name>*.root".
1745 files.ReplaceAll(".root", "*.root");
// With throughput collection enabled the file-info log is archived as well.
1747 if (mgr->IsCollectThroughput())
1748 outputArchive += Form("root_archive.zip:%s,*.stat,*%s@disk=%d",files.Data(),mgr->GetFileInfoLog(),fNreplicas);
1750 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1752 TString files = fOutputArchive;
1753 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already attached by the user
1754 outputArchive = files;
1756 arr = outputArchive.Tokenize(" ");
1760 while ((os=(TObjString*)next2())) {
1761 if (!first) comment = NULL;
1762 TString currentfile = os->GetString();
1763 if (!currentfile.Contains("@") && fCloseSE.Length())
1764 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1766 fMergingJDL->AddToOutputArchive(currentfile, comment);
// Output files (comma-separated) that go to the output sandbox.
1771 arr = fOutputFiles.Tokenize(",");
1773 Bool_t first = kTRUE;
1774 const char *comment = "Files to be saved";
1775 while ((os=(TObjString*)next())) {
1776 // Ignore outputs in jdl that are also in outputarchive
// sout: the name with '*', ".root" and any "@..." suffix stripped, used for
// the substring tests below.
1777 TString sout = os->GetString();
1778 sout.ReplaceAll("*", "");
1779 sout.ReplaceAll(".root", "");
1780 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1781 if (fOutputArchive.Contains(sout)) continue;
1782 // Ignore fRegisterExcludes
1783 if (fRegisterExcludes.Contains(sout)) continue;
1784 if (!first) comment = NULL;
1785 if (!os->GetString().Contains("@") && fCloseSE.Length())
1786 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1788 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
// Files listed in fMergeExcludes are not added to the merging JDL.
1790 if (fMergeExcludes.Contains(sout)) continue;
1791 if (!os->GetString().Contains("@") && fCloseSE.Length())
1792 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1794 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1797 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1798 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// The merging validation script is named like fValidationScript with ".sh"
// replaced by "_merge.sh".
1799 TString validationScript = fValidationScript;
1800 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1801 validationScript.ReplaceAll(".sh", "_merge.sh");
1802 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1803 if (fMasterResubmitThreshold) {
1804 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1805 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1807 // Write a jdl with 2 input parameters: collection name and output dir name.
1810 // Copy jdl to grid workspace
// NOTE(review): the output directory check below repeats the one performed
// near the top of this function - confirm both are needed.
1812 // Check if an output directory was defined and valid
1813 if (!fGridOutputDir.Length()) {
1814 Error("CreateJDL", "You must define AliEn output directory");
1817 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1818 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1819 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1820 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1822 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
// Submit mode: upload both JDL files.
1828 if (TestBit(AliAnalysisGrid::kSubmit)) {
1829 TString mergeJDLName = fExecutable;
1830 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
// Destination is the grid output directory, or workdir in production mode.
1831 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1832 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1833 if (fProductionMode) {
1834 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1835 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove stale copies before uploading.
1837 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1838 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1839 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
// A failed upload ends in Fatal().
1840 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1841 Fatal("","Terminating");
1842 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1844 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1845 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1846 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1847 Fatal("","Terminating");
// Upload the additional libraries whose names do not contain ".so".
1850 if (fAdditionalLibs.Length()) {
1851 arr = fAdditionalLibs.Tokenize(" ");
1854 while ((os=(TObjString*)next())) {
1855 if (os->GetString().Contains(".so")) continue;
1856 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1857 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1858 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1859 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1860 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1861 Fatal("","Terminating");
// Upload every entry of fPackages to the workspace.
1866 TIter next(fPackages);
1868 while ((obj=next())) {
1869 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1870 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1871 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1872 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1873 Form("%s/%s", workdir.Data(), obj->GetName())))
1874 Fatal("","Terminating");
1881 //______________________________________________________________________________
1882 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1884 // Writes one or more JDL's corresponding to findex. If findex is negative,
1885 // all run numbers are considered in one go (jdl). For non-negative indices
1886 // they correspond to the indices in the array fInputFiles.
1887 if (!fInputFiles) return kFALSE;
1890 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1891 workdir += fGridWorkingDir;
1892 TString stageName = "$2";
1893 if (fProductionMode) stageName = "$4";
1894 if (!fMergeDirName.IsNull()) {
1895 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1896 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1898 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1899 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1901 if (fProductionMode) {
1902 TIter next(fInputFiles);
1903 while ((os=next())) {
1904 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1906 if (!fOutputToRunNo)
1907 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1909 fGridJDL->SetOutputDirectory(fGridOutputDir);
1911 if (!fRunNumbers.Length() && !fRunRange[0]) {
1912 // One jdl with no parameters in case input data is specified by name.
1913 TIter next(fInputFiles);
1915 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1916 if (!fOutputSingle.IsNull())
1917 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1919 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1920 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1923 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1924 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1925 if (!fOutputSingle.IsNull()) {
1926 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1927 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1929 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1934 // Generate the JDL as a string
1935 TString sjdl = fGridJDL->Generate();
1936 TString sjdl1 = fMergingJDL->Generate();
1938 if (!fMergeDirName.IsNull()) {
1939 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1940 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1942 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1943 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1945 TString sjdl2 = fMergingJDL->Generate();
1946 Int_t index, index1;
1947 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1948 sjdl.ReplaceAll("(member", "\n (member");
1949 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1950 sjdl.ReplaceAll("{", "{\n ");
1951 sjdl.ReplaceAll("};", "\n};");
1952 sjdl.ReplaceAll("{\n \n", "{\n");
1953 sjdl.ReplaceAll("\n\n", "\n");
1954 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1955 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1956 sjdl1.ReplaceAll("(member", "\n (member");
1957 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1958 sjdl1.ReplaceAll("{", "{\n ");
1959 sjdl1.ReplaceAll("};", "\n};");
1960 sjdl1.ReplaceAll("{\n \n", "{\n");
1961 sjdl1.ReplaceAll("\n\n", "\n");
1962 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1963 sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
1964 sjdl2.ReplaceAll("(member", "\n (member");
1965 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1966 sjdl2.ReplaceAll("{", "{\n ");
1967 sjdl2.ReplaceAll("};", "\n};");
1968 sjdl2.ReplaceAll("{\n \n", "{\n");
1969 sjdl2.ReplaceAll("\n\n", "\n");
1970 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
1971 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1972 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1973 index = sjdl.Index("JDLVariables");
1974 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1975 sjdl += "Workdirectorysize = {\"5000MB\"};";
1976 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1977 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1978 index = fJobTag.Index(":");
1979 if (index < 0) index = fJobTag.Length();
1980 TString jobTag = fJobTag;
1981 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1982 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1983 if (fProductionMode) {
1984 sjdl1.Prepend("# Generated merging jdl (production mode) \
1985 \n# $1 = full alien path to output directory to be merged \
1986 \n# $2 = train number \
1987 \n# $3 = production (like LHC10b) \
1988 \n# $4 = merging stage \
1989 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1990 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1991 sjdl2.Prepend("# Generated merging jdl \
1992 \n# $1 = full alien path to output directory to be merged \
1993 \n# $2 = train number \
1994 \n# $3 = production (like LHC10b) \
1995 \n# $4 = merging stage \
1996 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1998 sjdl1.Prepend("# Generated merging jdl \
1999 \n# $1 = full alien path to output directory to be merged \
2000 \n# $2 = merging stage \
2001 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2002 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2003 sjdl2.Prepend("# Generated merging jdl \
2004 \n# $1 = full alien path to output directory to be merged \
2005 \n# $2 = merging stage \
2006 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2008 index = sjdl1.Index("JDLVariables");
2009 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2010 index = sjdl2.Index("JDLVariables");
2011 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2012 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2013 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2014 index = sjdl2.Index("Split =");
2016 index1 = sjdl2.Index("\n", index);
2017 sjdl2.Remove(index, index1-index+1);
2019 index = sjdl2.Index("SplitMaxInputFileNumber");
2021 index1 = sjdl2.Index("\n", index);
2022 sjdl2.Remove(index, index1-index+1);
2024 index = sjdl2.Index("InputDataCollection");
2026 index1 = sjdl2.Index(";", index);
2027 sjdl2.Remove(index, index1-index+1);
2029 index = sjdl2.Index("InputDataListFormat");
2031 index1 = sjdl2.Index("\n", index);
2032 sjdl2.Remove(index, index1-index+1);
2034 index = sjdl2.Index("InputDataList");
2036 index1 = sjdl2.Index("\n", index);
2037 sjdl2.Remove(index, index1-index+1);
2039 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2040 // Write jdl to file
2042 out.open(fJDLName.Data(), ios::out);
2044 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2047 out << sjdl << endl;
2049 TString mergeJDLName = fExecutable;
2050 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2053 out1.open(mergeJDLName.Data(), ios::out);
2055 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2058 out1 << sjdl1 << endl;
2061 TString finalJDL = mergeJDLName;
2062 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2063 out2.open(finalJDL.Data(), ios::out);
2065 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2068 out2 << sjdl2 << endl;
2072 // Copy jdl to grid workspace
2074 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2076 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2077 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2078 TString finalJDL = mergeJDLName;
2079 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2080 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2081 if (fProductionMode) {
2082 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2083 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2084 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2086 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2087 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2088 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2089 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2090 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2091 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2092 Fatal("","Terminating");
2094 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2095 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2096 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2097 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2098 Fatal("","Terminating");
2099 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2100 Fatal("","Terminating");
2106 //______________________________________________________________________________
2107 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2109 // Returns true if file exists.
// The check goes through the grid connection: without gGrid, or when the
// listing returns no result object, the answer is kFALSE.
2110 if (!gGrid) return kFALSE;
// The "alien://" protocol prefix is dropped before the name is listed.
2112 slfn.ReplaceAll("alien://","");
2113 TGridResult *res = gGrid->Ls(slfn);
2114 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
2115 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry is examined through its "name" value; a missing or empty name
// is handled by the branch below.
2120 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2121 if (!objs || !objs->GetString().Length()) {
2129 //______________________________________________________________________________
2130 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2132 // Returns true if directory exists. Can be also a path.
// Without a grid connection, or when the parent listing returns no result
// object, the answer is kFALSE.
2133 if (!gGrid) return kFALSE;
2134 // Check if dirname is a path
2135 TString dirstripped = dirname;
// Normalise the name: Strip() with default arguments presumably trims
// trailing blanks (confirm against TString docs); the second call removes
// trailing '/' characters.
2136 dirstripped = dirstripped.Strip();
2137 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Split into the last path component (dir) and its parent (path).
2138 TString dir = gSystem->BaseName(dirstripped);
2140 TString path = gSystem->DirName(dirstripped);
// List the parent with the "-F" option and look for an entry whose "name"
// matches dir.
2141 TGridResult *res = gGrid->Ls(path, "-F");
2142 if (!res) return kFALSE;
2146 while ((map=dynamic_cast<TMap*>(next()))) {
2147 obj = map->GetValue("name");
2149 if (dir == obj->GetName()) {
2158 //______________________________________________________________________________
2159 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2161 // Check input data type.
// Results are reported through the reference arguments:
//   isCollection - taken from IsCollection(lfn)
//   isXml        - taken from slfn.Contains(".xml")
//   useTags      - true when the inspected file name contains ".tag"
// A one-line summary is accumulated in msg and printed with Info() on
// every path that reaches a verdict.
2162 isCollection = kFALSE;
2166 Error("CheckDataType", "No connection to grid");
2169 isCollection = IsCollection(lfn);
2170 TString msg = "\n##### file: ";
2173 msg += " type: raw_collection;";
2174 // special treatment for collections
2176 // check for tag files in the collection
// The collection content is queried with a grid command; the "origLFN" key
// of the first result is the file name used for the tag test.
2177 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2179 msg += " using_tags: No (unknown)";
2180 Info("CheckDataType", "%s", msg.Data());
2183 const char* typeStr = res->GetKey(0, "origLFN");
2184 if (!typeStr || !strlen(typeStr)) {
2185 msg += " using_tags: No (unknown)";
2186 Info("CheckDataType", "%s", msg.Data());
2189 TString file = typeStr;
2190 useTags = file.Contains(".tag");
2191 if (useTags) msg += " using_tags: Yes";
2192 else msg += " using_tags: No";
2193 Info("CheckDataType", "%s", msg.Data());
2198 isXml = slfn.Contains(".xml");
2200 // Open xml collection and check if there are tag files inside
2201 msg += " type: xml_collection;";
// The collection is opened through the interpreter (TAlienCollection::Open)
// rather than by a direct call; the returned address is cast to
// TGridCollection*.
2202 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2204 msg += " using_tags: No (unknown)";
2205 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the collection is examined.
2208 TMap *map = coll->Next();
2210 msg += " using_tags: No (unknown)";
2211 Info("CheckDataType", "%s", msg.Data());
// The file attributes are read from the sub-map stored under the empty key.
2214 map = (TMap*)map->GetValue("");
2216 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2217 useTags = file.Contains(".tag");
2219 if (useTags) msg += " using_tags: Yes";
2220 else msg += " using_tags: No";
2221 Info("CheckDataType", "%s", msg.Data());
// Remaining case: the name itself is tested for ".tag" and ".root".
2224 useTags = slfn.Contains(".tag");
2225 if (slfn.Contains(".root")) msg += " type: root file;";
2226 else msg += " type: unknown file;";
2227 if (useTags) msg += " using_tags: Yes";
2228 else msg += " using_tags: No";
2229 Info("CheckDataType", "%s", msg.Data());
2232 //______________________________________________________________________________
2233 void AliAnalysisAlien::EnablePackage(const char *package)
2235 // Enables a par file supposed to exist in the current directory.
// Every ".par" occurrence is removed from the given name before use.
2236 TString pkg(package);
2237 pkg.ReplaceAll(".par", "");
// A true return from AccessPathName() is treated as "not found" and is fatal.
2239 if (gSystem->AccessPathName(pkg)) {
2240 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the switch to .par packages only when the kUsePars bit is not
// already set, then set it.
2243 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2244 Info("EnablePackage", "AliEn plugin will use .par packages");
2245 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns the TObjString entries added to it.
2247 fPackages = new TObjArray();
2248 fPackages->SetOwner();
2250 fPackages->Add(new TObjString(pkg));
2253 //______________________________________________________________________________
2254 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2256 // Make a tree from files having the location specified in fFileForTestMode.
2257 // Inspired from JF's CreateESDChain.
// fFileForTestMode names a local text file listing the data file locations;
// it must be set and accessible.
2258 if (fFileForTestMode.IsNull()) {
2259 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2262 if (gSystem->AccessPathName(fFileForTestMode)) {
2263 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2268 in.open(fFileForTestMode);
2270 // Read the input list of files and add them to the chain
// The requested tree name is replaced by "TE" when IsUseMCchain() is true.
2272 TString streeName(treeName);
2273 if (IsUseMCchain()) streeName = "TE";
2274 TChain *chain = new TChain(streeName);
// A friend chain is built only when fFriendChainName is set; it is created
// with the same tree name as the main chain.
2275 TChain *chainFriend = 0;
2276 if (!fFriendChainName.IsNull()) chainFriend = new TChain(streeName);
// Empty lines and lines starting with '#' are ignored and do not count
// towards the fNtestFiles limit.
2280 if (line.IsNull() || line.BeginsWith("#")) continue;
2281 if (count++ == fNtestFiles) break;
2282 TString esdFile(line);
// Each file is test-opened before being added to the chain.
2283 TFile *file = TFile::Open(esdFile);
2284 if (file && !file->IsZombie()) {
2285 chain->Add(esdFile);
2287 if (!fFriendChainName.IsNull()) {
// The friend file is looked up in the directory of the data file: anything
// from a '#' onwards is cut, then the directory name is taken and
// fFriendChainName appended.
2288 if (esdFile.Index("#") > -1)
2289 esdFile.Remove(esdFile.Index("#"));
2290 esdFile = gSystem->DirName(esdFile);
2291 esdFile += "/" + fFriendChainName;
2292 file = TFile::Open(esdFile);
2293 if (file && !file->IsZombie()) {
2295 chainFriend->Add(esdFile);
// An unopenable friend file is fatal, whereas an unopenable data file is
// only reported and skipped.
2297 Fatal("GetChainForTestMode", "Cannot open friend file: %s", esdFile.Data());
// NOTE(review): the location string below is spelled "GetChainforTestMode"
// (lowercase 'f'), unlike the other messages of this method.
2302 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2306 if (!chain->GetListOfFiles()->GetEntries()) {
2307 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2313 if (!fFriendChainName.IsNull()) chain->AddFriend(chainFriend);
2317 //______________________________________________________________________________
2318 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2320 // Get job status for all jobs with jobid>=jobidstart.
// The returned pointer refers to the function-static buffer mstatus, so the
// text is overwritten by the next call.
// nrunning/nwaiting/nerror/ndone are presumably the per-state counters
// filled by the switch below - confirm against the case bodies.
2321 static char mstatus[20];
2327 TGridJobStatusList *list = gGrid->Ps("");
2328 if (!list) return mstatus;
2329 Int_t nentries = list->GetSize();
2330 TGridJobStatus *status;
2332 for (Int_t ijob=0; ijob<nentries; ijob++) {
2333 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, casting the entry to
// TAlienJobStatus there instead of in compiled code.
2334 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
// Jobs with an id below jobidstart are ignored.
2335 if (pid<jobidstart) continue;
2336 if (pid == lastid) {
// For the job with id lastid the "status" text is copied into mstatus.
// NOTE(review): the status text is passed as the sprintf format string and
// mstatus holds only 20 bytes; a bounded copy with an explicit "%s" format
// would be safer - confirm the possible status strings.
2337 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
// kABORTED, kFAIL and kUNKNOWN share one case body.
2339 switch (status->GetStatus()) {
2340 case TGridJobStatus::kWAITING:
2342 case TGridJobStatus::kRUNNING:
2344 case TGridJobStatus::kABORTED:
2345 case TGridJobStatus::kFAIL:
2346 case TGridJobStatus::kUNKNOWN:
2348 case TGridJobStatus::kDONE:
2357 //______________________________________________________________________________
2358 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2360 // Returns true if file is a collection. Functionality duplicated from
2361 // TAlien::Type() because we don't want to directly depend on TAlien.
2363 Error("IsCollection", "No connection to grid");
// The grid "type -z" command is run on lfn; the "type" key of the first
// result decides. A missing result or an empty type yields kFALSE.
2366 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2367 if (!res) return kFALSE;
2368 const char* typeStr = res->GetKey(0, "type");
2369 if (!typeStr || !strlen(typeStr)) return kFALSE;
2370 if (!strcmp(typeStr, "collection")) return kTRUE;
2375 //______________________________________________________________________________
2376 Bool_t AliAnalysisAlien::IsSingleOutput() const
2378 // Check if single-output option is on.
// The option counts as on whenever fOutputSingle is a non-empty string.
2379 return (!fOutputSingle.IsNull());
2382 //______________________________________________________________________________
2383 void AliAnalysisAlien::Print(Option_t *) const
2385 // Print current plugin settings.
// The option argument is unnamed and unused. Output goes to stdout via printf.
2386 printf("### AliEn analysis plugin current settings ###\n");
2387 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
// --- Settings shown when the analysis manager is in PROOF mode ---
2388 if (mgr && mgr->IsProofMode()) {
2389 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2390 if (TestBit(AliAnalysisGrid::kTest))
2391 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2392 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2393 if (!fProofDataSet.IsNull())
2394 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2396 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2398 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2399 if (!fROOTVersion.IsNull())
2400 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2402 printf("= ROOT version requested________________________ default\n");
2403 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2404 if (!fAliRootMode.IsNull())
2405 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2407 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2408 if (fNproofWorkersPerSlave)
2409 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2410 if (TestSpecialBit(kClearPackages))
2411 printf("= ClearPackages requested...\n");
// NOTE(review): Data() presumably never returns a null pointer, so this
// condition looks always true; fIncludePath.Length() may be what was
// intended - confirm.
2412 if (fIncludePath.Data())
2413 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
// NOTE(review): "souces" in the message below is a typo for "sources"; it is
// runtime output and is left untouched here.
2414 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
// The names of all entries in fPackages are concatenated into one string.
2415 if (fPackages && fPackages->GetEntries()) {
2416 TIter next(fPackages);
2419 while ((obj=next())) list += obj->GetName();
2420 printf("= Par files to be used: ________________________ %s\n", list.Data());
2422 if (TestSpecialBit(kProofConnectGrid))
2423 printf("= Requested PROOF connection to grid\n");
// --- General plugin and grid job settings ---
2426 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2427 if (fOverwriteMode) {
2428 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2429 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2431 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2432 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
2433 printf("= Production mode:______________________________ %d\n", fProductionMode);
2434 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2435 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2436 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2438 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2439 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2440 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2441 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
2442 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
2443 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or - when neither is
// set - the list of input files.
2444 if (fRunNumbers.Length())
2445 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2447 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
2448 if (!fRunRange[0] && !fRunNumbers.Length()) {
2449 TIter next(fInputFiles);
2452 while ((obj=next())) list += obj->GetName();
2453 printf("= Input files to be processed: _________________ %s\n", list.Data());
2455 if (TestBit(AliAnalysisGrid::kTest))
2456 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
// --- Output file handling ---
2457 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2458 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2459 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2460 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2461 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2462 printf("=====================================================================\n");
// --- Job parameters; the optional limits are printed only when positive ---
2463 printf("= Job price: ___________________________________ %d\n", fPrice);
2464 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2465 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2466 if (fMaxInitFailed>0)
2467 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2468 if (fMasterResubmitThreshold>0)
2469 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2470 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2471 if (fNrunsPerMaster>0)
2472 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2473 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
// --- Generated script, macro and JDL names ---
2474 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2475 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2476 if (fArguments.Length())
2477 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2478 if (fExecutableArgs.Length())
2479 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2480 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2481 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2482 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2483 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2485 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2486 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same always-true looking test on Data() as in the PROOF
// section above - confirm whether Length() was intended.
2487 if (fIncludePath.Data())
2488 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2489 if (fCloseSE.Length())
2490 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2491 if (fFriendChainName.Length())
2492 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2493 if (fPackages && fPackages->GetEntries()) {
2494 TIter next(fPackages);
2497 while ((obj=next())) list += obj->GetName();
2498 printf("= Par files to be used: ________________________ %s\n", list.Data());
2502 //______________________________________________________________________________
2503 void AliAnalysisAlien::SetDefaults()
2505 // Set default values for everything. What cannot be filled will be left empty.
// Both JDL objects are created through the interpreter ("new TAlienJDL()")
// and the returned addresses are cast to TGridJDL*.
// NOTE(review): fGridJDL is deleted before being replaced, but fMergingJDL is
// reassigned without a preceding delete - possible leak if this method is
// called more than once; confirm.
2506 if (fGridJDL) delete fGridJDL;
2507 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2508 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting and merging defaults.
2511 fSplitMaxInputFileNumber = 100;
2513 fMasterResubmitThreshold = 0;
2519 fNrunsPerMaster = 1;
2520 fMaxMergeFiles = 100;
// Default names of the generated script and macro, and the command used to
// run the macro.
2522 fExecutable = "analysis.sh";
2523 fExecutableCommand = "root -b -q -x";
2525 fExecutableArgs = "";
2526 fAnalysisMacro = "myAnalysis.C";
2527 fAnalysisSource = "";
2528 fAdditionalLibs = "";
2532 fAliROOTVersion = "";
2533 fUser = ""; // Your alien user name
2534 fGridWorkingDir = "";
2535 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2536 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2537 fFriendChainName = "";
2538 fGridOutputDir = "output";
2539 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2540 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2541 fInputFormat = "xml-single";
2542 fJDLName = "analysis.jdl";
2543 fJobTag = "Automatically generated analysis JDL";
2544 fMergeExcludes = "";
// Copy checking and default outputs are switched on by default.
2547 SetCheckCopy(kTRUE);
2548 SetDefaultOutputs(kTRUE);
2552 //______________________________________________________________________________
2553 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2555 // Obsolete method. Use SetROOTVersion instead
// Kept as a forwarding wrapper: a warning is issued on every call and the
// version string is passed unchanged to SetROOTVersion().
2556 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2557 SetROOTVersion(version);
2560 //______________________________________________________________________________
2561 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2563 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
//   filename  - name of the final merged file looked for in aliendir
//   aliendir  - grid output directory holding the Stage_<n>.xml collections
//   nperchunk - fewer input files than this marks the stage as the last one
//   jdl       - merging JDL to submit; its "_final.jdl" variant is used for
//               the last stage
2564 // First check if the result is already in the output directory.
2565 if (FileExists(Form("%s/%s",aliendir,filename))) {
2566 printf("Final merged results found. Not merging again.\n");
2569 // Now check the last stage done.
// The scan stops at the first stage whose Stage_<stage+1>.xml is missing.
2572 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2575 // Next stage of merging
// For stage 1 the inputs are all *root_archive.zip under aliendir; for later
// stages only those below Stage_<stage-1>.
2577 TString pattern = "*root_archive.zip";
2578 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2579 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2580 if (res) delete res;
2581 // Write standard output to file
// The interpreter output redirection ("> file") stores the text printed by
// gGrid->Stdout() in the local file Stage_<stage>.xml.
2582 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2583 // Count the number of files inside
2585 ifile.open(Form("Stage_%d.xml",stage));
2586 if (!ifile.good()) {
2587 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Each line containing "/event" counts as one input file.
2592 while (!ifile.eof()) {
2594 if (line.Contains("/event")) nfiles++;
2598 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2601 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2603 // Copy the file in the output directory
2604 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2605 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
// A failed copy to the grid is fatal.
2606 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2607 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2608 // Check if this is the last stage to be done.
// The stage is final when it has fewer inputs than one chunk, or when the
// configured maximum number of stages (if non-zero) has been reached.
2609 Bool_t laststage = (nfiles<nperchunk);
2610 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2613 printf("### Submiting final merging stage %d\n", stage);
2614 TString finalJDL = jdl;
2615 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2616 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2617 jobId = SubmitSingleJob(query);
2619 printf("### Submiting merging stage %d\n", stage);
2620 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2621 jobId = SubmitSingleJob(query);
// A zero job id means the submission failed.
2623 if (!jobId) return kFALSE;
// Record the job id and a stage label in the space-separated bookkeeping
// strings fGridJobIDs and fGridStages.
2625 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2626 fGridJobIDs.Append(Form("%d", jobId));
2627 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2628 fGridStages.Append(Form("%s_merge_stage%d",
2629 laststage ? "final" : "partial", stage));
2634 //______________________________________________________________________________
2635 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2637 // Load the analysis manager from a file.
// The file is opened with TFile::Open and its key list is scanned for
// objects whose class name is exactly "AliAnalysisManager".
2638 TFile *file = TFile::Open(fname);
2640 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2643 TIter nextkey(file->GetListOfKeys());
2644 AliAnalysisManager *mgr = 0;
2646 while ((key=(TKey*)nextkey())) {
2647 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2648 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2651 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2655 //______________________________________________________________________________
2656 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2658 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// Without a grid connection nothing is submitted and 0 is returned.
2659 if (!gGrid) return 0;
// The query is echoed, then executed as a grid command.
2660 printf("=> %s ------> ",query);
2661 TGridResult *res = gGrid->Command(query);
// The job id is read as text from the "jobId" key of the first result.
2663 TString jobId = res->GetKey(0,"jobId");
2665 if (jobId.IsNull()) {
2666 printf("submission failed. Reason:\n");
2669 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
// The id is converted to an integer with Atoi(); both forms are printed.
2672 Int_t ijobId = jobId.Atoi();
2673 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2677 //______________________________________________________________________________
2678 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2680 // Merges a collection of output files using concatenation.
//   output     - name of the merged file; also used to build the names of
//                the temporary per-input copies ("<index>_<output>")
//   collection - must contain ".xml", otherwise kFALSE is returned at once
2681 TString scoll(collection);
2682 if (!scoll.Contains(".xml")) return kFALSE;
// The collection is opened through the interpreter (TAlienCollection::Open).
2683 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2685 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2688 // Iterate grid collection
2690 Bool_t merged = kFALSE;
2692 while (coll->Next()) {
// The source location starts from the directory part of the entry's TURL.
2693 TString fname = gSystem->DirName(coll->GetTURL());
// Each input is first copied to a local temporary file.
2696 outtmp = Form("%d_%s", ifile, output);
2697 if (!TFile::Cp(fname, outtmp)) {
2698 ::Error("MergeInfo", "Could not copy %s", fname.Data());
// Concatenation is done with shell commands: "lastmerged" holds the running
// result and "tempmerged" is the scratch file for appending the next input.
// NOTE(review): output (via outtmp) is interpolated unquoted into the shell
// command lines below - names with spaces or shell metacharacters would
// misbehave; confirm that callers only pass plain file names.
2703 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
2706 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2707 gSystem->Exec("cp tempmerged lastmerged");
// Publish the result under the requested name and remove the scratch files
// together with all "*_<output>" temporaries.
2710 gSystem->Exec(Form("cp lastmerged %s", output));
2711 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2717 //______________________________________________________________________________
2718 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2720 // Merge given output files from basedir. Basedir can be an alien output directory
2721 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2722 // files in a group (ignored for xml input). Merging can be done in stages:
2723 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2724 // stage=1 : works with an xml of all root_archive.zip in the output directory
2725 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
// Besides a directory or an xml collection, basedir may also name a ".txt"
// file listing the folders to look into (see the branch below).
2726 TString outputFile = output;
2728 TString outputChunk;
2729 TString previousChunk = "";
2730 TObjArray *listoffiles = new TObjArray();
2731 // listoffiles->SetOwner();
2732 Int_t countChunk = 0;
2733 Int_t countZero = nmaxmerge;
2734 Bool_t merged = kTRUE;
2735 Bool_t isGrid = kTRUE;
// Anything from an '@' onwards in the output name is cut off.
2736 Int_t index = outputFile.Index("@");
2737 if (index > 0) outputFile.Remove(index);
2738 TString inputFile = outputFile;
2739 TString sbasedir = basedir;
// --- Step 1: collect the input file names into listoffiles ---
2740 if (sbasedir.Contains(".xml")) {
2741 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2742 nmaxmerge = 9999999;
2743 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2745 ::Error("MergeOutput", "Input XML collection empty.");
2748 // Iterate grid collection
2749 while (coll->Next()) {
2750 TString fname = gSystem->DirName(coll->GetTURL());
2753 listoffiles->Add(new TNamed(fname.Data(),""));
2755 } else if (sbasedir.Contains(".txt")) {
2756 // The file having the .txt extension is expected to contain a list of
2757 // folders where the output files will be looked. For alien folders,
2758 // the full folder LFN is expected (starting with alien://)
2759 // Assume lfn's on each line
2764 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
// Empty lines and lines starting with '#' are skipped; a single line without
// "alien:" switches the merger to non-grid mode for all files.
2770 if (line.IsNull() || line.BeginsWith("#")) continue;
2772 if (!line.Contains("alien:")) isGrid = kFALSE;
2776 listoffiles->Add(new TNamed(line.Data(),""));
2780 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
// Otherwise basedir is searched on the grid with "find" and the "turl" of
// every match is collected.
2785 command = Form("find %s/ *%s", basedir, inputFile.Data());
2786 printf("command: %s\n", command.Data());
2787 TGridResult *res = gGrid->Command(command);
2789 ::Error("MergeOutput","No result for the find command\n");
2795 while ((map=(TMap*)nextmap())) {
2796 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2797 if (!objs || !objs->GetString().Length()) {
2798 // Nothing found - skip this output
2803 listoffiles->Add(new TNamed(objs->GetName(),""));
2807 if (!listoffiles->GetEntries()) {
2808 ::Error("MergeOutput","No result for the find command\n");
// --- Step 2: filter the list by directory depth ---
2813 TFileMerger *fm = 0;
2814 TIter next0(listoffiles);
2815 TObjArray *listoffilestmp = new TObjArray();
2816 listoffilestmp->SetOwner();
2819 // Keep only the files at upper level
// First pass: find the smallest number of '/' characters among all names.
2820 Int_t countChar = 0;
2821 while ((nextfile=next0())) {
2822 snextfile = nextfile->GetName();
2823 Int_t crtCount = snextfile.CountChar('/');
2824 if (nextfile == listoffiles->First()) countChar = crtCount;
2825 if (crtCount < countChar) countChar = crtCount;
// Second pass: entries deeper than that minimum are not added to the
// filtered list.
2828 while ((nextfile=next0())) {
2829 snextfile = nextfile->GetName();
2830 Int_t crtCount = snextfile.CountChar('/');
2831 if (crtCount > countChar) {
2835 listoffilestmp->Add(nextfile);
2838 listoffiles = listoffilestmp; // Now contains 'good' files
2839 listoffiles->Print();
2840 TIter next(listoffiles);
2841 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2842 outputChunk = outputFile;
2843 outputChunk.ReplaceAll(".root", "_*.root");
2844 // Check for existent temporary merge files
2845 // Check overwrite mode and remove previous partial results if needed
2846 // Preserve old merging functionality for stage 0.
// --- Step 3a: chunked merging with resume ---
// A zero exit status of "ls <output>_*.root" means partial chunk files exist
// locally.
2848 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2850 // Skip as many input files as in a chunk
2851 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2854 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2858 snextfile = nextfile->GetName();
2860 outputChunk = outputFile;
2861 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// A true return from AccessPathName() is treated as "chunk file not
// present": keep skipping.
2863 if (gSystem->AccessPathName(outputChunk)) continue;
2864 // Merged file with chunks up to <countChunk> found
2865 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2866 previousChunk = outputChunk;
2870 countZero = nmaxmerge;
2872 while ((nextfile=next())) {
2873 snextfile = nextfile->GetName();
2874 // Loop 'find' results and get next LFN
2875 if (countZero == nmaxmerge) {
2876 // First file in chunk - create file merger and add previous chunk if any.
2877 fm = new TFileMerger(isGrid);
2878 fm->SetFastMethod(kTRUE);
2879 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2880 outputChunk = outputFile;
2881 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2883 // If last file found, put merged results in the output file
2884 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2885 // Add file to be merged and decrement chunk counter.
2886 fm->AddFile(snextfile);
// A chunk is flushed when its counter reaches zero or the last input has
// been added.
2888 if (countZero==0 || nextfile == listoffiles->Last()) {
2889 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2890 // Nothing found - skip this output
2891 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2895 fm->OutputFile(outputChunk);
2896 // Merge the outputs, then go to next chunk
2898 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): the location string below is "MergeOutputs" while the other
// messages of this method use "MergeOutput".
2902 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous chunk has been folded into the new one and is removed.
2903 gSystem->Unlink(previousChunk);
2905 if (nextfile == listoffiles->Last()) break;
2907 countZero = nmaxmerge;
2908 previousChunk = outputChunk;
// --- Step 3b: single-pass merging of all listed files ---
2915 // Merging stage different than 0.
2916 // Move to the begining of the requested chunk.
2917 fm = new TFileMerger(isGrid);
2918 fm->SetFastMethod(kTRUE);
2919 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2921 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2922 // Nothing found - skip this output
2923 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2927 fm->OutputFile(outputFile);
2928 // Merge the outputs
2930 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2934 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2940 //______________________________________________________________________________
// MergeOutputs: locally merges the output files listed in fOutputFiles from the
// AliEn output directory, or (in the merge-via-JDL path) submits merging jobs.
// Returns kTRUE immediately in test mode and kFALSE in offline mode.
// NOTE(review): the embedded line numbers show gaps in this excerpt (braces,
// some conditions and return statements are not visible); the comments below
// describe the visible statements only.
2941 Bool_t AliAnalysisAlien::MergeOutputs()
2943 // Merge analysis outputs existing in the AliEn space.
2944 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2945 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
// NOTE(review): the condition guarding this error is not visible here -
// presumably a failed grid connection check; confirm against the full file.
2947 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
// Merging through grid jobs: only messages are printed unless the kMerge bit is
// set and production mode is off, in which case SubmitMerging() is called.
2951 if (!TestBit(AliAnalysisGrid::kMerge)) {
2952 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2955 if (fProductionMode) {
2956 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2959 Info("MergeOutputs", "Submitting merging JDL");
2960 if (!SubmitMerging()) return kFALSE;
2961 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2962 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2965 // Get the output path
// A directory name without any "/" is expanded to <grid home>/<working dir>/<output dir>.
2966 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2967 if (!DirectoryExists(fGridOutputDir)) {
2968 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2971 if (!fOutputFiles.Length()) {
2972 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2975 // Check if fast read option was requested
2976 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2977 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2978 if (fFastReadOption) {
2979 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2980 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// XNet settings written to the ROOT environment for the fast-read option.
2981 gEnv->SetValue("XNet.ConnectTimeout",50);
2982 gEnv->SetValue("XNet.RequestTimeout",50);
2983 gEnv->SetValue("XNet.MaxRedirectCount",2);
2984 gEnv->SetValue("XNet.ReconnectTimeout",50);
2985 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2987 // Make sure we change the temporary directory
2988 gSystem->Setenv("TMPDIR", gSystem->pwd());
2989 // Set temporary compilation directory to current one
2990 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// fOutputFiles is a comma-separated list; each entry is handled in turn.
2991 TObjArray *list = fOutputFiles.Tokenize(",");
2995 Bool_t merged = kTRUE;
2996 while((str=(TObjString*)next())) {
2997 outputFile = str->GetString();
// Cut the entry at the first "@" (when it is not the first character).
2998 Int_t index = outputFile.Index("@");
2999 if (index > 0) outputFile.Remove(index);
// Shell glob "<name>_*.root" matching partially merged chunk files in the local directory.
3000 TString outputChunk = outputFile;
3001 outputChunk.ReplaceAll(".root", "_*.root");
3002 // Skip already merged outputs
// AccessPathName() returns false when the path exists.
3003 if (!gSystem->AccessPathName(outputFile)) {
3004 if (fOverwriteMode) {
3005 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3006 gSystem->Unlink(outputFile);
// "ls" exits with status 0 when at least one file matches the pattern.
3007 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3008 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3009 outputChunk.Data());
3010 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3013 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// Same cleanup of partial chunk files as above.
// NOTE(review): the branch this belongs to is not visible in this excerpt.
3017 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3018 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3019 outputChunk.Data());
3020 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Files named in the merge/register exclusion strings are not merged
// (Contains() is a substring test on those strings).
3023 if (fMergeExcludes.Contains(outputFile.Data()) ||
3024 fRegisterExcludes.Contains(outputFile.Data())) continue;
3025 // Perform a 'find' command in the output directory, looking for registered outputs
3026 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3028 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in ROOT's list of open files.
3032 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3033 if (fileOpened) fileOpened->Close();
3039 //______________________________________________________________________________
3040 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3042 // Use the output files connected to output containers from the analysis manager
3043 // rather than the files defined by SetOutputFiles
3044 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3045 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3046 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3049 //______________________________________________________________________________
// SetOutputFiles: builds the comma-separated fOutputFiles string from a
// blank-separated list of file names supplied by the user.
// NOTE(review): the embedded line numbers show gaps in this excerpt (braces,
// local declarations and possibly other statements are not visible); comments
// describe the visible statements only.
3050 void AliAnalysisAlien::SetOutputFiles(const char *list)
3052 // Manually set the output files list.
3053 // Removes duplicates. Not allowed if default outputs are not disabled.
3054 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3055 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3058 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3060 TString slist = list;
3061 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
// Entries in the input list are separated by blanks.
3062 TObjArray *arr = slist.Tokenize(" ");
3066 while ((os=(TObjString*)next())) {
3067 sout = os->GetString();
// Cut the name at "@" (when it is not the first character).
3068 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// Duplicate detection is a substring test on the list accumulated so far, so a
// name contained in an already added, longer name is skipped as well.
3069 if (fOutputFiles.Contains(sout)) continue;
3070 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3071 fOutputFiles += sout;
3076 //______________________________________________________________________________
3077 void AliAnalysisAlien::SetOutputArchive(const char *list)
3079 // Manually set the output archive list. Free text - you are on your own...
3080 // Not allowed if default outputs are not disabled.
3081 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3082 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3085 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3086 fOutputArchive = list;
3089 //______________________________________________________________________________
3090 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3092 // Setting a prefered output SE is not allowed anymore.
3093 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3096 //______________________________________________________________________________
3097 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3099 // Set some PROOF special parameter.
3100 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3102 TObject *old = pair->Key();
3103 TObject *val = pair->Value();
3104 fProofParam.Remove(old);
3108 fProofParam.Add(new TObjString(pname), new TObjString(value));
3111 //______________________________________________________________________________
3112 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3114 // Returns a special PROOF parameter.
3115 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3116 if (!pair) return 0;
3117 return pair->Value()->GetName();
3120 //______________________________________________________________________________
// StartAnalysis: entry point that launches the analysis through the plugin.
// Two visible parts:
//   1. PROOF mode (mgr->IsProofMode()): optional cluster reset, connection via
//      TProof::Open, PROOF parameters, AliRoot mode/extra libs/includes, par
//      packages, analysis sources and (test mode) a PROOF-Lite test dataset.
//      All PROOF calls go through the interpreter (gROOT->ProcessLine).
//   2. Grid mode: composes output file list and archive, writes the analysis
//      files/macros/scripts and the JDL, then either runs a local test or
//      submits the job(s) to AliEn.
// Both arguments are unused.
// NOTE(review): the embedded line numbers show many gaps in this excerpt
// (braces, else branches, some conditions, declarations and return statements
// are not visible); the comments below describe the visible statements only.
3121 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3123 // Start remote grid analysis.
3124 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3125 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3126 if (!mgr || !mgr->IsInitialized()) {
3127 Error("StartAnalysis", "You need an initialized analysis manager for this");
3130 // Are we in PROOF mode ?
3131 if (mgr->IsProofMode()) {
3132 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3133 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3134 if (fProofCluster.IsNull()) {
3135 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
// A dataset name is only mandatory outside test mode.
3138 if (fProofDataSet.IsNull() && !testMode) {
3139 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3142 // Set the needed environment
3143 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3144 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
// fProofReset == 1 requests a soft reset (second argument kFALSE); the other
// visible call passes kTRUE (hard reset).
3145 if (fProofReset && !testMode) {
3146 if (fProofReset==1) {
3147 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3148 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3150 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3151 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3153 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3158 // Check if there is an old active session
// The interpreter return value is the number of entries reported by QuerySessions.
3159 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3161 Error("StartAnalysis","You have to reset your old session first\n");
3165 // Do we need to change the ROOT version ? The success of this cannot be checked.
3166 if (!fROOTVersion.IsNull() && !testMode) {
3167 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3168 fProofCluster.Data(), fROOTVersion.Data()));
3170 // Connect to PROOF and check the status
// Optional "workers=" option for TProof::Open: "<n>x" when a per-slave count is
// set, otherwise the total number of workers if given.
3173 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3174 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3176 if (!sworkers.IsNull())
3177 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3179 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
// Opening with an empty cluster name; the error below refers to test mode.
3181 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3183 Error("StartAnalysis", "Could not start PROOF in test mode");
3188 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// SetParallel is only called when both worker settings are non-zero.
3191 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3192 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3193 // Set proof special parameters if any
// The stored value is pasted unquoted into the interpreted SetParameter call.
3194 TIter nextpp(&fProofParam);
3195 TObject *proofparam;
3196 while ((proofparam=nextpp())) {
3197 TString svalue = GetProofParameter(proofparam->GetName());
3198 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3200 // Is dataset existing ?
// Dataset name with anything from "#" onwards removed; the existence check
// itself is commented out below.
3202 TString dataset = fProofDataSet;
3203 Int_t index = dataset.Index("#");
3204 if (index>=0) dataset.Remove(index);
3205 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3206 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3209 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3211 // Is ClearPackages() needed ?
3212 if (TestSpecialBit(kClearPackages)) {
3213 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3214 gROOT->ProcessLine("gProof->ClearPackages();");
3216 // Is a given aliroot mode requested ?
3219 if (!fAliRootMode.IsNull()) {
3220 TString alirootMode = fAliRootMode;
// The keyword "default" is passed on as an empty mode string.
3221 if (alirootMode == "default") alirootMode = "";
3222 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
// optionsList owns the TNamed options added below.
3223 optionsList.SetOwner();
3224 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3225 // Check the additional libs to be loaded
3227 Bool_t parMode = kFALSE;
3228 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3229 // Parse the extra libs for .so
// fAdditionalLibs is a blank-separated list mixing .so libraries and .par packages.
3230 if (fAdditionalLibs.Length()) {
3231 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3234 while((str=(TObjString*)next())) {
3235 if (str->GetString().Contains(".so")) {
3237 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Library names are reduced to their bare form ("libX.so" -> "X") and joined with ":".
3240 TString stmp = str->GetName();
3241 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3242 stmp.ReplaceAll(".so","");
3243 if (!extraLibs.IsNull()) extraLibs += ":";
3247 if (str->GetString().Contains(".par")) {
3248 // The first par file found in the list will not allow any further .so
3250 if (!parLibs.IsNull()) parLibs += ":";
3251 parLibs += str->GetName();
3255 if (list) delete list;
3257 if (!extraLibs.IsNull()) {
3258 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3259 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3261 // Check extra includes
// The include path is turned into a ":"-separated list with the "-I" flags and
// the ALICE_ROOT prefixes removed.
3262 if (!fIncludePath.IsNull()) {
3263 TString includePath = fIncludePath;
3264 includePath.ReplaceAll(" ",":");
3265 includePath.ReplaceAll("$ALICE_ROOT/","");
3266 includePath.ReplaceAll("${ALICE_ROOT}/","");
3267 includePath.ReplaceAll("-I","");
3268 includePath.Remove(TString::kTrailing, ':');
3269 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3270 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3272 // Check if connection to grid is requested
3273 if (TestSpecialBit(kProofConnectGrid))
3274 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3275 // Enable AliRoot par
3277 // Enable proof lite package
3278 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Print all options passed to the package.
3279 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3280 TNamed *obj = (TNamed*)optionsList.At(i);
3281 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// The address of the local optionsList is formatted into the interpreted
// command so that the interpreter passes this very list to EnablePackage.
// A zero result from both interpreted calls is treated as success.
3283 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3284 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3285 Info("StartAnalysis", "AliRootProofLite enabled");
3287 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
// Enable the requested AliRoot version as a package; a non-zero result is an error.
3291 if ( ! fAliROOTVersion.IsNull() ) {
3292 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3293 fAliROOTVersion.Data(), &optionsList))) {
3294 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3299 // Enable first par files from fAdditionalLibs
3300 if (!parLibs.IsNull()) {
3301 TObjArray *list = parLibs.Tokenize(":");
3303 TObjString *package;
3304 while((package=(TObjString*)next())) {
// Remove a local directory named like the package (without ".par") before uploading.
3305 TString spkg = package->GetName();
3306 spkg.ReplaceAll(".par", "");
3307 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3308 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
// Second EnablePackage argument is kFALSE in test mode and kTRUE otherwise.
3309 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3310 if (gROOT->ProcessLine(enablePackage)) {
3311 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3315 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3319 if (list) delete list;
// Extra .so libraries outside test mode are reported as an error here.
// NOTE(review): per the message this is the no-AliRoot-mode case; the
// branch structure is not visible in this excerpt.
3322 if (fAdditionalLibs.Contains(".so") && !testMode) {
3323 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3324 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3328 // Enable par files if requested
3329 if (fPackages && fPackages->GetEntries()) {
3330 TIter next(fPackages);
3332 while ((package=next())) {
3333 // Skip packages already enabled
3334 if (parLibs.Contains(package->GetName())) continue;
3335 TString spkg = package->GetName();
3336 spkg.ReplaceAll(".par", "");
3337 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3338 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3339 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3340 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3344 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3349 // Do we need to load analysis source files ?
3350 // NOTE: don't load on client since this is anyway done by the user to attach his task.
// Each blank-separated source file is loaded via gProof->Load with the "+g"
// suffix appended to its name.
3351 if (fAnalysisSource.Length()) {
3352 TObjArray *list = fAnalysisSource.Tokenize(" ");
3355 while((str=(TObjString*)next())) {
3356 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3358 if (list) delete list;
3361 // Register dataset to proof lite.
3362 if (fFileForTestMode.IsNull()) {
3363 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3366 if (gSystem->AccessPathName(fFileForTestMode)) {
3367 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// The file collection is built from the text file and registered under the
// name "test_collection"; the collection object is not deleted in the visible code.
3370 TFileCollection *coll = new TFileCollection();
3371 coll->AddFromFile(fFileForTestMode);
3372 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3373 gROOT->ProcessLine("gProof->ShowDataSets()");
3378 // Check if output files have to be taken from the analysis manager
3379 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3380 // Add output files and AOD files
3381 fOutputFiles = GetListOfFiles("outaod");
3382 // Add extra files registered to the analysis manager
3383 TString extra = GetListOfFiles("ext");
3384 if (!extra.IsNull()) {
// Extra files are listed with a wildcard before the extension ("x.root" -> "x*.root").
3385 extra.ReplaceAll(".root", "*.root");
3386 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3387 fOutputFiles += extra;
3389 // Compose the output archive.
// Two archives: logs (std*) with one replica, and the root files plus *.stat
// (and the file-info log when throughput collection is on) with fNreplicas replicas.
3390 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3391 if (mgr->IsCollectThroughput())
3392 fOutputArchive += Form("root_archive.zip:%s,*.stat,*%s@disk=%d",fOutputFiles.Data(),mgr->GetFileInfoLog(),fNreplicas);
3394 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3396 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Print a banner describing the selected run mode.
3397 if (TestBit(AliAnalysisGrid::kOffline)) {
3398 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3399 \n there nor any job run. You can revise the JDL and analysis \
3400 \n macro then run the same in \"submit\" mode.");
3401 } else if (TestBit(AliAnalysisGrid::kTest)) {
3402 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3404 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3405 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3406 \n space and job submitted.");
3407 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3408 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3409 if (fMergeViaJDL) CheckInputData();
3412 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3417 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3420 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3421 if (!CheckInputData()) {
3422 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3425 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the configuration item that selected the (empty) input data.
3427 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3428 if (fRunNumbers.Length()) serror = "run numbers";
3429 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3430 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3431 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Write the files needed by the job: analysis file, macro and validation
// script, then the merging macro/executable/validation script.
// NOTE(review): the condition around the merging files is not visible here.
3434 WriteAnalysisFile();
3435 WriteAnalysisMacro();
3437 WriteValidationScript();
3439 WriteMergingMacro();
3440 WriteMergeExecutable();
3441 WriteValidationScript(kTRUE);
3443 if (!CreateJDL()) return kFALSE;
3444 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3446 // Locally testing the analysis
3447 Info("StartAnalysis", "\n_______________________________________________________________________ \
3448 \n Running analysis script in a daughter shell as on a worker node \
3449 \n_______________________________________________________________________");
// Remove existing local copies of the output files, then run the executable
// (stderr redirected to the file "stderr") and the validation script with bash.
3450 TObjArray *list = fOutputFiles.Tokenize(",");
3454 while((str=(TObjString*)next())) {
3455 outputFile = str->GetString();
3456 Int_t index = outputFile.Index("@");
3457 if (index > 0) outputFile.Remove(index);
3458 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
3461 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3462 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3463 // gSystem->Exec("cat stdout");
3466 // Check if submitting is managed by LPM manager
3467 if (fProductionMode) {
3468 //TString prodfile = fJDLName;
3469 //prodfile.ReplaceAll(".jdl", ".prod");
3470 //WriteProductionFile(prodfile);
3471 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3474 // Submit AliEn job(s)
3475 gGrid->Cd(fGridOutputDir);
// Without run numbers or a run range a single "submit <jdl>" command is issued;
// the other visible path goes through Submit().
3480 if (!fRunNumbers.Length() && !fRunRange[0]) {
3481 // Submit a given xml or a set of runs
3482 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3483 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from the "jobId" key of the first result entry.
3485 const char *cjobId = res->GetKey(0,"jobId");
3489 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3492 Info("StartAnalysis", "\n_______________________________________________________________________ \
3493 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3494 \n_______________________________________________________________________",
3495 fJDLName.Data(), cjobId);
// Record the job id and its stage ("full") in the blank-separated bookkeeping strings.
3498 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3499 fGridJobIDs.Append(jobID);
3500 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3501 fGridStages.Append("full");
3506 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3510 // Submit for a range of enumeration of runs.
3511 if (!Submit()) return kFALSE;
3512 jobID = fGridJobIDs;
// Either an interactive AliEn shell is started or only a reminder is printed.
// NOTE(review): the condition selecting between the two is not visible here.
3516 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3517 \n You may exit at any time and terminate the job later using the option <terminate> \
3518 \n ##################################################################################", jobID.Data());
3519 gSystem->Exec("aliensh");
3521 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3522 \n Remember to terminate the job later using the option <terminate> \
3523 \n ##################################################################################", jobID.Data());
3528 //______________________________________________________________________________
// GetListOfFiles: assembles a comma-separated list of file names known to the
// analysis manager, filtered by the categories named in <type>.
// The result lives in a function-static TString, so the returned pointer is
// only valid until the next call; callers should copy it.
// NOTE(review): the embedded line numbers show gaps in this excerpt (braces,
// some declarations, conditions and assignments are not visible); the comments
// below describe the visible statements only.
3529 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3531 // Get a comma-separated list of output files of the requested type.
3532 // Type can be (case insensitive):
3533 // aod - list of aod files (std, extensions and filters)
3534 // out - list of output files connected to containers (but not aod's or extras)
3535 // ext - list of extra files registered to the manager
3536 // ter - list of files produced in terminate
3537 static TString files;
3539 TString stype = type;
3541 TString aodfiles, extra;
3542 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3544 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3545 return files.Data();
// AOD files: the output handler's main file name plus its extra outputs.
3547 if (mgr->GetOutputEventHandler()) {
3548 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3549 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3550 if (!extraaod.IsNull()) {
3552 aodfiles += extraaod;
// The categories are tested by substring, so <type> may name several at once
// (e.g. "outaod"); an exact single-category request returns right away.
3555 if (stype.Contains("aod")) {
3557 if (stype == "aod") return files.Data();
3559 // Add output files that are not in the list of AOD files
3560 TString outputfiles = "";
3561 TIter next(mgr->GetOutputs());
3562 AliAnalysisDataContainer *output;
3563 const char *filename = 0;
3564 while ((output=(AliAnalysisDataContainer*)next())) {
3565 filename = output->GetFileName();
// Skip containers written to the "default" file and names already collected
// (substring tests on the accumulated lists).
3566 if (!(strcmp(filename, "default"))) continue;
3567 if (outputfiles.Contains(filename)) continue;
3568 if (aodfiles.Contains(filename)) continue;
3569 if (!outputfiles.IsNull()) outputfiles += ",";
3570 outputfiles += filename;
3572 if (stype.Contains("out")) {
3573 if (!files.IsNull()) files += ",";
3574 files += outputfiles;
3575 if (stype == "out") return files.Data();
3577 // Add extra files registered to the analysis manager
3579 extra = mgr->GetExtraFiles();
3580 if (!extra.IsNull()) {
// Blanks are turned into commas before splitting the extra-file list.
3582 extra.ReplaceAll(" ", ",");
3583 TObjArray *fextra = extra.Tokenize(",");
3584 TIter nextx(fextra);
3586 while ((obj=nextx())) {
3587 if (aodfiles.Contains(obj->GetName())) continue;
3588 if (outputfiles.Contains(obj->GetName())) continue;
3589 if (sextra.Contains(obj->GetName())) continue;
3590 if (!sextra.IsNull()) sextra += ",";
3591 sextra += obj->GetName();
3594 if (stype.Contains("ext")) {
3595 if (!files.IsNull()) files += ",";
3599 if (stype == "ext") return files.Data();
// Files produced in Terminate(), excluding any name already in another category.
3601 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() returns the stripped substring and does not
// modify the string itself; the result is discarded here - confirm intent.
3602 fTerminateFiles.Strip();
3603 fTerminateFiles.ReplaceAll(" ",",");
3604 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3605 TIter nextx(fextra);
3607 while ((obj=nextx())) {
3608 if (aodfiles.Contains(obj->GetName())) continue;
3609 if (outputfiles.Contains(obj->GetName())) continue;
3610 if (termfiles.Contains(obj->GetName())) continue;
3611 if (sextra.Contains(obj->GetName())) continue;
3612 if (!termfiles.IsNull()) termfiles += ",";
3613 termfiles += obj->GetName();
3617 if (stype.Contains("ter")) {
3618 if (!files.IsNull() && !termfiles.IsNull()) {
3623 return files.Data();
3626 //______________________________________________________________________________
3627 Bool_t AliAnalysisAlien::Submit()
3629 // Submit all master jobs.
3630 Int_t nmasterjobs = fInputFiles->GetEntries();
3631 Long_t tshoot = gSystem->Now();
3632 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3633 while (fNsubmitted < nmasterjobs) {
3634 Long_t now = gSystem->Now();
3635 if ((now-tshoot)>30000) {
3637 if (!SubmitNext()) return kFALSE;
3643 //______________________________________________________________________________
// SubmitMerging: for each entry of fInputFiles, determines the grid directory
// holding its outputs and calls CheckMergedFiles() for it, passing the first
// output file that is not excluded from merging or registration.
// NOTE(review): the embedded line numbers show gaps in this excerpt (braces,
// else branches, some conditions and return statements are not visible); the
// comments below describe the visible statements only.
3644 Bool_t AliAnalysisAlien::SubmitMerging()
3646 // Submit all merging jobs.
// A directory name without any "/" is expanded to <grid home>/<working dir>/<output dir>.
3647 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3648 gGrid->Cd(fGridOutputDir);
// The merging JDL name is derived from the executable name ("x.sh" -> "x_merge.jdl").
3649 TString mergeJDLName = fExecutable;
3650 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
// NOTE(review): the condition guarding this error is not visible here -
// presumably a missing fInputFiles list; confirm against the full file.
3652 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3655 Int_t ntosubmit = fInputFiles->GetEntries();
3656 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input collection without ".xml".
3657 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3658 runOutDir.ReplaceAll(".xml", "");
3659 if (fOutputToRunNo) {
3660 // The output directory is the run number
3661 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3662 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3664 if (!fRunNumbers.Length() && !fRunRange[0]) {
3665 // The output directory is the grid outdir
3666 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3667 runOutDir = fGridOutputDir;
3669 // The output directory is the master number in 3 digits format
3670 printf("### Submitting merging job for master <%03d>\n", i);
3671 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3674 // Check now the number of merging stages.
// Pick the first output file (cut at "@") that is in neither exclusion string.
3675 TObjArray *list = fOutputFiles.Tokenize(",");
3679 while((str=(TObjString*)next())) {
3680 outputFile = str->GetString();
3681 Int_t index = outputFile.Index("@");
3682 if (index > 0) outputFile.Remove(index);
3683 if (!fMergeExcludes.Contains(outputFile) &&
3684 !fRegisterExcludes.Contains(outputFile)) break;
3687 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A failure is only reported to the caller for the last entry.
3688 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range only one pass is made.
3689 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3691 if (!ntosubmit) return kTRUE;
// Either an interactive AliEn shell is started or only instructions are printed.
// NOTE(review): the condition selecting between the two is not visible here;
// note the messages are tagged "StartAnalysis" although issued from SubmitMerging.
3693 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3694 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3695 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3696 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3697 \n ################################################################################################################");
3698 gSystem->Exec("aliensh");
3700 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3701 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3702 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3703 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3704 \n ################################################################################################################");
3709 //______________________________________________________________________________
// SubmitNext: submits the next batch of master jobs, one "submit" command per
// entry of fInputFiles starting at index fNsubmitted.
// State kept between calls in function-static variables: a re-entrancy flag
// (iscalled), the first and last submitted master job ids and the estimated
// number of subjobs per master.
// NOTE(review): the embedded line numbers show gaps in this excerpt (braces,
// else branches, some conditions, declarations and return statements are not
// visible); the comments below describe the visible statements only.
3710 Bool_t AliAnalysisAlien::SubmitNext()
3712 // Submit next bunch of master jobs if the queue is free. The first master job is
3713 // submitted right away, while the next will not be unless the previous was split.
3714 // The plugin will not submit new master jobs if there are more than 500 jobs in
3716 static Bool_t iscalled = kFALSE;
3717 static Int_t firstmaster = 0;
3718 static Int_t lastmaster = 0;
3719 static Int_t npermaster = 0;
// Nothing is done while a previous call is flagged as in progress.
3720 if (iscalled) return kTRUE;
3722 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3723 Int_t ntosubmit = 0;
3726 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without the submit policy all master jobs are submitted in one go.
3729 if (!IsUseSubmitPolicy()) {
3731 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3732 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3733 ntosubmit = nmasterjobs;
// Query the job counters for the master jobs submitted so far.
3736 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3737 printf("=== master %d: %s\n", lastmaster, status.Data());
3738 // If last master not split, just return
3739 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3740 // No more than 500 waiting jobs
3741 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master, used to estimate how many
// more masters fit below the limit of 500 waiting jobs (at least one).
// NOTE(review): divides by fNsubmitted - presumably only reached after at
// least one submission; verify against the conditions not visible here.
3742 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3743 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3744 if (!ntosubmit) ntosubmit = 1;
3745 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3746 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3748 for (Int_t i=0; i<ntosubmit; i++) {
3749 // Submit for a range of enumeration of runs.
3750 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Base name of the input collection without ".xml".
3752 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3753 runOutDir.ReplaceAll(".xml", "");
// The third argument of the submit command is either the name derived from the
// input collection or the master index in 3-digit format.
// NOTE(review): the condition selecting between the two is not visible here.
3755 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3757 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3758 printf("********* %s\n",query.Data());
3759 res = gGrid->Command(query);
// The job id is read from the "jobId" key of the first result entry; an empty
// id means the submission failed.
3761 TString cjobId1 = res->GetKey(0,"jobId");
3762 if (!cjobId1.Length()) {
3766 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3769 Info("StartAnalysis", "\n_______________________________________________________________________ \
3770 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3771 \n_______________________________________________________________________",
3772 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Record the job id and its stage ("full") in the blank-separated bookkeeping strings.
3773 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3774 fGridJobIDs.Append(cjobId1);
3775 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3776 fGridStages.Append("full");
// Remember the id range of the submitted masters for the next status query.
3779 lastmaster = cjobId1.Atoi();
3780 if (!firstmaster) firstmaster = lastmaster;
3785 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3793 //______________________________________________________________________________
3794 void AliAnalysisAlien::WriteAnalysisFile()
3796 // Write current analysis manager into the file <analysisFile>
//
// The file name is the executable name with the .sh suffix replaced by .root.
// Unless the kSubmit bit is set, the analysis type bits (kUseMC, kUseESD,
// kUseAOD) are derived from the event handlers attached to the manager and the
// file is recreated locally. The file is then copied to the alien working
// directory, except in production, offline or test mode.
3797 TString analysisFile = fExecutable;
3798 analysisFile.ReplaceAll(".sh", ".root");
3799 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3800 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3801 if (!mgr || !mgr->IsInitialized()) {
3802 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3805 // Check analysis type
// A MC truth handler flags the analysis as using MC information.
3807 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3808 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the first input handler decides between ESD and AOD.
3810 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3811 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3812 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3813 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Plain input handler: its own class decides between ESD and AOD.
3815 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3816 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory; opening the output file changes gDirectory.
3819 TDirectory *cdir = gDirectory;
3820 TFile *file = TFile::Open(analysisFile, "RECREATE");
3822 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode)
3823 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3824 // Unless merging makes no sense
3825 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
// NOTE(review): the statements that write the manager and close the file are
// not visible in this listing.
3828 // Enable termination for local jobs
3829 mgr->SetSkipTerminate(kFALSE);
// Restore the directory that was current before the file was opened.
3831 if (cdir) cdir->cd();
3832 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No upload to the grid in production, offline or test mode.
3834 Bool_t copy = kTRUE;
3835 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is the grid home directory followed by fGridWorkingDir.
3838 TString workdir = gGrid->GetHomeDirectory();
3839 workdir += fGridWorkingDir;
3840 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous copy first, then upload; a failed upload is fatal.
3841 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3842 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3843 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3847 //______________________________________________________________________________
3848 void AliAnalysisAlien::WriteAnalysisMacro()
3850 // Write the analysis macro that will steer the analysis in grid mode.
//
// Unless the kSubmit bit is set, the macro text is streamed into the file named
// by fAnalysisMacro. The generated macro contains, in this order:
//   - the anatype constant and the steering function (named after the macro
//     file without the .C extension)
//   - loading of ROOT and analysis libraries, or SetupPar calls for packages
//     listed in fPackages
//   - include path setup, additional libraries and runtime-compiled sources
//   - optional xrootd fast-read settings
//   - loading of the analysis manager from the .root file and StartAnalysis
//   - a CreateChain helper (not emitted for local tests)
//   - a SetupPar helper when ANALYSISalice is provided as a par package
// Afterwards the macro is copied to the alien working directory, except in
// production, offline or test mode. A failed copy is fatal.
3851 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3853 out.open(fAnalysisMacro.Data(), ios::out);
3855 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework libraries come as par packages.
3858 Bool_t hasSTEERBase = kFALSE;
3859 Bool_t hasESD = kFALSE;
3860 Bool_t hasAOD = kFALSE;
3861 Bool_t hasANALYSIS = kFALSE;
3862 Bool_t hasOADB = kFALSE;
3863 Bool_t hasANALYSISalice = kFALSE;
3864 Bool_t hasCORRFW = kFALSE;
3865 TString func = fAnalysisMacro;
3866 TString type = "ESD";
3867 TString comment = "// Analysis using ";
3868 if (IsUseMCchain()) {
3872 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3873 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3878 if (type!="AOD" && fFriendChainName!="") {
3879 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3882 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3883 else comment += " data";
// --- Header of the generated steering function ---
3884 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3885 func.ReplaceAll(".C", "");
3886 out << "void " << func.Data() << "()" << endl;
3888 out << comment.Data() << endl;
3889 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3890 out << " TStopwatch timer;" << endl;
3891 out << " timer.Start();" << endl << endl;
3892 // Change temp directory to current one
3893 if (!IsLocalTest()) {
3894 out << "// connect to AliEn and make the chain" << endl;
3895 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3897 out << "// Set temporary merging directory to current one" << endl;
3898 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3899 out << "// Set temporary compilation directory to current one" << endl;
3900 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3901 // Reset existing include path
3902 out << "// Reset existing include path and add current directory first in the search" << endl;
3903 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// --- Library loading ---
3904 if (!fExecutableCommand.Contains("aliroot")) {
3905 out << "// load base root libraries" << endl;
3906 out << " gSystem->Load(\"libTree\");" << endl;
3907 out << " gSystem->Load(\"libGeom\");" << endl;
3908 out << " gSystem->Load(\"libVMC\");" << endl;
3909 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3910 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3912 if (fAdditionalRootLibs.Length()) {
3913 // in principle libtree /lib geom libvmc etc. can go into this list, too
3914 out << "// Add aditional libraries" << endl;
3915 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3918 while((str=(TObjString*)next())) {
3919 if (str->GetString().Contains(".so"))
3920 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3922 if (list) delete list;
3924 out << "// Load analysis framework libraries" << endl;
3925 TString setupPar = "AliAnalysisAlien::SetupPar";
3927 if (!fExecutableCommand.Contains("aliroot")) {
3928 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3929 out << " gSystem->Load(\"libESD\");" << endl;
3930 out << " gSystem->Load(\"libAOD\");" << endl;
3932 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3933 out << " gSystem->Load(\"libOADB\");" << endl;
3934 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3935 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over the packages: detect which framework libraries are par files.
3937 TIter next(fPackages);
3940 while ((obj=next())) {
3941 pkgname = obj->GetName();
3942 if (pkgname == "STEERBase" ||
3943 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3944 if (pkgname == "ESD" ||
3945 pkgname == "ESD.par") hasESD = kTRUE;
3946 if (pkgname == "AOD" ||
3947 pkgname == "AOD.par") hasAOD = kTRUE;
3948 if (pkgname == "ANALYSIS" ||
3949 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3950 if (pkgname == "OADB" ||
3951 pkgname == "OADB.par") hasOADB = kTRUE;
3952 if (pkgname == "ANALYSISalice" ||
3953 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3954 if (pkgname == "CORRFW" ||
3955 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// When ANALYSISalice itself is a par package, the generated macro uses its own
// SetupPar function (emitted at the end) instead of AliAnalysisAlien::SetupPar.
3957 if (hasANALYSISalice) setupPar = "SetupPar";
3958 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3959 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3960 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3961 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3962 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3963 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3964 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3965 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3966 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3967 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3968 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3969 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3970 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3971 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3972 out << "// Compile other par packages" << endl;
// Second pass: every package that is not one of the framework libraries.
3974 while ((obj=next())) {
3975 pkgname = obj->GetName();
3976 if (pkgname == "STEERBase" ||
3977 pkgname == "STEERBase.par" ||
3979 pkgname == "ESD.par" ||
3981 pkgname == "AOD.par" ||
3982 pkgname == "ANALYSIS" ||
3983 pkgname == "ANALYSIS.par" ||
3984 pkgname == "OADB" ||
3985 pkgname == "OADB.par" ||
3986 pkgname == "ANALYSISalice" ||
3987 pkgname == "ANALYSISalice.par" ||
3988 pkgname == "CORRFW" ||
3989 pkgname == "CORRFW.par") continue;
3990 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// --- Include path ---
3993 out << "// include path" << endl;
3994 // Get the include path from the interpreter and remove entries pointing to AliRoot
3995 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3996 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3997 out << " TIter nextpath(listpaths);" << endl;
3998 out << " TObjString *pname;" << endl;
3999 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4000 out << " TString current = pname->GetName();" << endl;
4001 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4002 out << " gSystem->AddIncludePath(current);" << endl;
4003 out << " }" << endl;
4004 out << " if (listpaths) delete listpaths;" << endl;
4005 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4006 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4007 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
// --- Additional libraries: .so entries are loaded, .par entries are set up ---
4008 if (fAdditionalLibs.Length()) {
4009 out << "// Add aditional AliRoot libraries" << endl;
4010 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4013 while((str=(TObjString*)next())) {
4014 if (str->GetString().Contains(".so"))
4015 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4016 if (str->GetString().Contains(".par"))
4017 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4019 if (list) delete list;
// --- Sources compiled at runtime in the generated macro ---
4022 out << "// analysis source to be compiled at runtime (if any)" << endl;
4023 if (fAnalysisSource.Length()) {
4024 TObjArray *list = fAnalysisSource.Tokenize(" ");
4027 while((str=(TObjString*)next())) {
4028 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4030 if (list) delete list;
4033 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4034 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
// --- Optional xrootd settings with short timeouts ---
4035 if (fFastReadOption) {
4036 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4037 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4038 out << "// fast xrootd reading enabled" << endl;
4039 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4040 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4041 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4042 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4043 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4044 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
// --- Load the analysis manager from the .root file next to the executable ---
4046 out << "// read the analysis manager from file" << endl;
4047 TString analysisFile = fExecutable;
4048 analysisFile.ReplaceAll(".sh", ".root");
4049 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4050 << analysisFile << "\");" << endl;
4051 out << " if (!mgr) return;" << endl;
// In local test mode the generated macro creates its own plugin in test mode.
4052 if (IsLocalTest()) {
4053 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4054 out << " plugin->SetRunMode(\"test\");" << endl;
4055 if (fFileForTestMode.IsNull())
4056 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4058 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4059 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4060 if (!fFriendChainName.IsNull())
4061 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\");" << endl;
4063 out << " plugin->SetUseMCchain();" << endl;
4064 out << " mgr->SetGridHandler(plugin);" << endl;
// Propagate debug settings of the current manager, if there is one.
4065 if (AliAnalysisManager::GetAnalysisManager()) {
4066 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4067 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4069 out << " mgr->SetDebugLevel(10);" << endl;
4070 out << " mgr->SetNSysInfo(100);" << endl;
4073 out << " mgr->PrintStatus();" << endl;
4074 if (AliAnalysisManager::GetAnalysisManager()) {
4075 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4076 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4078 if (TestBit(AliAnalysisGrid::kTest))
4079 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4081 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// --- Start the analysis: on the grid the chain is built from wn.xml ---
4084 if (!IsLocalTest()) {
4085 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4086 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4088 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4090 out << " timer.Stop();" << endl;
4091 out << " timer.Print();" << endl;
4092 out << "}" << endl << endl;
// --- CreateChain helper of the generated macro (grid mode only) ---
4093 if (!IsLocalTest()) {
4094 out <<"//________________________________________________________________________________" << endl;
4095 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4097 out << "// Create a chain using url's from xml file" << endl;
4098 out << " TString filename;" << endl;
4099 out << " Int_t run = 0;" << endl;
4100 if (IsUseMCchain()) {
4101 out << " TString treename = \"TE\";" << endl;
4103 out << " TString treename = type;" << endl;
4104 out << " treename.ToLower();" << endl;
4105 out << " treename += \"Tree\";" << endl;
4107 out << " printf(\"***************************************\\n\");" << endl;
4108 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4109 out << " printf(\"***************************************\\n\");" << endl;
4110 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4111 out << " if (!coll) {" << endl;
4112 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4113 out << " return NULL;" << endl;
4114 out << " }" << endl;
4115 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4116 out << " TChain *chain = new TChain(treename);" << endl;
4117 if(fFriendChainName!="") {
4118 out << " TChain *chainFriend = new TChain(treename);" << endl;
4120 out << " coll->Reset();" << endl;
4121 out << " while (coll->Next()) {" << endl;
// The inner quotes are escaped so that the generated call carries an explicit
// empty-string argument, as it does for the friend file a few lines further on.
4122 out << " filename = coll->GetTURL(\"\");" << endl;
4123 out << " if (mgr) {" << endl;
4124 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4125 out << " if (nrun && nrun != run) {" << endl;
4126 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4127 out << " mgr->SetRunFromPath(nrun);" << endl;
4128 out << " run = nrun;" << endl;
4129 out << " }" << endl;
4130 out << " }" << endl;
4131 out << " chain->Add(filename);" << endl;
// Friend file: same directory as the data file, named fFriendChainName.
4132 if(fFriendChainName!="") {
4133 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
4134 out << " if (fileFriend.Index(\"#\") > -1) fileFriend.Remove(fileFriend.Index(\"#\"));" << endl;
4135 out << " fileFriend = gSystem->DirName(fileFriend);" << endl;
4136 out << " fileFriend += \"/\";" << endl;
4137 out << " fileFriend += \"" << fFriendChainName << "\";" << endl;
4138 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4139 out << " if (file) {" << endl;
4140 out << " file->Close();" << endl;
4141 out << " chainFriend->Add(fileFriend.Data());" << endl;
4142 out << " } else {" << endl;
4143 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4144 out << " return 0;" << endl;
4145 out << " }" << endl;
4147 out << " }" << endl;
4148 out << " if (!chain->GetNtrees()) {" << endl;
4149 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4150 out << " return NULL;" << endl;
4151 out << " }" << endl;
4152 if(fFriendChainName!="") {
4153 out << " chain->AddFriend(chainFriend);" << endl;
4155 out << " return chain;" << endl;
4156 out << "}" << endl << endl;
// --- SetupPar helper of the generated macro (only if ANALYSISalice is a par) ---
4158 if (hasANALYSISalice) {
4159 out <<"//________________________________________________________________________________" << endl;
4160 out << "Bool_t SetupPar(const char *package) {" << endl;
4161 out << "// Compile the package and set it up." << endl;
4162 out << " TString pkgdir = package;" << endl;
4163 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4164 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4165 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4166 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4167 out << " // Check for BUILD.sh and execute" << endl;
4168 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4169 out << " printf(\"*******************************\\n\");" << endl;
4170 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4171 out << " printf(\"*******************************\\n\");" << endl;
4172 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4173 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4174 out << " gSystem->ChangeDirectory(cdir);" << endl;
4175 out << " return kFALSE;" << endl;
4176 out << " }" << endl;
4177 out << " } else {" << endl;
4178 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4179 out << " gSystem->ChangeDirectory(cdir);" << endl;
4180 out << " return kFALSE;" << endl;
4181 out << " }" << endl;
4182 out << " // Check for SETUP.C and execute" << endl;
4183 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4184 out << " printf(\"*******************************\\n\");" << endl;
4185 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4186 out << " printf(\"*******************************\\n\");" << endl;
4187 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4188 out << " } else {" << endl;
4189 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4190 out << " gSystem->ChangeDirectory(cdir);" << endl;
4191 out << " return kFALSE;" << endl;
4192 out << " }" << endl;
4193 out << " // Restore original workdir" << endl;
4194 out << " gSystem->ChangeDirectory(cdir);" << endl;
4195 out << " return kTRUE;" << endl;
4198 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// --- Upload of the macro; skipped in production, offline or test mode ---
4200 Bool_t copy = kTRUE;
4201 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4204 TString workdir = gGrid->GetHomeDirectory();
4205 workdir += fGridWorkingDir;
4206 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4207 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4208 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// copyLocal2Alien() prepends the alien:// scheme to its remote argument itself,
// so only the catalogue path is passed here (a second prefix would give a
// malformed alien://alien://... destination).
4209 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4210 Form("%s/%s", workdir.Data(),
4211 fAnalysisMacro.Data()))) Fatal("","Terminating");
4215 //______________________________________________________________________________
4216 void AliAnalysisAlien::WriteMergingMacro()
4218 // Write a macro to merge the outputs per master job.
4219 if (!fMergeViaJDL) return;
4220 if (!fOutputFiles.Length()) {
4221 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4224 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4225 TString mergingMacro = fExecutable;
4226 mergingMacro.ReplaceAll(".sh","_merge.C");
4227 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4228 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4230 out.open(mergingMacro.Data(), ios::out);
4232 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4235 Bool_t hasSTEERBase = kFALSE;
4236 Bool_t hasESD = kFALSE;
4237 Bool_t hasAOD = kFALSE;
4238 Bool_t hasANALYSIS = kFALSE;
4239 Bool_t hasOADB = kFALSE;
4240 Bool_t hasANALYSISalice = kFALSE;
4241 Bool_t hasCORRFW = kFALSE;
4242 TString func = mergingMacro;
4244 func.ReplaceAll(".C", "");
4245 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4247 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4248 out << " TStopwatch timer;" << endl;
4249 out << " timer.Start();" << endl << endl;
4250 // Reset existing include path
4251 out << "// Reset existing include path and add current directory first in the search" << endl;
4252 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4253 if (!fExecutableCommand.Contains("aliroot")) {
4254 out << "// load base root libraries" << endl;
4255 out << " gSystem->Load(\"libTree\");" << endl;
4256 out << " gSystem->Load(\"libGeom\");" << endl;
4257 out << " gSystem->Load(\"libVMC\");" << endl;
4258 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4259 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4261 if (fAdditionalRootLibs.Length()) {
4262 // in principle libtree /lib geom libvmc etc. can go into this list, too
4263 out << "// Add aditional libraries" << endl;
4264 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4267 while((str=(TObjString*)next())) {
4268 if (str->GetString().Contains(".so"))
4269 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4271 if (list) delete list;
4273 out << "// Load analysis framework libraries" << endl;
4275 if (!fExecutableCommand.Contains("aliroot")) {
4276 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4277 out << " gSystem->Load(\"libESD\");" << endl;
4278 out << " gSystem->Load(\"libAOD\");" << endl;
4280 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4281 out << " gSystem->Load(\"libOADB\");" << endl;
4282 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4283 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4285 TIter next(fPackages);
4288 TString setupPar = "AliAnalysisAlien::SetupPar";
4289 while ((obj=next())) {
4290 pkgname = obj->GetName();
4291 if (pkgname == "STEERBase" ||
4292 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4293 if (pkgname == "ESD" ||
4294 pkgname == "ESD.par") hasESD = kTRUE;
4295 if (pkgname == "AOD" ||
4296 pkgname == "AOD.par") hasAOD = kTRUE;
4297 if (pkgname == "ANALYSIS" ||
4298 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4299 if (pkgname == "OADB" ||
4300 pkgname == "OADB.par") hasOADB = kTRUE;
4301 if (pkgname == "ANALYSISalice" ||
4302 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4303 if (pkgname == "CORRFW" ||
4304 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4306 if (hasANALYSISalice) setupPar = "SetupPar";
4307 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4308 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4309 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4310 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4311 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4312 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4313 out << " gSystem->Load(\"libOADB\");" << endl;
4314 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4315 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4316 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4317 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4318 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4319 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4320 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4321 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4322 out << "// Compile other par packages" << endl;
4324 while ((obj=next())) {
4325 pkgname = obj->GetName();
4326 if (pkgname == "STEERBase" ||
4327 pkgname == "STEERBase.par" ||
4329 pkgname == "ESD.par" ||
4331 pkgname == "AOD.par" ||
4332 pkgname == "ANALYSIS" ||
4333 pkgname == "ANALYSIS.par" ||
4334 pkgname == "OADB" ||
4335 pkgname == "OADB.par" ||
4336 pkgname == "ANALYSISalice" ||
4337 pkgname == "ANALYSISalice.par" ||
4338 pkgname == "CORRFW" ||
4339 pkgname == "CORRFW.par") continue;
4340 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4343 out << "// include path" << endl;
4344 // Get the include path from the interpreter and remove entries pointing to AliRoot
4345 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4346 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4347 out << " TIter nextpath(listpaths);" << endl;
4348 out << " TObjString *pname;" << endl;
4349 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4350 out << " TString current = pname->GetName();" << endl;
4351 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4352 out << " gSystem->AddIncludePath(current);" << endl;
4353 out << " }" << endl;
4354 out << " if (listpaths) delete listpaths;" << endl;
4355 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4356 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4357 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4358 if (fAdditionalLibs.Length()) {
4359 out << "// Add aditional AliRoot libraries" << endl;
4360 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4363 while((str=(TObjString*)next())) {
4364 if (str->GetString().Contains(".so"))
4365 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4367 if (list) delete list;
4370 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4371 if (fAnalysisSource.Length()) {
4372 TObjArray *list = fAnalysisSource.Tokenize(" ");
4375 while((str=(TObjString*)next())) {
4376 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4378 if (list) delete list;
4382 if (fFastReadOption) {
4383 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4384 out << "// fast xrootd reading enabled" << endl;
4385 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4386 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4387 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4388 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4389 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4390 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4392 // Change temp directory to current one
4393 out << "// Connect to AliEn" << endl;
4394 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4395 out << "// Set temporary merging directory to current one" << endl;
4396 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4397 out << "// Set temporary compilation directory to current one" << endl;
4398 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4399 out << " TString outputDir = dir;" << endl;
4400 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4401 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4402 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4403 out << " TIter *iter = new TIter(list);" << endl;
4404 out << " TObjString *str;" << endl;
4405 out << " TString outputFile;" << endl;
4406 out << " Bool_t merged = kTRUE;" << endl;
4407 TString analysisFile = fExecutable;
4408 analysisFile.ReplaceAll(".sh", ".root");
4409 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4410 << analysisFile << "\");" << endl;
4411 out << " if (!mgr) {" << endl;
4412 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4413 out << " return;" << endl;
4414 out << " }" << endl;
4415 if (IsLocalTest()) {
4416 out << " printf(\"===================================\n\");" << endl;
4417 out << " printf(\"Testing merging...\\n\");" << endl;
4418 out << " printf(\"===================================\n\");" << endl;
4420 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4421 out << " outputFile = str->GetString();" << endl;
4422 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4423 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4424 out << " if (index > 0) outputFile.Remove(index);" << endl;
4425 out << " // Skip already merged outputs" << endl;
4426 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4427 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4428 out << " continue;" << endl;
4429 out << " }" << endl;
4430 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4431 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4432 out << " if (!merged) {" << endl;
4433 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4434 out << " return;" << endl;
4435 out << " }" << endl;
4436 out << " }" << endl;
4437 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4438 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4439 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4441 out << " // all outputs merged, validate" << endl;
4442 out << " ofstream out;" << endl;
4443 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4444 out << " out.close();" << endl;
4445 out << " // read the analysis manager from file" << endl;
4446 if (IsLocalTest()) {
4447 out << " printf(\"===================================\n\");" << endl;
4448 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4449 out << " printf(\"===================================\n\");" << endl;
4451 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4453 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4454 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4455 out << " mgr->PrintStatus();" << endl;
4457 if (mgr->GetDebugLevel()>3) {
4458 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4460 if (TestBit(AliAnalysisGrid::kTest))
4461 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4463 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4466 out << " TTree *tree = NULL;" << endl;
4467 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4468 out << "}" << endl << endl;
4469 if (hasANALYSISalice) {
4470 out <<"//________________________________________________________________________________" << endl;
4471 out << "Bool_t SetupPar(const char *package) {" << endl;
4472 out << "// Compile the package and set it up." << endl;
4473 out << " TString pkgdir = package;" << endl;
4474 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4475 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4476 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4477 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4478 out << " // Check for BUILD.sh and execute" << endl;
4479 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4480 out << " printf(\"*******************************\\n\");" << endl;
4481 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4482 out << " printf(\"*******************************\\n\");" << endl;
4483 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4484 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4485 out << " gSystem->ChangeDirectory(cdir);" << endl;
4486 out << " return kFALSE;" << endl;
4487 out << " }" << endl;
4488 out << " } else {" << endl;
4489 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4490 out << " gSystem->ChangeDirectory(cdir);" << endl;
4491 out << " return kFALSE;" << endl;
4492 out << " }" << endl;
4493 out << " // Check for SETUP.C and execute" << endl;
4494 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4495 out << " printf(\"*******************************\\n\");" << endl;
4496 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4497 out << " printf(\"*******************************\\n\");" << endl;
4498 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4499 out << " } else {" << endl;
4500 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4501 out << " gSystem->ChangeDirectory(cdir);" << endl;
4502 out << " return kFALSE;" << endl;
4503 out << " }" << endl;
4504 out << " // Restore original workdir" << endl;
4505 out << " gSystem->ChangeDirectory(cdir);" << endl;
4506 out << " return kTRUE;" << endl;
4510 Bool_t copy = kTRUE;
4511 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4514 TString workdir = gGrid->GetHomeDirectory();
4515 workdir += fGridWorkingDir;
4516 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4517 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4518 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4519 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4520 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4524 //______________________________________________________________________________
4525 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4527 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4528 // Note that for loading the compiled library. The current directory should have precedence in
4530 TString pkgdir = package;
4531 pkgdir.ReplaceAll(".par","");
4532 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4533 TString cdir = gSystem->WorkingDirectory();
4534 gSystem->ChangeDirectory(pkgdir);
4535 // Check for BUILD.sh and execute
4536 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4537 printf("**************************************************\n");
4538 printf("*** Building PAR archive %s\n", package);
4539 printf("**************************************************\n");
4540 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4541 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4542 gSystem->ChangeDirectory(cdir);
4546 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4547 gSystem->ChangeDirectory(cdir);
4550 // Check for SETUP.C and execute
4551 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4552 printf("**************************************************\n");
4553 printf("*** Setup PAR archive %s\n", package);
4554 printf("**************************************************\n");
4555 gROOT->Macro("PROOF-INF/SETUP.C");
4556 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4558 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4559 gSystem->ChangeDirectory(cdir);
4562 // Restore original workdir
4563 gSystem->ChangeDirectory(cdir);
4567 //______________________________________________________________________________
4568 void AliAnalysisAlien::WriteExecutable()
4570 // Generate the alien executable script.
4571 // Patch executable with -x to catch error code
4572 if (fExecutableCommand.Contains("root") &&
4573 fExecutableCommand.Contains("-q") &&
4574 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4575 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4577 out.open(fExecutable.Data(), ios::out);
4579 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4582 out << "#!/bin/bash" << endl;
4583 // Make sure we can properly compile par files
4584 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4585 out << "echo \"=========================================\"" << endl;
4586 out << "echo \"############## PATH : ##############\"" << endl;
4587 out << "echo $PATH" << endl;
4588 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4589 out << "echo $LD_LIBRARY_PATH" << endl;
4590 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4591 out << "echo $ROOTSYS" << endl;
4592 out << "echo \"############## which root : ##############\"" << endl;
4593 out << "which root" << endl;
4594 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4595 out << "echo $ALICE_ROOT" << endl;
4596 out << "echo \"############## which aliroot : ##############\"" << endl;
4597 out << "which aliroot" << endl;
4598 out << "echo \"############## system limits : ##############\"" << endl;
4599 out << "ulimit -a" << endl;
4600 out << "echo \"############## memory : ##############\"" << endl;
4601 out << "free -m" << endl;
4602 out << "echo \"=========================================\"" << endl << endl;
4603 out << fExecutableCommand << " ";
4604 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4605 out << "RET=$?" << endl;
4606 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4607 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4608 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4609 out << " let sig=\"$RET - 128\""<<endl;
4610 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4611 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4612 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4613 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4614 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4615 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4617 out << " exit $RET"<< endl;
4618 out << "fi" << endl << endl ;
4619 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4620 out << "echo \"############## memory after: ##############\"" << endl;
4621 out << "free -m" << endl;
4623 Bool_t copy = kTRUE;
4624 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4627 TString workdir = gGrid->GetHomeDirectory();
4628 TString bindir = Form("%s/bin", workdir.Data());
4629 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4630 workdir += fGridWorkingDir;
4631 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
4632 if (FileExists(executable)) gGrid->Rm(executable);
4633 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4634 // TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
4635 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4636 executable.Data())) Fatal("","Terminating");
4640 //______________________________________________________________________________
4641 void AliAnalysisAlien::WriteMergeExecutable()
4643 // Generate the alien executable script for the merging job.
4644 if (!fMergeViaJDL) return;
4645 TString mergeExec = fExecutable;
4646 mergeExec.ReplaceAll(".sh", "_merge.sh");
4647 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4649 out.open(mergeExec.Data(), ios::out);
4651 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4654 out << "#!/bin/bash" << endl;
4655 // Make sure we can properly compile par files
4656 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4657 out << "echo \"=========================================\"" << endl;
4658 out << "echo \"############## PATH : ##############\"" << endl;
4659 out << "echo $PATH" << endl;
4660 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4661 out << "echo $LD_LIBRARY_PATH" << endl;
4662 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4663 out << "echo $ROOTSYS" << endl;
4664 out << "echo \"############## which root : ##############\"" << endl;
4665 out << "which root" << endl;
4666 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4667 out << "echo $ALICE_ROOT" << endl;
4668 out << "echo \"############## which aliroot : ##############\"" << endl;
4669 out << "which aliroot" << endl;
4670 out << "echo \"############## system limits : ##############\"" << endl;
4671 out << "ulimit -a" << endl;
4672 out << "echo \"############## memory : ##############\"" << endl;
4673 out << "free -m" << endl;
4674 out << "echo \"=========================================\"" << endl << endl;
4675 TString mergeMacro = fExecutable;
4676 mergeMacro.ReplaceAll(".sh", "_merge.C");
4677 if (IsOneStageMerging())
4678 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4680 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4681 out << fExecutableCommand << " " << "$ARG" << endl;
4682 out << "RET=$?" << endl;
4683 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4684 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4685 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4686 out << " let sig=\"$RET - 128\""<<endl;
4687 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4688 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4689 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4690 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4691 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4692 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4694 out << " exit $RET"<< endl;
4695 out << "fi" << endl << endl ;
4696 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4697 out << "echo \"############## memory after: ##############\"" << endl;
4698 out << "free -m" << endl;
4700 Bool_t copy = kTRUE;
4701 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4704 TString workdir = gGrid->GetHomeDirectory();
4705 TString bindir = Form("%s/bin", workdir.Data());
4706 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4707 workdir += fGridWorkingDir;
4708 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4709 if (FileExists(executable)) gGrid->Rm(executable);
4710 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4711 // TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4712 if (!copyLocal2Alien("WriteMergeExecutable",
4713 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4717 //______________________________________________________________________________
4718 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4720 // Write the production file to be submitted by LPM manager. The format is:
4721 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4722 // Next lines: full_path_to_dataset XXX (XXX is a string)
4723 // To submit, one has to: submit jdl XXX for all lines
// <filename> is used both as the local file to write and as the name of the
// copy placed in the grid working directory.
// NOTE(review): this listing omits some lines of the function (the
// declarations of `out` and `workdir`, the condition guarding the Error()
// call and the condition choosing between the two per-file output forms are
// not visible here); the notes below describe only the visible code.
4725 out.open(filename, ios::out);
4727 Error("WriteProductionFile", "Bad file name: %s", filename);
// The AliEn home directory is prepended to the working directory unless in
// production mode or the working directory already begins with "/alice".
4731 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4732 workdir = gGrid->GetHomeDirectory();
4733 workdir += fGridWorkingDir;
// Integer arithmetic: the quotient is truncated.
// NOTE(review): fSplitMaxInputFileNumber == 0 would be a division by zero -
// confirm it is validated before this point.
4734 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
// First line of the production file: JDL path and jobs-per-master estimate
4735 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4736 out << locjdl << " " << njobspermaster << endl;
// One line per entry of fInputFiles: the entry name followed by either its
// base name without ".xml" or the zero-padded (3 digits) entry index.
4737 Int_t nmasterjobs = fInputFiles->GetEntries();
4738 for (Int_t i=0; i<nmasterjobs; i++) {
4739 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4740 runOutDir.ReplaceAll(".xml", "");
4742 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4744 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
// Upload: remove a previous grid copy, then copy the local file to
// <workdir>/<filename>; a failed copy is fatal.
// NOTE(review): FileExists()/Rm() are given the bare <filename> while the copy
// targets <workdir>/<filename> - presumably this relies on the current grid
// directory being <workdir>; verify.
4747 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4748 if (FileExists(filename)) gGrid->Rm(filename);
4749 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4750 if (!copyLocal2Alien("WriteProductionFile", filename,
4751 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4755 //______________________________________________________________________________
4756 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4758 // Generate the alien validation script.
4759 // Generate the validation script
// <merge> selects the variant for merging jobs: the script name gets "_merge"
// inserted before ".sh" and the per-output-file existence checks are skipped.
// The generated bash script sets error=1 when: stderr is missing, stderr
// contains one of four failure markers, an expected output file is missing
// (non-merge only) or the "outputs_valid" marker file is missing. It ends with
// "exit $error".
// NOTE(review): this listing omits some lines of the function (e.g. the
// declarations of `out`, `os`, `next1` and `outputFile` and the condition
// guarding the "Alien connection required" error are not visible here); the
// notes below describe only the visible code.
// Default script name: executable name with ".sh" -> "_validation.sh"
4761 if (fValidationScript.IsNull()) {
4762 fValidationScript = fExecutable;
4763 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4765 TString validationScript = fValidationScript;
4766 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4768 Error("WriteValidationScript", "Alien connection required");
// Turn a blank-separated list of terminate files into a comma-separated one.
// NOTE(review): TString::Strip() is const and returns the stripped substring;
// its result is discarded here, so the call appears to have no effect -
// confirm against the TString reference.
4771 if (!fTerminateFiles.IsNull()) {
4772 fTerminateFiles.Strip();
4773 fTerminateFiles.ReplaceAll(" ",",");
// Suffix appended to most echo commands of the script: their output is
// appended to the file "stdout", except in test mode.
4775 TString outStream = "";
4776 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
// The script is only (re)written when the kSubmit bit is not set.
// NOTE(review): no check of the open() result is visible in this listing.
4777 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4779 out.open(validationScript, ios::out);
// Script preamble: run from the directory containing the script
4780 out << "#!/bin/bash" << endl;
4781 out << "##################################################" << endl;
4782 out << "validateout=`dirname $0`" << endl;
4783 out << "validatetime=`date`" << endl;
4784 out << "validated=\"0\";" << endl;
4785 out << "error=0" << endl;
4786 out << "if [ -z $validateout ]" << endl;
4787 out << "then" << endl;
4788 out << " validateout=\".\"" << endl;
4789 out << "fi" << endl << endl;
4790 out << "cd $validateout;" << endl;
4791 out << "validateworkdir=`pwd`;" << endl << endl;
// Banner with time, directories and a listing of the job directory
4792 out << "echo \"*******************************************************\"" << outStream << endl;
4793 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4795 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4796 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4797 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4798 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4799 out << "ls -la ./" << outStream << endl;
4800 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4801 out << "##################################################" << endl;
// A missing stderr file invalidates the job
4804 out << "if [ ! -f stderr ] ; then" << endl;
4805 out << " error=1" << endl;
4806 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4807 out << " echo \"Error = $error\" " << outStream << endl;
4808 out << "fi" << endl;
// Case-insensitive search of stderr for known failure markers.
// NOTE(review): the glibc pattern starts with '*' and is passed to grep -E
// unescaped, so it is probably not matched as the literal text
// "*** glibc detected ***" - confirm (grep -F would match literally).
4810 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4811 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4812 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4813 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
// Each marker found sets error=1 and echoes the matching stderr lines
4816 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4817 out << " error=1" << endl;
4818 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4819 out << " echo \"$parArch\" " << outStream << endl;
4820 out << " echo \"Error = $error\" " << outStream << endl;
4821 out << "fi" << endl;
4823 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4824 out << " error=1" << endl;
4825 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4826 out << " echo \"$segViol\" " << outStream << endl;
4827 out << " echo \"Error = $error\" " << outStream << endl;
4828 out << "fi" << endl;
4830 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4831 out << " error=1" << endl;
4832 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4833 out << " echo \"$segFault\" " << outStream << endl;
4834 out << " echo \"Error = $error\" " << outStream << endl;
4835 out << "fi" << endl;
4837 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4838 out << " error=1" << endl;
4839 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4840 out << " echo \"$glibcErr\" " << outStream << endl;
4841 out << " echo \"Error = $error\" " << outStream << endl;
4842 out << "fi" << endl;
4844 // Part dedicated to the specific analyses running into the train
// The list of files to check starts from fOutputFiles; for merge jobs the
// terminate files are appended. Since the loop below only runs when <merge>
// is false, the appended entries are not iterated by the visible code.
4846 TString outputFiles = fOutputFiles;
4847 if (merge && !fTerminateFiles.IsNull()) {
4849 outputFiles += fTerminateFiles;
// NOTE(review): Tokenize() returns a new array owned by the caller; the
// matching delete is not visible in this listing - confirm.
4851 TObjArray *arr = outputFiles.Tokenize(",");
// Non-merge jobs: emit one existence check per expected output file. Anything
// from '@' onwards is cut off the entry; entries listed in fTerminateFiles or
// containing a '*' wildcard are skipped.
4854 while (!merge && (os=(TObjString*)next1())) {
4855 // No need to validate outputs produced by merging since the merging macro does this
4856 outputFile = os->GetString();
4857 Int_t index = outputFile.Index("@");
4858 if (index > 0) outputFile.Remove(index);
4859 if (fTerminateFiles.Contains(outputFile)) continue;
4860 if (outputFile.Contains("*")) continue;
4861 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4862 out << " error=1" << endl;
4863 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4864 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4865 out << "fi" << endl;
// The job must have left the "outputs_valid" marker file behind
4868 out << "if ! [ -f outputs_valid ] ; then" << endl;
4869 out << " error=1" << endl;
4870 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4871 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4872 out << "fi" << endl;
// On success report it and, unless logs are to be kept, make the script
// delete the std* log files
4874 out << "if [ $error = 0 ] ; then" << endl;
4875 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4876 if (!IsKeepLogs()) {
4877 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4879 out << " rm -f std*" << endl;
4881 out << "fi" << endl;
// Footer: return to the previous directory and exit with the error flag
4883 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4884 out << "echo \"*******************************************************\"" << outStream << endl;
4885 out << "cd -" << endl;
4886 out << "exit $error" << endl;
// The upload is disabled in production, offline or test mode
4888 Bool_t copy = kTRUE;
4889 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Upload: remove a previous grid copy, then copy the local script to
// <AliEn home><fGridWorkingDir>/<validationScript>; a failed copy is fatal.
// NOTE(review): FileExists()/Rm() are given the bare script name while the
// copy targets the full path - presumably this relies on the current grid
// directory being the working directory; verify.
4892 TString workdir = gGrid->GetHomeDirectory();
4893 workdir += fGridWorkingDir;
4894 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4895 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4896 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
4897 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
4898 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");