1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
39 #include "TGridResult.h"
40 #include "TGridCollection.h"
42 #include "TGridJobStatusList.h"
43 #include "TGridJobStatus.h"
44 #include "TFileMerger.h"
45 #include "AliAnalysisManager.h"
46 #include "AliAnalysisTaskCfg.h"
47 #include "AliVEventHandler.h"
48 #include "AliAnalysisDataContainer.h"
49 #include "AliMultiInputEventHandler.h"
55 ClassImp(AliAnalysisAlien)
61 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
63 TString sl(Form("file:%s", loc));
64 TString sr(Form("alien://%s", rem));
65 Bool_t ret = TFile::Cp(sl, sr);
67 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
73 //______________________________________________________________________________
74 AliAnalysisAlien::AliAnalysisAlien()
80 fSplitMaxInputFileNumber(0),
82 fMasterResubmitThreshold(0),
95 fNproofWorkersPerSlave(0),
105 fAdditionalRootLibs(),
149 //______________________________________________________________________________
150 AliAnalysisAlien::AliAnalysisAlien(const char *name)
151 :AliAnalysisGrid(name),
156 fSplitMaxInputFileNumber(0),
158 fMasterResubmitThreshold(0),
171 fNproofWorkersPerSlave(0),
175 fExecutableCommand(),
181 fAdditionalRootLibs(),
225 //______________________________________________________________________________
226 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
227 :AliAnalysisGrid(other),
230 fPrice(other.fPrice),
232 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
233 fMaxInitFailed(other.fMaxInitFailed),
234 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
235 fNtestFiles(other.fNtestFiles),
236 fNrunsPerMaster(other.fNrunsPerMaster),
237 fMaxMergeFiles(other.fMaxMergeFiles),
238 fMaxMergeStages(other.fMaxMergeStages),
239 fNsubmitted(other.fNsubmitted),
240 fProductionMode(other.fProductionMode),
241 fOutputToRunNo(other.fOutputToRunNo),
242 fMergeViaJDL(other.fMergeViaJDL),
243 fFastReadOption(other.fFastReadOption),
244 fOverwriteMode(other.fOverwriteMode),
245 fNreplicas(other.fNreplicas),
246 fNproofWorkers(other.fNproofWorkers),
247 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
248 fProofReset(other.fProofReset),
249 fRunNumbers(other.fRunNumbers),
250 fExecutable(other.fExecutable),
251 fExecutableCommand(other.fExecutableCommand),
252 fArguments(other.fArguments),
253 fExecutableArgs(other.fExecutableArgs),
254 fAnalysisMacro(other.fAnalysisMacro),
255 fAnalysisSource(other.fAnalysisSource),
256 fValidationScript(other.fValidationScript),
257 fAdditionalRootLibs(other.fAdditionalRootLibs),
258 fAdditionalLibs(other.fAdditionalLibs),
259 fSplitMode(other.fSplitMode),
260 fAPIVersion(other.fAPIVersion),
261 fROOTVersion(other.fROOTVersion),
262 fAliROOTVersion(other.fAliROOTVersion),
263 fExternalPackages(other.fExternalPackages),
265 fGridWorkingDir(other.fGridWorkingDir),
266 fGridDataDir(other.fGridDataDir),
267 fDataPattern(other.fDataPattern),
268 fGridOutputDir(other.fGridOutputDir),
269 fOutputArchive(other.fOutputArchive),
270 fOutputFiles(other.fOutputFiles),
271 fInputFormat(other.fInputFormat),
272 fDatasetName(other.fDatasetName),
273 fJDLName(other.fJDLName),
274 fTerminateFiles(other.fTerminateFiles),
275 fMergeExcludes(other.fMergeExcludes),
276 fRegisterExcludes(other.fRegisterExcludes),
277 fIncludePath(other.fIncludePath),
278 fCloseSE(other.fCloseSE),
279 fFriendChainName(other.fFriendChainName),
280 fJobTag(other.fJobTag),
281 fOutputSingle(other.fOutputSingle),
282 fRunPrefix(other.fRunPrefix),
283 fProofCluster(other.fProofCluster),
284 fProofDataSet(other.fProofDataSet),
285 fFileForTestMode(other.fFileForTestMode),
286 fAliRootMode(other.fAliRootMode),
287 fProofProcessOpt(other.fProofProcessOpt),
288 fMergeDirName(other.fMergeDirName),
293 fDropToShell(other.fDropToShell),
294 fGridJobIDs(other.fGridJobIDs),
295 fGridStages(other.fGridStages)
298 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
300 fRunRange[0] = other.fRunRange[0];
301 fRunRange[1] = other.fRunRange[1];
302 if (other.fInputFiles) {
303 fInputFiles = new TObjArray();
304 TIter next(other.fInputFiles);
306 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
307 fInputFiles->SetOwner();
309 if (other.fPackages) {
310 fPackages = new TObjArray();
311 TIter next(other.fPackages);
313 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
314 fPackages->SetOwner();
316 if (other.fModules) {
317 fModules = new TObjArray();
318 fModules->SetOwner();
319 TIter next(other.fModules);
320 AliAnalysisTaskCfg *mod, *crt;
321 while ((crt=(AliAnalysisTaskCfg*)next())) {
322 mod = new AliAnalysisTaskCfg(*crt);
328 //______________________________________________________________________________
329 AliAnalysisAlien::~AliAnalysisAlien()
337 fProofParam.DeleteAll();
340 //______________________________________________________________________________
341 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
344 if (this != &other) {
345 AliAnalysisGrid::operator=(other);
346 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
347 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
348 fPrice = other.fPrice;
350 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
351 fMaxInitFailed = other.fMaxInitFailed;
352 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
353 fNtestFiles = other.fNtestFiles;
354 fNrunsPerMaster = other.fNrunsPerMaster;
355 fMaxMergeFiles = other.fMaxMergeFiles;
356 fMaxMergeStages = other.fMaxMergeStages;
357 fNsubmitted = other.fNsubmitted;
358 fProductionMode = other.fProductionMode;
359 fOutputToRunNo = other.fOutputToRunNo;
360 fMergeViaJDL = other.fMergeViaJDL;
361 fFastReadOption = other.fFastReadOption;
362 fOverwriteMode = other.fOverwriteMode;
363 fNreplicas = other.fNreplicas;
364 fNproofWorkers = other.fNproofWorkers;
365 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
366 fProofReset = other.fProofReset;
367 fRunNumbers = other.fRunNumbers;
368 fExecutable = other.fExecutable;
369 fExecutableCommand = other.fExecutableCommand;
370 fArguments = other.fArguments;
371 fExecutableArgs = other.fExecutableArgs;
372 fAnalysisMacro = other.fAnalysisMacro;
373 fAnalysisSource = other.fAnalysisSource;
374 fValidationScript = other.fValidationScript;
375 fAdditionalRootLibs = other.fAdditionalRootLibs;
376 fAdditionalLibs = other.fAdditionalLibs;
377 fSplitMode = other.fSplitMode;
378 fAPIVersion = other.fAPIVersion;
379 fROOTVersion = other.fROOTVersion;
380 fAliROOTVersion = other.fAliROOTVersion;
381 fExternalPackages = other.fExternalPackages;
383 fGridWorkingDir = other.fGridWorkingDir;
384 fGridDataDir = other.fGridDataDir;
385 fDataPattern = other.fDataPattern;
386 fGridOutputDir = other.fGridOutputDir;
387 fOutputArchive = other.fOutputArchive;
388 fOutputFiles = other.fOutputFiles;
389 fInputFormat = other.fInputFormat;
390 fDatasetName = other.fDatasetName;
391 fJDLName = other.fJDLName;
392 fTerminateFiles = other.fTerminateFiles;
393 fMergeExcludes = other.fMergeExcludes;
394 fRegisterExcludes = other.fRegisterExcludes;
395 fIncludePath = other.fIncludePath;
396 fCloseSE = other.fCloseSE;
397 fFriendChainName = other.fFriendChainName;
398 fJobTag = other.fJobTag;
399 fOutputSingle = other.fOutputSingle;
400 fRunPrefix = other.fRunPrefix;
401 fProofCluster = other.fProofCluster;
402 fProofDataSet = other.fProofDataSet;
403 fFileForTestMode = other.fFileForTestMode;
404 fAliRootMode = other.fAliRootMode;
405 fProofProcessOpt = other.fProofProcessOpt;
406 fMergeDirName = other.fMergeDirName;
407 fDropToShell = other.fDropToShell;
408 fGridJobIDs = other.fGridJobIDs;
409 fGridStages = other.fGridStages;
410 if (other.fInputFiles) {
411 fInputFiles = new TObjArray();
412 TIter next(other.fInputFiles);
414 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
415 fInputFiles->SetOwner();
417 if (other.fPackages) {
418 fPackages = new TObjArray();
419 TIter next(other.fPackages);
421 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
422 fPackages->SetOwner();
424 if (other.fModules) {
425 fModules = new TObjArray();
426 fModules->SetOwner();
427 TIter next(other.fModules);
428 AliAnalysisTaskCfg *mod, *crt;
429 while ((crt=(AliAnalysisTaskCfg*)next())) {
430 mod = new AliAnalysisTaskCfg(*crt);
438 //______________________________________________________________________________
439 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
441 // Add a single additional library to be loaded. Extension must be present.
443 if (!lib.Contains(".")) {
444 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
447 if (fAdditionalLibs.Contains(name)) {
448 Warning("AddAdditionalLibrary", "Library %s already added.", name);
451 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
452 fAdditionalLibs += lib;
455 //______________________________________________________________________________
456 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
458 // Adding a module. Checks if already existing. Becomes owned by this.
460 if (GetModule(module->GetName())) {
461 Error("AddModule", "A module having the same name %s already added", module->GetName());
465 fModules = new TObjArray();
466 fModules->SetOwner();
468 fModules->Add(module);
471 //______________________________________________________________________________
472 void AliAnalysisAlien::AddModules(TObjArray *list)
474 // Adding a list of modules. Checks if already existing. Becomes owned by this.
476 AliAnalysisTaskCfg *module;
477 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
480 //______________________________________________________________________________
481 Bool_t AliAnalysisAlien::CheckDependencies()
483 // Check if all dependencies are satisfied. Reorder modules if needed.
484 Int_t nmodules = GetNmodules();
486 Warning("CheckDependencies", "No modules added yet to check their dependencies");
489 AliAnalysisTaskCfg *mod = 0;
490 AliAnalysisTaskCfg *dep = 0;
493 for (i=0; i<nmodules; i++) {
494 mod = (AliAnalysisTaskCfg*) fModules->At(i);
495 Int_t ndeps = mod->GetNdeps();
497 for (j=0; j<ndeps; j++) {
498 depname = mod->GetDependency(j);
499 dep = GetModule(depname);
501 Error("CheckDependencies","Dependency %s not added for module %s",
502 depname.Data(), mod->GetName());
505 if (dep->NeedsDependency(mod->GetName())) {
506 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
507 mod->GetName(), dep->GetName());
510 Int_t idep = fModules->IndexOf(dep);
511 // The dependency task must come first
513 // Remove at idep and move all objects below up one slot
514 // down to index i included.
515 fModules->RemoveAt(idep);
516 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
517 fModules->AddAt(dep, i++);
519 //Redo from istart if dependencies were inserted
520 if (i>istart) i=istart-1;
526 //______________________________________________________________________________
527 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
529 // Create the analysis manager and optionally execute the macro in filename.
530 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
532 mgr = new AliAnalysisManager(name);
533 mgr->SetGridHandler((AliAnalysisGrid*)this);
534 if (strlen(filename)) {
535 TString line = gSystem->ExpandPathName(filename);
537 gROOT->ProcessLine(line.Data());
542 //______________________________________________________________________________
543 Int_t AliAnalysisAlien::GetNmodules() const
545 // Get number of modules.
546 if (!fModules) return 0;
547 return fModules->GetEntries();
550 //______________________________________________________________________________
551 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
553 // Get a module by name.
554 if (!fModules) return 0;
555 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
558 //______________________________________________________________________________
559 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
561 // Load a given module.
562 if (mod->IsLoaded()) return kTRUE;
563 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
565 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
568 Int_t ndeps = mod->GetNdeps();
570 for (Int_t j=0; j<ndeps; j++) {
571 depname = mod->GetDependency(j);
572 AliAnalysisTaskCfg *dep = GetModule(depname);
574 Error("LoadModule","Dependency %s not existing for module %s",
575 depname.Data(), mod->GetName());
578 if (!LoadModule(dep)) {
579 Error("LoadModule","Dependency %s for module %s could not be loaded",
580 depname.Data(), mod->GetName());
584 // Load libraries for the module
585 if (!mod->CheckLoadLibraries()) {
586 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
589 // Check if a custom file name was requested
590 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
592 // Check if a custom terminate file name was requested
593 if (strlen(mod->GetTerminateFileName())) {
594 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
595 fTerminateFiles += mod->GetTerminateFileName();
599 if (mod->ExecuteMacro()<0) {
600 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
601 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
604 // Configure dependencies
605 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
606 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
607 mod->GetConfigMacro()->GetTitle(), mod->GetName());
610 // Adjust extra libraries
611 Int_t nlibs = mod->GetNlibs();
613 for (Int_t i=0; i<nlibs; i++) {
614 lib = mod->GetLibrary(i);
615 lib = Form("lib%s.so", lib.Data());
616 if (fAdditionalLibs.Contains(lib)) continue;
617 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
618 fAdditionalLibs += lib;
623 //______________________________________________________________________________
624 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
626 // Generate the full train.
627 fAdditionalLibs = "";
628 if (!LoadModules()) return kFALSE;
629 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
630 if (!mgr->InitAnalysis()) return kFALSE;
633 Int_t productionMode = fProductionMode;
635 TString macro = fAnalysisMacro;
636 TString executable = fExecutable;
637 TString validation = fValidationScript;
638 TString execCommand = fExecutableCommand;
639 SetAnalysisMacro(Form("%s.C", name));
640 SetExecutable(Form("%s.sh", name));
641 // SetExecutableCommand("aliroot -b -q ");
642 SetValidationScript(Form("%s_validation.sh", name));
644 SetProductionMode(productionMode);
645 fAnalysisMacro = macro;
646 fExecutable = executable;
647 fExecutableCommand = execCommand;
648 fValidationScript = validation;
652 //______________________________________________________________________________
653 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
655 // Generate test macros for a single module or for the full train.
656 fAdditionalLibs = "";
657 if (strlen(modname)) {
658 if (!CheckDependencies()) return kFALSE;
659 AliAnalysisTaskCfg *mod = GetModule(modname);
661 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
664 if (!LoadModule(mod)) return kFALSE;
665 } else if (!LoadModules()) return kFALSE;
666 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
667 if (!mgr->InitAnalysis()) return kFALSE;
671 Int_t productionMode = fProductionMode;
673 TString macro = fAnalysisMacro;
674 TString executable = fExecutable;
675 TString validation = fValidationScript;
676 TString execCommand = fExecutableCommand;
677 SetAnalysisMacro(Form("%s.C", name));
678 SetExecutable(Form("%s.sh", name));
679 fOutputFiles = GetListOfFiles("outaod");
680 // Add extra files registered to the analysis manager
681 TString extra = GetListOfFiles("ext");
682 if (!extra.IsNull()) {
683 extra.ReplaceAll(".root", "*.root");
684 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
685 fOutputFiles += extra;
687 // SetExecutableCommand("aliroot -b -q ");
688 SetValidationScript(Form("%s_validation.sh", name));
690 WriteAnalysisMacro();
692 WriteValidationScript();
694 WriteMergeExecutable();
695 WriteValidationScript(kTRUE);
696 SetLocalTest(kFALSE);
697 SetProductionMode(productionMode);
698 fAnalysisMacro = macro;
699 fExecutable = executable;
700 fExecutableCommand = execCommand;
701 fValidationScript = validation;
705 //______________________________________________________________________________
706 Bool_t AliAnalysisAlien::LoadModules()
708 // Load all modules by executing the AddTask macros. Checks first the dependencies.
709 fAdditionalLibs = "";
710 Int_t nmodules = GetNmodules();
712 Warning("LoadModules", "No module to be loaded");
715 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
717 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
720 if (!CheckDependencies()) return kFALSE;
721 nmodules = GetNmodules();
722 AliAnalysisTaskCfg *mod;
723 for (Int_t imod=0; imod<nmodules; imod++) {
724 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
725 if (!LoadModule(mod)) return kFALSE;
730 //______________________________________________________________________________
731 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
733 // Set the run number format. Can be a prefix or a format like "%09d"
735 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
738 //______________________________________________________________________________
739 void AliAnalysisAlien::AddIncludePath(const char *path)
741 // Add include path in the remote analysis macro.
743 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
744 else fIncludePath += Form("-I%s ", path);
747 //______________________________________________________________________________
748 void AliAnalysisAlien::AddRunNumber(Int_t run)
750 // Add a run number to the list of runs to be processed.
751 if (fRunNumbers.Length()) fRunNumbers += " ";
752 fRunNumbers += Form(fRunPrefix.Data(), run);
755 //______________________________________________________________________________
756 void AliAnalysisAlien::AddRunList(const char* runList)
758 // Add several runs into the list of runs; they are expected to be separated by a blank character.
759 TString sList = runList;
760 TObjArray *list = sList.Tokenize(" ");
761 Int_t n = list->GetEntries();
762 for (Int_t i = 0; i < n; i++) {
763 TObjString *os = (TObjString*)list->At(i);
764 AddRunNumber(os->GetString().Atoi());
769 //______________________________________________________________________________
770 void AliAnalysisAlien::AddRunNumber(const char* run)
772 // Add a run number to the list of runs to be processed.
775 TObjArray *arr = runs.Tokenize(" ");
778 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
779 while ((os=(TObjString*)next())){
780 if (fRunNumbers.Length()) fRunNumbers += " ";
781 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
786 //______________________________________________________________________________
787 void AliAnalysisAlien::AddDataFile(const char *lfn)
789 // Adds a data file to the input to be analysed. The file should be a valid LFN
790 // or point to an existing file in the alien workdir.
791 if (!fInputFiles) fInputFiles = new TObjArray();
792 fInputFiles->Add(new TObjString(lfn));
795 //______________________________________________________________________________
796 void AliAnalysisAlien::AddExternalPackage(const char *package)
798 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
799 if (fExternalPackages) fExternalPackages += " ";
800 fExternalPackages += package;
803 //______________________________________________________________________________
804 Bool_t AliAnalysisAlien::Connect()
806 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
807 if (gGrid && gGrid->IsConnected()) return kTRUE;
808 if (fProductionMode) return kTRUE;
810 Info("Connect", "Trying to connect to AliEn ...");
811 TGrid::Connect("alien://");
813 if (!gGrid || !gGrid->IsConnected()) {
814 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
817 fUser = gGrid->GetUser();
818 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
822 //______________________________________________________________________________
823 void AliAnalysisAlien::CdWork()
825 // Check validity of alien workspace. Create directory if possible.
827 Error("CdWork", "Alien connection required");
830 TString homedir = gGrid->GetHomeDirectory();
831 TString workdir = homedir + fGridWorkingDir;
832 if (DirectoryExists(workdir)) {
836 // Work directory not existing - create it
838 if (gGrid->Mkdir(workdir, "-p")) {
839 gGrid->Cd(fGridWorkingDir);
840 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
842 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
843 workdir.Data(), homedir.Data());
844 fGridWorkingDir = "";
848 //______________________________________________________________________________
849 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
851 // Check if file copying is possible.
852 if (fProductionMode) return kTRUE;
853 TString salienpath(alienpath);
854 if (salienpath.Contains(" ")) {
855 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
859 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
862 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
863 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
864 // Check if alien_CLOSE_SE is defined
865 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
866 if (!closeSE.IsNull()) {
867 Info("CheckFileCopy", "Your current close storage is pointing to: \
868 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
870 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
872 // Check if grid directory exists.
873 if (!DirectoryExists(alienpath)) {
874 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
877 TString stest = "plugin_test_copy";
878 TFile f(stest, "RECREATE");
879 // User may not have write permissions to current directory
881 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
882 gSystem->WorkingDirectory());
886 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
887 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
888 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
889 \n# 1. Make sure you have write permissions there. If this is the case: \
890 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
891 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
892 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
893 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
894 gSystem->Unlink(stest.Data());
897 gSystem->Unlink(stest.Data());
898 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
899 Info("CheckFileCopy", "### ...SUCCESS ###");
903 //______________________________________________________________________________
904 Bool_t AliAnalysisAlien::CheckInputData()
906 // Check validity of input data. If necessary, create xml files.
907 if (fProductionMode) return kTRUE;
908 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
909 if (!fGridDataDir.Length()) {
910 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
914 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
917 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
918 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
919 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
922 // Process declared files
923 Bool_t isCollection = kFALSE;
924 Bool_t isXml = kFALSE;
925 Bool_t useTags = kFALSE;
926 Bool_t checked = kFALSE;
927 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
929 TString workdir = gGrid->GetHomeDirectory();
930 workdir += fGridWorkingDir;
933 TIter next(fInputFiles);
934 while ((objstr=(TObjString*)next())) {
937 file += objstr->GetString();
938 // Store full lfn path
939 if (FileExists(file)) objstr->SetString(file);
941 file = objstr->GetName();
942 if (!FileExists(objstr->GetName())) {
943 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
944 objstr->GetName(), workdir.Data());
948 Bool_t iscoll, isxml, usetags;
949 CheckDataType(file, iscoll, isxml, usetags);
952 isCollection = iscoll;
955 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
957 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
958 Error("CheckInputData", "Some conflict was found in the types of inputs");
964 // Process requested run numbers
965 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
966 // Check validity of alien data directory
967 if (!fGridDataDir.Length()) {
968 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
971 if (!DirectoryExists(fGridDataDir)) {
972 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
976 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
980 if (checked && !isXml) {
981 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
984 // Check validity of run number(s)
989 TString schunk, schunk2;
993 useTags = fDataPattern.Contains("tag");
994 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
996 if (useTags != fDataPattern.Contains("tag")) {
997 Error("CheckInputData", "Cannot mix input files using/not using tags");
1000 if (fRunNumbers.Length()) {
1001 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1002 arr = fRunNumbers.Tokenize(" ");
1004 while ((os=(TObjString*)next())) {
1005 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1006 if (!DirectoryExists(path)) {
1007 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1010 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1011 TString msg = "\n##### file: ";
1013 msg += " type: xml_collection;";
1014 if (useTags) msg += " using_tags: Yes";
1015 else msg += " using_tags: No";
1016 Info("CheckDataType", "%s", msg.Data());
1017 if (fNrunsPerMaster<2) {
1018 AddDataFile(Form("%s.xml", os->GetString().Data()));
1021 if (((nruns-1)%fNrunsPerMaster) == 0) {
1022 schunk = os->GetString();
1024 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1025 schunk += Form("_%s.xml", os->GetString().Data());
1026 AddDataFile(schunk);
1031 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1032 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1033 format = Form("%%s/%s ", fRunPrefix.Data());
1034 path = Form(format.Data(), fGridDataDir.Data(), irun);
1035 if (!DirectoryExists(path)) {
1038 format = Form("%%s/%s.xml", fRunPrefix.Data());
1039 path = Form(format.Data(), workdir.Data(),irun);
1040 TString msg = "\n##### file: ";
1042 msg += " type: xml_collection;";
1043 if (useTags) msg += " using_tags: Yes";
1044 else msg += " using_tags: No";
1045 Info("CheckDataType", "%s", msg.Data());
1046 if (fNrunsPerMaster<2) {
1047 format = Form("%s.xml", fRunPrefix.Data());
1048 AddDataFile(Form(format.Data(),irun));
1051 if (((nruns-1)%fNrunsPerMaster) == 0) {
1052 schunk = Form(fRunPrefix.Data(),irun);
1054 format = Form("_%s.xml", fRunPrefix.Data());
1055 schunk2 = Form(format.Data(), irun);
1056 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1058 AddDataFile(schunk);
1063 AddDataFile(schunk);
1069 //______________________________________________________________________________
1070 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1072 // Copy data from the given grid directory according a pattern and make a local
1074 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
1076 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1079 if (!DirectoryExists(griddir)) {
1080 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
1083 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1084 printf("Running command: %s\n", command.Data());
1085 TGridResult *res = gGrid->Command(command);
1086 Int_t nfound = res->GetEntries();
1088 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1091 printf("... found %d files. Copying locally ...\n", nfound);
1094 TObjArray* additionalArchives = 0;
1095 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1096 additionalArchives = TString(archivefile).Tokenize(";");
1097 archivefile = additionalArchives->At(0)->GetName();
1098 additionalArchives->RemoveAt(0);
1099 additionalArchives->Compress();
1102 // Copy files locally
1104 out.open(output, ios::out);
1106 TString turl, dirname, filename, temp;
1107 TString cdir = gSystem->WorkingDirectory();
1108 gSystem->MakeDirectory(outputdir);
1109 gSystem->ChangeDirectory(outputdir);
1111 for (Int_t i=0; i<nfound; i++) {
1112 map = (TMap*)res->At(i);
1113 turl = map->GetValue("turl")->GetName();
1114 filename = gSystem->BaseName(turl.Data());
1115 dirname = gSystem->DirName(turl.Data());
1116 dirname = gSystem->BaseName(dirname.Data());
1117 gSystem->MakeDirectory(dirname);
1119 TString source(turl);
1120 TString targetFileName(filename);
1122 if (strlen(archivefile) > 0) {
1123 // TODO here the archive in which the file resides should be determined
1124 // however whereis returns only a guid, and guid2lfn does not work
1125 // Therefore we use the one provided as argument for now
1126 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1127 targetFileName = archivefile;
1129 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1130 Bool_t success = kTRUE;
1131 if (additionalArchives) {
1132 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1134 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1135 gSystem->MakeDirectory(gSystem->DirName(target));
1136 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
1141 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1142 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1147 gSystem->ChangeDirectory(cdir);
1149 delete additionalArchives;
1153 //______________________________________________________________________________
1154 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1156 // Create dataset for the grid data directory + run number.
1157 const Int_t gMaxEntries = 15000;
1158 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1160 Error("CreateDataset", "Cannot create dataset with no grid connection");
1165 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
1166 TString workdir = gGrid->GetHomeDirectory();
1167 workdir += fGridWorkingDir;
1169 // Compose the 'find' command arguments
1172 TString options = "-x collection ";
1173 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1174 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1175 TString conditions = "";
1182 TString schunk, schunk2;
1183 TGridCollection *cbase=0, *cadd=0;
1184 if (!fRunNumbers.Length() && !fRunRange[0]) {
1185 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1186 // Make a single data collection from data directory.
1187 path = fGridDataDir;
1188 if (!DirectoryExists(path)) {
1189 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
1193 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1194 else file = Form("%s.xml", gSystem->BaseName(path));
1198 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1200 command += Form("%s -o %d ",options.Data(), nstart);
1204 command += conditions;
1205 printf("command: %s\n", command.Data());
1206 TGridResult *res = gGrid->Command(command);
1207 if (res) delete res;
1208 // Write standard output to file
1209 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1210 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1211 Bool_t nullFile = kFALSE;
1213 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1215 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1217 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1218 gSystem->Exec("rm -f __tmp*");
1226 gSystem->Exec("rm -f __tmp__");
1227 ncount = line.Atoi();
1230 if (ncount == gMaxEntries) {
1231 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1232 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1233 if (!cbase) cbase = cadd;
1241 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1242 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1245 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1246 delete cbase; cbase = 0;
1248 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1250 gSystem->Exec("rm -f __tmp*");
1251 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1255 Bool_t fileExists = FileExists(file);
1256 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1257 // Copy xml file to alien space
1258 if (fileExists) gGrid->Rm(file);
1259 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1260 if (!FileExists(file)) {
1261 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1264 // Update list of files to be processed.
1266 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
1270 Bool_t nullResult = kTRUE;
1271 if (fRunNumbers.Length()) {
1272 TObjArray *arr = fRunNumbers.Tokenize(" ");
1275 while ((os=(TObjString*)next())) {
1278 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
1279 if (!DirectoryExists(path)) continue;
1281 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1282 else file = Form("%s.xml", os->GetString().Data());
1283 // If local collection file does not exist, create it via 'find' command.
1287 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1289 command += Form("%s -o %d ",options.Data(), nstart);
1292 command += conditions;
1293 TGridResult *res = gGrid->Command(command);
1294 if (res) delete res;
1295 // Write standard output to file
1296 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1297 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1298 Bool_t nullFile = kFALSE;
1300 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1302 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1304 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1305 gSystem->Exec("rm -f __tmp*");
1306 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1314 gSystem->Exec("rm -f __tmp__");
1315 ncount = line.Atoi();
1317 nullResult = kFALSE;
1319 if (ncount == gMaxEntries) {
1320 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1321 if (fNrunsPerMaster > 1) {
1322 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1323 file.Data(),gMaxEntries);
1326 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1327 if (!cbase) cbase = cadd;
1334 if (cbase && fNrunsPerMaster<2) {
1335 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1336 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1339 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1340 delete cbase; cbase = 0;
1342 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1344 gSystem->Exec("rm -f __tmp*");
1345 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1349 if (TestBit(AliAnalysisGrid::kTest)) break;
1350 // Check if there is one run per master job.
1351 if (fNrunsPerMaster<2) {
1352 if (FileExists(file)) {
1353 if (fOverwriteMode) gGrid->Rm(file);
1355 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1359 // Copy xml file to alien space
1360 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1361 if (!FileExists(file)) {
1362 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1368 if (((nruns-1)%fNrunsPerMaster) == 0) {
1369 schunk = os->GetString();
1370 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1372 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1373 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1377 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
1380 schunk += Form("_%s.xml", os->GetString().Data());
1381 if (FileExists(schunk)) {
1382 if (fOverwriteMode) gGrid->Rm(file);
1384 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1388 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1389 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1390 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1391 if (!FileExists(schunk)) {
1392 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1400 Error("CreateDataset", "No valid dataset corresponding to the query!");
1404 // Process a full run range.
1405 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1406 format = Form("%%s/%s ", fRunPrefix.Data());
1409 path = Form(format.Data(), fGridDataDir.Data(), irun);
1410 if (!DirectoryExists(path)) continue;
1412 format = Form("%s.xml", fRunPrefix.Data());
1413 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1414 else file = Form(format.Data(), irun);
1415 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1416 if (fOverwriteMode) gGrid->Rm(file);
1418 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1422 // If local collection file does not exist, create it via 'find' command.
1426 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1428 command += Form("%s -o %d ",options.Data(), nstart);
1431 command += conditions;
1432 TGridResult *res = gGrid->Command(command);
1433 if (res) delete res;
1434 // Write standard output to file
1435 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1436 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1437 Bool_t nullFile = kFALSE;
1439 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1441 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1443 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1444 gSystem->Exec("rm -f __tmp*");
1452 gSystem->Exec("rm -f __tmp__");
1453 ncount = line.Atoi();
1455 nullResult = kFALSE;
1457 if (ncount == gMaxEntries) {
1458 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1459 if (fNrunsPerMaster > 1) {
1460 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1461 file.Data(),gMaxEntries);
1464 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1465 if (!cbase) cbase = cadd;
1472 if (cbase && fNrunsPerMaster<2) {
1473 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1474 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1477 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1478 delete cbase; cbase = 0;
1480 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1482 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1486 if (TestBit(AliAnalysisGrid::kTest)) break;
1487 // Check if there is one run per master job.
1488 if (fNrunsPerMaster<2) {
1489 if (FileExists(file)) {
1490 if (fOverwriteMode) gGrid->Rm(file);
1492 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1496 // Copy xml file to alien space
1497 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1498 if (!FileExists(file)) {
1499 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1504 // Check if the collection for the chunk exist locally.
1505 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1506 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1507 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1510 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1511 if (((nruns-1)%fNrunsPerMaster) == 0) {
1512 schunk = Form(fRunPrefix.Data(), irun);
1513 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1515 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1519 format = Form("%%s_%s.xml", fRunPrefix.Data());
1520 schunk2 = Form(format.Data(), schunk.Data(), irun);
1521 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1525 if (FileExists(schunk)) {
1526 if (fOverwriteMode) gGrid->Rm(schunk);
1528 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1532 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1533 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1534 if (FileExists(schunk)) {
1535 if (fOverwriteMode) gGrid->Rm(schunk);
1537 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1541 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1542 if (!FileExists(schunk)) {
1543 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1549 Error("CreateDataset", "No valid dataset corresponding to the query!");
1556 //______________________________________________________________________________
1557 Bool_t AliAnalysisAlien::CreateJDL()
1559 // Generate a JDL file according to current settings. The name of the file is
1560 // specified by fJDLName.
1561 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1562 Bool_t error = kFALSE;
1564 Bool_t copy = kTRUE;
1565 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1566 Bool_t generate = kTRUE;
1567 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1569 Error("CreateJDL", "Alien connection required");
1572 // Check validity of alien workspace
1574 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1575 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1576 workdir += fGridWorkingDir;
1580 Error("CreateJDL()", "Define some input files for your analysis.");
1583 // Compose list of input files
1584 // Check if output files were defined
1585 if (!fOutputFiles.Length()) {
1586 Error("CreateJDL", "You must define at least one output file");
1589 // Check if an output directory was defined and valid
1590 if (!fGridOutputDir.Length()) {
1591 Error("CreateJDL", "You must define AliEn output directory");
1594 if (!fProductionMode) {
1595 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1596 if (!DirectoryExists(fGridOutputDir)) {
1597 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1598 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1600 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1604 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1609 // Exit if any error up to now
1610 if (error) return kFALSE;
1612 if (!fUser.IsNull()) {
1613 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1614 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1616 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
1617 TString mergeExec = fExecutable;
1618 mergeExec.ReplaceAll(".sh", "_merge.sh");
1619 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1620 mergeExec.ReplaceAll(".sh", ".C");
1621 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1622 if (!fArguments.IsNull())
1623 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1624 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1626 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1627 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
1630 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1631 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1632 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1633 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1635 if (fMaxInitFailed > 0) {
1636 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1637 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1639 if (fSplitMaxInputFileNumber > 0) {
1640 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1641 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1643 if (!IsOneStageMerging()) {
1644 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1645 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1647 if (fSplitMode.Length()) {
1648 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1649 fGridJDL->SetDescription("Split", "We split per SE or file");
1651 fMergingJDL->SetValue("Split", "\"se\"");
1652 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
1653 if (!fAliROOTVersion.IsNull()) {
1654 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1655 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1657 if (!fROOTVersion.IsNull()) {
1658 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1659 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1661 if (!fAPIVersion.IsNull()) {
1662 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1663 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1665 if (!fExternalPackages.IsNull()) {
1666 arr = fExternalPackages.Tokenize(" ");
1668 while ((os=(TObjString*)next())) {
1669 TString pkgname = os->GetString();
1670 Int_t index = pkgname.Index("::");
1671 TString pkgversion = pkgname(index+2, pkgname.Length());
1672 pkgname.Remove(index);
1673 fGridJDL->AddToPackages(pkgname, pkgversion);
1674 fMergingJDL->AddToPackages(pkgname, pkgversion);
1678 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1679 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1680 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1681 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1682 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1683 TString analysisFile = fExecutable;
1684 analysisFile.ReplaceAll(".sh", ".root");
1685 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1686 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1687 if (fAdditionalLibs.Length()) {
1688 arr = fAdditionalLibs.Tokenize(" ");
1690 while ((os=(TObjString*)next())) {
1691 if (os->GetString().Contains(".so")) continue;
1692 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1693 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1698 TIter next(fPackages);
1700 while ((obj=next())) {
1701 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1702 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1705 const char *comment = "List of output files and archives";
1706 if (fOutputArchive.Length()) {
1707 TString outputArchive = fOutputArchive;
1708 if (!fRegisterExcludes.IsNull()) {
1709 arr = fRegisterExcludes.Tokenize(" ");
1711 while ((os=(TObjString*)next1())) {
1712 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1713 outputArchive.ReplaceAll(os->GetString(),"");
1717 arr = outputArchive.Tokenize(" ");
1719 Bool_t first = kTRUE;
1720 while ((os=(TObjString*)next())) {
1721 if (!os->GetString().Contains("@") && fCloseSE.Length())
1722 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1724 fGridJDL->AddToSet("Output", os->GetString());
1725 if (first) fGridJDL->AddToSetDescription("Output", comment);
1729 // Output archive for the merging jdl
1730 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1731 outputArchive = "log_archive.zip:std*@disk=1 ";
1732 // Add normal output files, extra files + terminate files
1733 TString files = GetListOfFiles("outextter");
1734 // Do not register files in fRegisterExcludes
1735 if (!fRegisterExcludes.IsNull()) {
1736 arr = fRegisterExcludes.Tokenize(" ");
1738 while ((os=(TObjString*)next1())) {
1739 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1740 files.ReplaceAll(os->GetString(),"");
1744 files.ReplaceAll(".root", "*.root");
1746 if (mgr->IsCollectThroughput())
1747 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1749 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1751 TString files = fOutputArchive;
1752 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already atttached by use
1753 outputArchive = files;
1755 arr = outputArchive.Tokenize(" ");
1758 while ((os=(TObjString*)next2())) {
1759 TString currentfile = os->GetString();
1760 if (!currentfile.Contains("@") && fCloseSE.Length())
1761 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1763 fMergingJDL->AddToSet("Output", currentfile);
1764 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1769 arr = fOutputFiles.Tokenize(",");
1771 Bool_t first = kTRUE;
1772 while ((os=(TObjString*)next())) {
1773 // Ignore ouputs in jdl that are also in outputarchive
1774 TString sout = os->GetString();
1775 sout.ReplaceAll("*", "");
1776 sout.ReplaceAll(".root", "");
1777 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1778 if (fOutputArchive.Contains(sout)) continue;
1779 // Ignore fRegisterExcludes
1780 if (fRegisterExcludes.Contains(sout)) continue;
1781 if (!first) comment = NULL;
1782 if (!os->GetString().Contains("@") && fCloseSE.Length())
1783 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1785 fGridJDL->AddToSet("Output", os->GetString());
1786 if (first) fGridJDL->AddToSetDescription("Output", comment);
1787 if (fMergeExcludes.Contains(sout)) continue;
1788 if (!os->GetString().Contains("@") && fCloseSE.Length())
1789 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1791 fMergingJDL->AddToSet("Output", os->GetString());
1792 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1796 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1797 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1798 TString validationScript = fValidationScript;
1799 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1800 validationScript.ReplaceAll(".sh", "_merge.sh");
1801 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1802 if (fMasterResubmitThreshold) {
1803 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1804 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1806 // Write a jdl with 2 input parameters: collection name and output dir name.
1809 // Copy jdl to grid workspace
1811 // Check if an output directory was defined and valid
1812 if (!fGridOutputDir.Length()) {
1813 Error("CreateJDL", "You must define AliEn output directory");
1816 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1817 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1818 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1819 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1821 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1827 if (TestBit(AliAnalysisGrid::kSubmit)) {
1828 TString mergeJDLName = fExecutable;
1829 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1830 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1831 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1832 if (fProductionMode) {
1833 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1834 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1836 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1837 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1838 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1839 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1840 Fatal("","Terminating");
1841 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1843 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1844 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1845 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1846 Fatal("","Terminating");
1849 if (fAdditionalLibs.Length()) {
1850 arr = fAdditionalLibs.Tokenize(" ");
1853 while ((os=(TObjString*)next())) {
1854 if (os->GetString().Contains(".so")) continue;
1855 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1856 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1857 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1858 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1859 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1860 Fatal("","Terminating");
1865 TIter next(fPackages);
1867 while ((obj=next())) {
1868 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1869 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1870 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1871 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1872 Form("%s/%s", workdir.Data(), obj->GetName())))
1873 Fatal("","Terminating");
1880 //______________________________________________________________________________
1881 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1883 // Writes one or more JDL's corresponding to findex. If findex is negative,
1884 // all run numbers are considered in one go (jdl). For non-negative indices
1885 // they correspond to the indices in the array fInputFiles.
1886 if (!fInputFiles) return kFALSE;
1889 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1890 workdir += fGridWorkingDir;
1891 TString stageName = "$2";
1892 if (fProductionMode) stageName = "$4";
1893 if (!fMergeDirName.IsNull()) {
1894 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1895 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1897 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1898 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1900 if (fProductionMode) {
1901 TIter next(fInputFiles);
1902 while ((os=next())) {
1903 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1905 if (!fOutputToRunNo)
1906 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1908 fGridJDL->SetOutputDirectory(fGridOutputDir);
1910 if (!fRunNumbers.Length() && !fRunRange[0]) {
1911 // One jdl with no parameters in case input data is specified by name.
1912 TIter next(fInputFiles);
1914 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1915 if (!fOutputSingle.IsNull())
1916 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1918 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1919 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1922 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1923 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1924 if (!fOutputSingle.IsNull()) {
1925 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1926 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1928 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1933 // Generate the JDL as a string
1934 TString sjdl = fGridJDL->Generate();
1935 TString sjdl1 = fMergingJDL->Generate();
1937 if (!fMergeDirName.IsNull()) {
1938 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1939 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1941 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1942 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1944 TString sjdl2 = fMergingJDL->Generate();
1945 Int_t index, index1;
1946 sjdl.ReplaceAll("\",\"", "\",\n \"");
1947 sjdl.ReplaceAll("(member", "\n (member");
1948 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1949 sjdl.ReplaceAll("{", "{\n ");
1950 sjdl.ReplaceAll("};", "\n};");
1951 sjdl.ReplaceAll("{\n \n", "{\n");
1952 sjdl.ReplaceAll("\n\n", "\n");
1953 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1954 sjdl1.ReplaceAll("\",\"", "\",\n \"");
1955 sjdl1.ReplaceAll("(member", "\n (member");
1956 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1957 sjdl1.ReplaceAll("{", "{\n ");
1958 sjdl1.ReplaceAll("};", "\n};");
1959 sjdl1.ReplaceAll("{\n \n", "{\n");
1960 sjdl1.ReplaceAll("\n\n", "\n");
1961 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1962 sjdl2.ReplaceAll("\",\"", "\",\n \"");
1963 sjdl2.ReplaceAll("(member", "\n (member");
1964 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1965 sjdl2.ReplaceAll("{", "{\n ");
1966 sjdl2.ReplaceAll("};", "\n};");
1967 sjdl2.ReplaceAll("{\n \n", "{\n");
1968 sjdl2.ReplaceAll("\n\n", "\n");
1969 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
1970 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1971 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1972 index = sjdl.Index("JDLVariables");
1973 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1974 sjdl += "Workdirectorysize = {\"5000MB\"};";
1975 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1976 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1977 index = fJobTag.Index(":");
1978 if (index < 0) index = fJobTag.Length();
1979 TString jobTag = fJobTag;
1980 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1981 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1982 if (fProductionMode) {
1983 sjdl1.Prepend("# Generated merging jdl (production mode) \
1984 \n# $1 = full alien path to output directory to be merged \
1985 \n# $2 = train number \
1986 \n# $3 = production (like LHC10b) \
1987 \n# $4 = merging stage \
1988 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1989 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1990 sjdl2.Prepend("# Generated merging jdl \
1991 \n# $1 = full alien path to output directory to be merged \
1992 \n# $2 = train number \
1993 \n# $3 = production (like LHC10b) \
1994 \n# $4 = merging stage \
1995 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1997 sjdl1.Prepend("# Generated merging jdl \
1998 \n# $1 = full alien path to output directory to be merged \
1999 \n# $2 = merging stage \
2000 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2001 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2002 sjdl2.Prepend("# Generated merging jdl \
2003 \n# $1 = full alien path to output directory to be merged \
2004 \n# $2 = merging stage \
2005 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2007 index = sjdl1.Index("JDLVariables");
2008 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2009 index = sjdl2.Index("JDLVariables");
2010 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2011 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2012 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2013 index = sjdl2.Index("Split =");
2015 index1 = sjdl2.Index("\n", index);
2016 sjdl2.Remove(index, index1-index+1);
2018 index = sjdl2.Index("SplitMaxInputFileNumber");
2020 index1 = sjdl2.Index("\n", index);
2021 sjdl2.Remove(index, index1-index+1);
2023 index = sjdl2.Index("InputDataCollection");
2025 index1 = sjdl2.Index(";", index);
2026 sjdl2.Remove(index, index1-index+1);
2028 index = sjdl2.Index("InputDataListFormat");
2030 index1 = sjdl2.Index("\n", index);
2031 sjdl2.Remove(index, index1-index+1);
2033 index = sjdl2.Index("InputDataList");
2035 index1 = sjdl2.Index("\n", index);
2036 sjdl2.Remove(index, index1-index+1);
2038 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2039 // Write jdl to file
2041 out.open(fJDLName.Data(), ios::out);
2043 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2046 out << sjdl << endl;
2048 TString mergeJDLName = fExecutable;
2049 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2052 out1.open(mergeJDLName.Data(), ios::out);
2054 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2057 out1 << sjdl1 << endl;
2060 TString finalJDL = mergeJDLName;
2061 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2062 out2.open(finalJDL.Data(), ios::out);
2064 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2067 out2 << sjdl2 << endl;
2071 // Copy jdl to grid workspace
2073 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2075 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2076 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2077 TString finalJDL = mergeJDLName;
2078 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2079 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2080 if (fProductionMode) {
2081 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2082 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2083 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2085 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2086 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2087 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2088 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2089 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2090 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2091 Fatal("","Terminating");
2093 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2094 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2095 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2096 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2097 Fatal("","Terminating");
2098 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2099 Fatal("","Terminating");
2105 //______________________________________________________________________________
2106 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2108 // Returns true if file exists.
2109 if (!gGrid) return kFALSE;
2111 slfn.ReplaceAll("alien://","");
2112 TGridResult *res = gGrid->Ls(slfn);
2113 if (!res) return kFALSE;
2114 TMap *map = dynamic_cast<TMap*>(res->At(0));
2119 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2120 if (!objs || !objs->GetString().Length()) {
2128 //______________________________________________________________________________
2129 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2131 // Returns true if directory exists. Can be also a path.
2132 if (!gGrid) return kFALSE;
2133 // Check if dirname is a path
2134 TString dirstripped = dirname;
2135 dirstripped = dirstripped.Strip();
2136 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
2137 TString dir = gSystem->BaseName(dirstripped);
2139 TString path = gSystem->DirName(dirstripped);
2140 TGridResult *res = gGrid->Ls(path, "-F");
2141 if (!res) return kFALSE;
2145 while ((map=dynamic_cast<TMap*>(next()))) {
2146 obj = map->GetValue("name");
2148 if (dir == obj->GetName()) {
2157 //______________________________________________________________________________
2158 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2160 // Check input data type.
2161 isCollection = kFALSE;
2165 Error("CheckDataType", "No connection to grid");
2168 isCollection = IsCollection(lfn);
2169 TString msg = "\n##### file: ";
2172 msg += " type: raw_collection;";
2173 // special treatment for collections
2175 // check for tag files in the collection
2176 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2178 msg += " using_tags: No (unknown)";
2179 Info("CheckDataType", "%s", msg.Data());
2182 const char* typeStr = res->GetKey(0, "origLFN");
2183 if (!typeStr || !strlen(typeStr)) {
2184 msg += " using_tags: No (unknown)";
2185 Info("CheckDataType", "%s", msg.Data());
2188 TString file = typeStr;
2189 useTags = file.Contains(".tag");
2190 if (useTags) msg += " using_tags: Yes";
2191 else msg += " using_tags: No";
2192 Info("CheckDataType", "%s", msg.Data());
2197 isXml = slfn.Contains(".xml");
2199 // Open xml collection and check if there are tag files inside
2200 msg += " type: xml_collection;";
2201 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2203 msg += " using_tags: No (unknown)";
2204 Info("CheckDataType", "%s", msg.Data());
2207 TMap *map = coll->Next();
2209 msg += " using_tags: No (unknown)";
2210 Info("CheckDataType", "%s", msg.Data());
2213 map = (TMap*)map->GetValue("");
2215 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2216 useTags = file.Contains(".tag");
2218 if (useTags) msg += " using_tags: Yes";
2219 else msg += " using_tags: No";
2220 Info("CheckDataType", "%s", msg.Data());
2223 useTags = slfn.Contains(".tag");
2224 if (slfn.Contains(".root")) msg += " type: root file;";
2225 else msg += " type: unknown file;";
2226 if (useTags) msg += " using_tags: Yes";
2227 else msg += " using_tags: No";
2228 Info("CheckDataType", "%s", msg.Data());
2231 //______________________________________________________________________________
2232 void AliAnalysisAlien::EnablePackage(const char *package)
2234 // Enables a par file supposed to exist in the current directory.
2235 TString pkg(package);
2236 pkg.ReplaceAll(".par", "");
2238 if (gSystem->AccessPathName(pkg)) {
2239 Fatal("EnablePackage", "Package %s not found", pkg.Data());
2242 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2243 Info("EnablePackage", "AliEn plugin will use .par packages");
2244 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
2246 fPackages = new TObjArray();
2247 fPackages->SetOwner();
2249 fPackages->Add(new TObjString(pkg));
2252 //______________________________________________________________________________
2253 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2255 // Make a tree from files having the location specified in fFileForTestMode.
2256 // Inspired from JF's CreateESDChain.
2257 if (fFileForTestMode.IsNull()) {
2258 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2261 if (gSystem->AccessPathName(fFileForTestMode)) {
2262 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2267 in.open(fFileForTestMode);
2269 // Read the input list of files and add them to the chain
2271 TString streeName(treeName);
2272 if (IsUseMCchain()) streeName = "TE";
2273 TChain *chain = new TChain(streeName);
2274 TChain *chainFriend = 0;
2275 if (!fFriendChainName.IsNull()) chainFriend = new TChain(streeName);
2279 if (line.IsNull() || line.BeginsWith("#")) continue;
2280 if (count++ == fNtestFiles) break;
2281 TString esdFile(line);
2282 TFile *file = TFile::Open(esdFile);
2283 if (file && !file->IsZombie()) {
2284 chain->Add(esdFile);
2286 if (!fFriendChainName.IsNull()) {
2287 if (esdFile.Index("#") > -1)
2288 esdFile.Remove(esdFile.Index("#"));
2289 esdFile = gSystem->DirName(esdFile);
2290 esdFile += "/" + fFriendChainName;
2291 file = TFile::Open(esdFile);
2292 if (file && !file->IsZombie()) {
2294 chainFriend->Add(esdFile);
2296 Fatal("GetChainForTestMode", "Cannot open friend file: %s", esdFile.Data());
2301 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2305 if (!chain->GetListOfFiles()->GetEntries()) {
2306 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2312 if (!fFriendChainName.IsNull()) chain->AddFriend(chainFriend);
2316 //______________________________________________________________________________
2317 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2319 // Get job status for all jobs with jobid>jobidstart.
2320 static char mstatus[20];
2326 TGridJobStatusList *list = gGrid->Ps("");
2327 if (!list) return mstatus;
2328 Int_t nentries = list->GetSize();
2329 TGridJobStatus *status;
2331 for (Int_t ijob=0; ijob<nentries; ijob++) {
2332 status = (TGridJobStatus *)list->At(ijob);
2333 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2334 if (pid<jobidstart) continue;
2335 if (pid == lastid) {
2336 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2338 switch (status->GetStatus()) {
2339 case TGridJobStatus::kWAITING:
2341 case TGridJobStatus::kRUNNING:
2343 case TGridJobStatus::kABORTED:
2344 case TGridJobStatus::kFAIL:
2345 case TGridJobStatus::kUNKNOWN:
2347 case TGridJobStatus::kDONE:
2356 //______________________________________________________________________________
2357 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2359 // Returns true if file is a collection. Functionality duplicated from
2360 // TAlien::Type() because we don't want to directly depend on TAlien.
2362 Error("IsCollection", "No connection to grid");
2365 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2366 if (!res) return kFALSE;
2367 const char* typeStr = res->GetKey(0, "type");
2368 if (!typeStr || !strlen(typeStr)) return kFALSE;
2369 if (!strcmp(typeStr, "collection")) return kTRUE;
2374 //______________________________________________________________________________
2375 Bool_t AliAnalysisAlien::IsSingleOutput() const
2377 // Check if single-ouput option is on.
2378 return (!fOutputSingle.IsNull());
2381 //______________________________________________________________________________
2382 void AliAnalysisAlien::Print(Option_t *) const
2384 // Print current plugin settings.
2385 printf("### AliEn analysis plugin current settings ###\n");
2386 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2387 if (mgr && mgr->IsProofMode()) {
2388 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2389 if (TestBit(AliAnalysisGrid::kTest))
2390 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2391 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2392 if (!fProofDataSet.IsNull())
2393 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2395 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2397 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2398 if (!fROOTVersion.IsNull())
2399 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2401 printf("= ROOT version requested________________________ default\n");
2402 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2403 if (!fAliRootMode.IsNull())
2404 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2406 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2407 if (fNproofWorkersPerSlave)
2408 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2409 if (TestSpecialBit(kClearPackages))
2410 printf("= ClearPackages requested...\n");
2411 if (fIncludePath.Data())
2412 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2413 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2414 if (fPackages && fPackages->GetEntries()) {
2415 TIter next(fPackages);
2418 while ((obj=next())) list += obj->GetName();
2419 printf("= Par files to be used: ________________________ %s\n", list.Data());
2421 if (TestSpecialBit(kProofConnectGrid))
2422 printf("= Requested PROOF connection to grid\n");
2425 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2426 if (fOverwriteMode) {
2427 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2428 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2430 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2431 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
2432 printf("= Production mode:______________________________ %d\n", fProductionMode);
2433 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2434 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2435 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2437 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2438 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2439 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2440 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
2441 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
2442 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2443 if (fRunNumbers.Length())
2444 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2446 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
2447 if (!fRunRange[0] && !fRunNumbers.Length()) {
2448 TIter next(fInputFiles);
2451 while ((obj=next())) list += obj->GetName();
2452 printf("= Input files to be processed: _________________ %s\n", list.Data());
2454 if (TestBit(AliAnalysisGrid::kTest))
2455 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2456 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2457 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2458 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2459 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2460 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2461 printf("=====================================================================\n");
2462 printf("= Job price: ___________________________________ %d\n", fPrice);
2463 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2464 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2465 if (fMaxInitFailed>0)
2466 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2467 if (fMasterResubmitThreshold>0)
2468 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2469 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2470 if (fNrunsPerMaster>0)
2471 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2472 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2473 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2474 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2475 if (fArguments.Length())
2476 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2477 if (fExecutableArgs.Length())
2478 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2479 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2480 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2481 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2482 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2484 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2485 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
2486 if (fIncludePath.Data())
2487 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2488 if (fCloseSE.Length())
2489 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2490 if (fFriendChainName.Length())
2491 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2492 if (fPackages && fPackages->GetEntries()) {
2493 TIter next(fPackages);
2496 while ((obj=next())) list += obj->GetName();
2497 printf("= Par files to be used: ________________________ %s\n", list.Data());
2501 //______________________________________________________________________________
2502 void AliAnalysisAlien::SetDefaults()
2504 // Set default values for everything. What cannot be filled will be left empty.
2505 if (fGridJDL) delete fGridJDL;
2506 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2507 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2510 fSplitMaxInputFileNumber = 100;
2512 fMasterResubmitThreshold = 0;
2518 fNrunsPerMaster = 1;
2519 fMaxMergeFiles = 100;
2521 fExecutable = "analysis.sh";
2522 fExecutableCommand = "root -b -q -x";
2524 fExecutableArgs = "";
2525 fAnalysisMacro = "myAnalysis.C";
2526 fAnalysisSource = "";
2527 fAdditionalLibs = "";
2531 fAliROOTVersion = "";
2532 fUser = ""; // Your alien user name
2533 fGridWorkingDir = "";
2534 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2535 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2536 fFriendChainName = "";
2537 fGridOutputDir = "output";
2538 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2539 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2540 fInputFormat = "xml-single";
2541 fJDLName = "analysis.jdl";
2542 fJobTag = "Automatically generated analysis JDL";
2543 fMergeExcludes = "";
2546 SetCheckCopy(kTRUE);
2547 SetDefaultOutputs(kTRUE);
2551 //______________________________________________________________________________
2552 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2554 // Obsolete method. Use SetROOTVersion instead
2555 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2556 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2557 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2560 //______________________________________________________________________________
2561 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2563 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2564 // First check if the result is already in the output directory.
2565 if (FileExists(Form("%s/%s",aliendir,filename))) {
2566 printf("Final merged results found. Not merging again.\n");
2569 // Now check the last stage done.
2572 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2575 // Next stage of merging
2577 TString pattern = "*root_archive.zip";
2578 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2579 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2580 if (res) delete res;
2581 // Write standard output to file
2582 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2583 // Count the number of files inside
2585 ifile.open(Form("Stage_%d.xml",stage));
2586 if (!ifile.good()) {
2587 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2592 while (!ifile.eof()) {
2594 if (line.Contains("/event")) nfiles++;
2598 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2601 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2603 // Copy the file in the output directory
2604 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2605 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2606 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2607 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2608 // Check if this is the last stage to be done.
2609 Bool_t laststage = (nfiles<nperchunk);
2610 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2613 printf("### Submiting final merging stage %d\n", stage);
2614 TString finalJDL = jdl;
2615 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2616 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2617 jobId = SubmitSingleJob(query);
2619 printf("### Submiting merging stage %d\n", stage);
2620 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2621 jobId = SubmitSingleJob(query);
2623 if (!jobId) return kFALSE;
2625 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2626 fGridJobIDs.Append(Form("%d", jobId));
2627 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2628 fGridStages.Append(Form("%s_merge_stage%d",
2629 laststage ? "final" : "partial", stage));
2634 //______________________________________________________________________________
2635 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2637 // Loat the analysis manager from a file.
2638 TFile *file = TFile::Open(fname);
2640 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2643 TIter nextkey(file->GetListOfKeys());
2644 AliAnalysisManager *mgr = 0;
2646 while ((key=(TKey*)nextkey())) {
2647 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2648 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2651 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2655 //______________________________________________________________________________
2656 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2658 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2659 if (!gGrid) return 0;
2660 printf("=> %s ------> ",query);
2661 TGridResult *res = gGrid->Command(query);
2663 TString jobId = res->GetKey(0,"jobId");
2665 if (jobId.IsNull()) {
2666 printf("submission failed. Reason:\n");
2669 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2672 Int_t ijobId = jobId.Atoi();
2673 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2677 //______________________________________________________________________________
2678 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2680 // Merges a collection of output files using concatenation.
2681 TString scoll(collection);
2682 if (!scoll.Contains(".xml")) return kFALSE;
2683 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2685 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2688 // Iterate grid collection
2690 Bool_t merged = kFALSE;
2692 while (coll->Next()) {
2693 TString fname = gSystem->DirName(coll->GetTURL());
2696 outtmp = Form("%d_%s", ifile, output);
2697 if (!TFile::Cp(fname, outtmp)) {
2698 ::Error("MergeInfo", "Could not copy %s", fname.Data());
2703 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
2706 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2707 gSystem->Exec("cp tempmerged lastmerged");
2710 gSystem->Exec(Form("cp lastmerged %s", output));
2711 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2717 //______________________________________________________________________________
2718 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2720 // Merge given output files from basedir. Basedir can be an alien output directory
2721 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2722 // files in a group (ignored for xml input). Merging can be done in stages:
2723 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2724 // stage=1 : works with an xml of all root_archive.zip in the output directory
2725 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2726 TString outputFile = output;
2728 TString outputChunk;
2729 TString previousChunk = "";
2730 TObjArray *listoffiles = new TObjArray();
2731 // listoffiles->SetOwner();
2732 Int_t countChunk = 0;
2733 Int_t countZero = nmaxmerge;
2734 Bool_t merged = kTRUE;
2735 Bool_t isGrid = kTRUE;
2736 Int_t index = outputFile.Index("@");
2737 if (index > 0) outputFile.Remove(index);
2738 TString inputFile = outputFile;
2739 TString sbasedir = basedir;
2740 if (sbasedir.Contains(".xml")) {
2741 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2742 nmaxmerge = 9999999;
2743 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2745 ::Error("MergeOutput", "Input XML collection empty.");
2748 // Iterate grid collection
2749 while (coll->Next()) {
2750 TString fname = gSystem->DirName(coll->GetTURL());
2753 listoffiles->Add(new TNamed(fname.Data(),""));
2755 } else if (sbasedir.Contains(".txt")) {
2756 // The file having the .txt extension is expected to contain a list of
2757 // folders where the output files will be looked. For alien folders,
2758 // the full folder LFN is expected (starting with alien://)
2759 // Assume lfn's on each line
2764 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
2770 if (line.IsNull() || line.BeginsWith("#")) continue;
2772 if (!line.Contains("alien:")) isGrid = kFALSE;
2776 listoffiles->Add(new TNamed(line.Data(),""));
2780 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
2785 command = Form("find %s/ *%s", basedir, inputFile.Data());
2786 printf("command: %s\n", command.Data());
2787 TGridResult *res = gGrid->Command(command);
2789 ::Error("MergeOutput","No result for the find command\n");
2795 while ((map=(TMap*)nextmap())) {
2796 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2797 if (!objs || !objs->GetString().Length()) {
2798 // Nothing found - skip this output
2803 listoffiles->Add(new TNamed(objs->GetName(),""));
2807 if (!listoffiles->GetEntries()) {
2808 ::Error("MergeOutput","No result for the find command\n");
2813 TFileMerger *fm = 0;
2814 TIter next0(listoffiles);
2815 TObjArray *listoffilestmp = new TObjArray();
2816 listoffilestmp->SetOwner();
2819 // Keep only the files at upper level
2820 Int_t countChar = 0;
2821 while ((nextfile=next0())) {
2822 snextfile = nextfile->GetName();
2823 Int_t crtCount = snextfile.CountChar('/');
2824 if (nextfile == listoffiles->First()) countChar = crtCount;
2825 if (crtCount < countChar) countChar = crtCount;
2828 while ((nextfile=next0())) {
2829 snextfile = nextfile->GetName();
2830 Int_t crtCount = snextfile.CountChar('/');
2831 if (crtCount > countChar) {
2835 listoffilestmp->Add(nextfile);
2838 listoffiles = listoffilestmp; // Now contains 'good' files
2839 listoffiles->Print();
2840 TIter next(listoffiles);
2841 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2842 outputChunk = outputFile;
2843 outputChunk.ReplaceAll(".root", "_*.root");
2844 // Check for existent temporary merge files
2845 // Check overwrite mode and remove previous partial results if needed
2846 // Preserve old merging functionality for stage 0.
2848 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2850 // Skip as many input files as in a chunk
2851 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2854 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2858 snextfile = nextfile->GetName();
2860 outputChunk = outputFile;
2861 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2863 if (gSystem->AccessPathName(outputChunk)) continue;
2864 // Merged file with chunks up to <countChunk> found
2865 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2866 previousChunk = outputChunk;
2870 countZero = nmaxmerge;
2872 while ((nextfile=next())) {
2873 snextfile = nextfile->GetName();
2874 // Loop 'find' results and get next LFN
2875 if (countZero == nmaxmerge) {
2876 // First file in chunk - create file merger and add previous chunk if any.
2877 fm = new TFileMerger(isGrid);
2878 fm->SetFastMethod(kTRUE);
2879 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2880 outputChunk = outputFile;
2881 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2883 // If last file found, put merged results in the output file
2884 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2885 // Add file to be merged and decrement chunk counter.
2886 fm->AddFile(snextfile);
2888 if (countZero==0 || nextfile == listoffiles->Last()) {
2889 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2890 // Nothing found - skip this output
2891 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2895 fm->OutputFile(outputChunk);
2896 // Merge the outputs, then go to next chunk
2898 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2902 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2903 gSystem->Unlink(previousChunk);
2905 if (nextfile == listoffiles->Last()) break;
2907 countZero = nmaxmerge;
2908 previousChunk = outputChunk;
2915 // Merging stage different than 0.
2916 // Move to the begining of the requested chunk.
2917 fm = new TFileMerger(isGrid);
2918 fm->SetFastMethod(kTRUE);
2919 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2921 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2922 // Nothing found - skip this output
2923 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2927 fm->OutputFile(outputFile);
2928 // Merge the outputs
2930 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2934 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2940 //______________________________________________________________________________
2941 Bool_t AliAnalysisAlien::MergeOutputs()
2943 // Merge analysis outputs existing in the AliEn space.
2944 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2945 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2947 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2951 if (!TestBit(AliAnalysisGrid::kMerge)) {
2952 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2955 if (fProductionMode) {
2956 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2959 Info("MergeOutputs", "Submitting merging JDL");
2960 if (!SubmitMerging()) return kFALSE;
2961 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2962 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2965 // Get the output path
2966 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2967 if (!DirectoryExists(fGridOutputDir)) {
2968 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2971 if (!fOutputFiles.Length()) {
2972 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2975 // Check if fast read option was requested
2976 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2977 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2978 if (fFastReadOption) {
2979 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2980 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2981 gEnv->SetValue("XNet.ConnectTimeout",50);
2982 gEnv->SetValue("XNet.RequestTimeout",50);
2983 gEnv->SetValue("XNet.MaxRedirectCount",2);
2984 gEnv->SetValue("XNet.ReconnectTimeout",50);
2985 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2987 // Make sure we change the temporary directory
2988 gSystem->Setenv("TMPDIR", gSystem->pwd());
2989 // Set temporary compilation directory to current one
2990 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
2991 TObjArray *list = fOutputFiles.Tokenize(",");
2995 Bool_t merged = kTRUE;
2996 while((str=(TObjString*)next())) {
2997 outputFile = str->GetString();
2998 Int_t index = outputFile.Index("@");
2999 if (index > 0) outputFile.Remove(index);
3000 TString outputChunk = outputFile;
3001 outputChunk.ReplaceAll(".root", "_*.root");
3002 // Skip already merged outputs
3003 if (!gSystem->AccessPathName(outputFile)) {
3004 if (fOverwriteMode) {
3005 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3006 gSystem->Unlink(outputFile);
3007 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3008 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3009 outputChunk.Data());
3010 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3013 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
3017 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3018 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3019 outputChunk.Data());
3020 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3023 if (fMergeExcludes.Contains(outputFile.Data()) ||
3024 fRegisterExcludes.Contains(outputFile.Data())) continue;
3025 // Perform a 'find' command in the output directory, looking for registered outputs
3026 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3028 Error("MergeOutputs", "Terminate() will NOT be executed");
3032 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3033 if (fileOpened) fileOpened->Close();
3039 //______________________________________________________________________________
3040 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3042 // Use the output files connected to output containers from the analysis manager
3043 // rather than the files defined by SetOutputFiles
3044 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3045 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3046 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3049 //______________________________________________________________________________
3050 void AliAnalysisAlien::SetOutputFiles(const char *list)
3052 // Manually set the output files list.
3053 // Removes duplicates. Not allowed if default outputs are not disabled.
3054 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3055 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3058 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3060 TString slist = list;
3061 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3062 TObjArray *arr = slist.Tokenize(" ");
3066 while ((os=(TObjString*)next())) {
3067 sout = os->GetString();
3068 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
3069 if (fOutputFiles.Contains(sout)) continue;
3070 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3071 fOutputFiles += sout;
3076 //______________________________________________________________________________
3077 void AliAnalysisAlien::SetOutputArchive(const char *list)
3079 // Manually set the output archive list. Free text - you are on your own...
3080 // Not allowed if default outputs are not disabled.
3081 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3082 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3085 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3086 fOutputArchive = list;
3089 //______________________________________________________________________________
3090 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3092 // Setting a prefered output SE is not allowed anymore.
3093 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3096 //______________________________________________________________________________
3097 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3099 // Set some PROOF special parameter.
3100 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3102 TObject *old = pair->Key();
3103 TObject *val = pair->Value();
3104 fProofParam.Remove(old);
3108 fProofParam.Add(new TObjString(pname), new TObjString(value));
3111 //______________________________________________________________________________
3112 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3114 // Returns a special PROOF parameter.
3115 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3116 if (!pair) return 0;
3117 return pair->Value()->GetName();
3120 //______________________________________________________________________________
3121 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3123 // Start remote grid analysis.
3124 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3125 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3126 if (!mgr || !mgr->IsInitialized()) {
3127 Error("StartAnalysis", "You need an initialized analysis manager for this");
3130 // Are we in PROOF mode ?
3131 if (mgr->IsProofMode()) {
3132 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3133 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3134 if (fProofCluster.IsNull()) {
3135 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
3138 if (fProofDataSet.IsNull() && !testMode) {
3139 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3142 // Set the needed environment
3143 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3144 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3145 if (fProofReset && !testMode) {
3146 if (fProofReset==1) {
3147 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3148 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3150 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3151 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3153 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3158 // Check if there is an old active session
3159 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3161 Error("StartAnalysis","You have to reset your old session first\n");
3165 // Do we need to change the ROOT version ? The success of this cannot be checked.
3166 if (!fROOTVersion.IsNull() && !testMode) {
3167 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3168 fProofCluster.Data(), fROOTVersion.Data()));
3170 // Connect to PROOF and check the status
3173 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3174 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3176 if (!sworkers.IsNull())
3177 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3179 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
3181 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3183 Error("StartAnalysis", "Could not start PROOF in test mode");
3188 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
3191 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3192 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3193 // Set proof special parameters if any
3194 TIter nextpp(&fProofParam);
3195 TObject *proofparam;
3196 while ((proofparam=nextpp())) {
3197 TString svalue = GetProofParameter(proofparam->GetName());
3198 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3200 // Is dataset existing ?
3202 TString dataset = fProofDataSet;
3203 Int_t index = dataset.Index("#");
3204 if (index>=0) dataset.Remove(index);
3205 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3206 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3209 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3211 // Is ClearPackages() needed ?
3212 if (TestSpecialBit(kClearPackages)) {
3213 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3214 gROOT->ProcessLine("gProof->ClearPackages();");
3216 // Is a given aliroot mode requested ?
3219 if (!fAliRootMode.IsNull()) {
3220 TString alirootMode = fAliRootMode;
3221 if (alirootMode == "default") alirootMode = "";
3222 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
3223 optionsList.SetOwner();
3224 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3225 // Check the additional libs to be loaded
3227 Bool_t parMode = kFALSE;
3228 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3229 // Parse the extra libs for .so
3230 if (fAdditionalLibs.Length()) {
3231 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3234 while((str=(TObjString*)next())) {
3235 if (str->GetString().Contains(".so")) {
3237 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
3240 TString stmp = str->GetName();
3241 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3242 stmp.ReplaceAll(".so","");
3243 if (!extraLibs.IsNull()) extraLibs += ":";
3247 if (str->GetString().Contains(".par")) {
3248 // The first par file found in the list will not allow any further .so
3250 if (!parLibs.IsNull()) parLibs += ":";
3251 parLibs += str->GetName();
3255 if (list) delete list;
3257 if (!extraLibs.IsNull()) {
3258 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3259 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3261 // Check extra includes
3262 if (!fIncludePath.IsNull()) {
3263 TString includePath = fIncludePath;
3264 includePath.ReplaceAll(" ",":");
3265 includePath.ReplaceAll("$ALICE_ROOT/","");
3266 includePath.ReplaceAll("${ALICE_ROOT}/","");
3267 includePath.ReplaceAll("-I","");
3268 includePath.Remove(TString::kTrailing, ':');
3269 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3270 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3272 // Check if connection to grid is requested
3273 if (TestSpecialBit(kProofConnectGrid))
3274 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3275 // Enable AliRoot par
3277 // Enable proof lite package
3278 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
3279 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3280 TNamed *obj = (TNamed*)optionsList.At(i);
3281 printf("%s %s\n", obj->GetName(), obj->GetTitle());
3283 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3284 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3285 Info("StartAnalysis", "AliRootProofLite enabled");
3287 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3291 if ( ! fAliROOTVersion.IsNull() ) {
3292 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3293 fAliROOTVersion.Data(), &optionsList))) {
3294 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3299 // Enable first par files from fAdditionalLibs
3300 if (!parLibs.IsNull()) {
3301 TObjArray *list = parLibs.Tokenize(":");
3303 TObjString *package;
3304 while((package=(TObjString*)next())) {
3305 TString spkg = package->GetName();
3306 spkg.ReplaceAll(".par", "");
3307 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3308 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3309 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3310 if (gROOT->ProcessLine(enablePackage)) {
3311 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3315 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3319 if (list) delete list;
3322 if (fAdditionalLibs.Contains(".so") && !testMode) {
3323 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3324 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3328 // Enable par files if requested
3329 if (fPackages && fPackages->GetEntries()) {
3330 TIter next(fPackages);
3332 while ((package=next())) {
3333 // Skip packages already enabled
3334 if (parLibs.Contains(package->GetName())) continue;
3335 TString spkg = package->GetName();
3336 spkg.ReplaceAll(".par", "");
3337 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3338 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3339 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3340 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3344 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3349 // Do we need to load analysis source files ?
3350 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3351 if (fAnalysisSource.Length()) {
3352 TObjArray *list = fAnalysisSource.Tokenize(" ");
3355 while((str=(TObjString*)next())) {
3356 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3358 if (list) delete list;
3361 // Register dataset to proof lite.
3362 if (fFileForTestMode.IsNull()) {
3363 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3366 if (gSystem->AccessPathName(fFileForTestMode)) {
3367 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
3370 TFileCollection *coll = new TFileCollection();
3371 coll->AddFromFile(fFileForTestMode);
3372 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3373 gROOT->ProcessLine("gProof->ShowDataSets()");
3378 // Check if output files have to be taken from the analysis manager
3379 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3380 // Add output files and AOD files
3381 fOutputFiles = GetListOfFiles("outaod");
3382 // Add extra files registered to the analysis manager
3383 TString extra = GetListOfFiles("ext");
3384 if (!extra.IsNull()) {
3385 extra.ReplaceAll(".root", "*.root");
3386 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3387 fOutputFiles += extra;
3389 // Compose the output archive.
3390 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3391 if (mgr->IsCollectThroughput())
3392 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3394 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3396 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
3397 if (TestBit(AliAnalysisGrid::kOffline)) {
3398 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3399 \n there nor any job run. You can revise the JDL and analysis \
3400 \n macro then run the same in \"submit\" mode.");
3401 } else if (TestBit(AliAnalysisGrid::kTest)) {
3402 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3404 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3405 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3406 \n space and job submitted.");
3407 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3408 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3409 if (fMergeViaJDL) CheckInputData();
3412 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3417 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3420 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3421 if (!CheckInputData()) {
3422 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3425 if (!CreateDataset(fDataPattern)) {
3427 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3428 if (fRunNumbers.Length()) serror = "run numbers";
3429 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3430 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3431 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
3434 WriteAnalysisFile();
3435 WriteAnalysisMacro();
3437 WriteValidationScript();
3439 WriteMergingMacro();
3440 WriteMergeExecutable();
3441 WriteValidationScript(kTRUE);
3443 if (!CreateJDL()) return kFALSE;
3444 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3446 // Locally testing the analysis
3447 Info("StartAnalysis", "\n_______________________________________________________________________ \
3448 \n Running analysis script in a daughter shell as on a worker node \
3449 \n_______________________________________________________________________");
3450 TObjArray *list = fOutputFiles.Tokenize(",");
3454 while((str=(TObjString*)next())) {
3455 outputFile = str->GetString();
3456 Int_t index = outputFile.Index("@");
3457 if (index > 0) outputFile.Remove(index);
3458 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
3461 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3462 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3463 // gSystem->Exec("cat stdout");
3466 // Check if submitting is managed by LPM manager
3467 if (fProductionMode) {
3468 //TString prodfile = fJDLName;
3469 //prodfile.ReplaceAll(".jdl", ".prod");
3470 //WriteProductionFile(prodfile);
3471 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3474 // Submit AliEn job(s)
3475 gGrid->Cd(fGridOutputDir);
3480 if (!fRunNumbers.Length() && !fRunRange[0]) {
3481 // Submit a given xml or a set of runs
3482 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3483 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
3485 const char *cjobId = res->GetKey(0,"jobId");
3489 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3492 Info("StartAnalysis", "\n_______________________________________________________________________ \
3493 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3494 \n_______________________________________________________________________",
3495 fJDLName.Data(), cjobId);
3498 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3499 fGridJobIDs.Append(jobID);
3500 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3501 fGridStages.Append("full");
3506 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3510 // Submit for a range of enumeration of runs.
3511 if (!Submit()) return kFALSE;
3512 jobID = fGridJobIDs;
3516 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3517 \n You may exit at any time and terminate the job later using the option <terminate> \
3518 \n ##################################################################################", jobID.Data());
3519 gSystem->Exec("aliensh");
3521 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3522 \n Remember to terminate the job later using the option <terminate> \
3523 \n ##################################################################################", jobID.Data());
3528 //______________________________________________________________________________
3529 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3531 // Get a comma-separated list of output files of the requested type.
3532 // Type can be (case unsensitive):
3533 // aod - list of aod files (std, extensions and filters)
3534 // out - list of output files connected to containers (but not aod's or extras)
3535 // ext - list of extra files registered to the manager
3536 // ter - list of files produced in terminate
3537 static TString files;
3539 TString stype = type;
3541 TString aodfiles, extra;
3542 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3544 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3545 return files.Data();
3547 if (mgr->GetOutputEventHandler()) {
3548 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3549 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3550 if (!extraaod.IsNull()) {
3552 aodfiles += extraaod;
3555 if (stype.Contains("aod")) {
3557 if (stype == "aod") return files.Data();
3559 // Add output files that are not in the list of AOD files
3560 TString outputfiles = "";
3561 TIter next(mgr->GetOutputs());
3562 AliAnalysisDataContainer *output;
3563 const char *filename = 0;
3564 while ((output=(AliAnalysisDataContainer*)next())) {
3565 filename = output->GetFileName();
3566 if (!(strcmp(filename, "default"))) continue;
3567 if (outputfiles.Contains(filename)) continue;
3568 if (aodfiles.Contains(filename)) continue;
3569 if (!outputfiles.IsNull()) outputfiles += ",";
3570 outputfiles += filename;
3572 if (stype.Contains("out")) {
3573 if (!files.IsNull()) files += ",";
3574 files += outputfiles;
3575 if (stype == "out") return files.Data();
3577 // Add extra files registered to the analysis manager
3579 extra = mgr->GetExtraFiles();
3580 if (!extra.IsNull()) {
3582 extra.ReplaceAll(" ", ",");
3583 TObjArray *fextra = extra.Tokenize(",");
3584 TIter nextx(fextra);
3586 while ((obj=nextx())) {
3587 if (aodfiles.Contains(obj->GetName())) continue;
3588 if (outputfiles.Contains(obj->GetName())) continue;
3589 if (sextra.Contains(obj->GetName())) continue;
3590 if (!sextra.IsNull()) sextra += ",";
3591 sextra += obj->GetName();
3594 if (stype.Contains("ext")) {
3595 if (!files.IsNull()) files += ",";
3599 if (stype == "ext") return files.Data();
3601 if (!fTerminateFiles.IsNull()) {
3602 fTerminateFiles.Strip();
3603 fTerminateFiles.ReplaceAll(" ",",");
3604 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3605 TIter nextx(fextra);
3607 while ((obj=nextx())) {
3608 if (aodfiles.Contains(obj->GetName())) continue;
3609 if (outputfiles.Contains(obj->GetName())) continue;
3610 if (termfiles.Contains(obj->GetName())) continue;
3611 if (sextra.Contains(obj->GetName())) continue;
3612 if (!termfiles.IsNull()) termfiles += ",";
3613 termfiles += obj->GetName();
3617 if (stype.Contains("ter")) {
3618 if (!files.IsNull() && !termfiles.IsNull()) {
3623 return files.Data();
3626 //______________________________________________________________________________
3627 Bool_t AliAnalysisAlien::Submit()
3629 // Submit all master jobs.
3630 Int_t nmasterjobs = fInputFiles->GetEntries();
3631 Long_t tshoot = gSystem->Now();
3632 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3633 while (fNsubmitted < nmasterjobs) {
3634 Long_t now = gSystem->Now();
3635 if ((now-tshoot)>30000) {
3637 if (!SubmitNext()) return kFALSE;
3643 //______________________________________________________________________________
3644 Bool_t AliAnalysisAlien::SubmitMerging()
3646 // Submit all merging jobs.
3647 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3648 gGrid->Cd(fGridOutputDir);
3649 TString mergeJDLName = fExecutable;
3650 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3652 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3655 Int_t ntosubmit = fInputFiles->GetEntries();
3656 for (Int_t i=0; i<ntosubmit; i++) {
3657 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3658 runOutDir.ReplaceAll(".xml", "");
3659 if (fOutputToRunNo) {
3660 // The output directory is the run number
3661 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3662 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3664 if (!fRunNumbers.Length() && !fRunRange[0]) {
3665 // The output directory is the grid outdir
3666 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3667 runOutDir = fGridOutputDir;
3669 // The output directory is the master number in 3 digits format
3670 printf("### Submitting merging job for master <%03d>\n", i);
3671 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3674 // Check now the number of merging stages.
3675 TObjArray *list = fOutputFiles.Tokenize(",");
3679 while((str=(TObjString*)next())) {
3680 outputFile = str->GetString();
3681 Int_t index = outputFile.Index("@");
3682 if (index > 0) outputFile.Remove(index);
3683 if (!fMergeExcludes.Contains(outputFile) &&
3684 !fRegisterExcludes.Contains(outputFile)) break;
3687 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
3688 if (!done && (i==ntosubmit-1)) return kFALSE;
3689 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3691 if (!ntosubmit) return kTRUE;
3693 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3694 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3695 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3696 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3697 \n ################################################################################################################");
3698 gSystem->Exec("aliensh");
3700 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3701 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3702 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3703 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3704 \n ################################################################################################################");
3709 //______________________________________________________________________________
3710 Bool_t AliAnalysisAlien::SubmitNext()
3712 // Submit next bunch of master jobs if the queue is free. The first master job is
3713 // submitted right away, while the next will not be unless the previous was split.
3714 // The plugin will not submit new master jobs if there are more that 500 jobs in
3716 static Bool_t iscalled = kFALSE;
3717 static Int_t firstmaster = 0;
3718 static Int_t lastmaster = 0;
3719 static Int_t npermaster = 0;
3720 if (iscalled) return kTRUE;
3722 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3723 Int_t ntosubmit = 0;
3726 Int_t nmasterjobs = fInputFiles->GetEntries();
3729 if (!IsUseSubmitPolicy()) {
3731 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3732 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3733 ntosubmit = nmasterjobs;
3736 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3737 printf("=== master %d: %s\n", lastmaster, status.Data());
3738 // If last master not split, just return
3739 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3740 // No more than 100 waiting jobs
3741 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
3742 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3743 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3744 if (!ntosubmit) ntosubmit = 1;
3745 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3746 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3748 for (Int_t i=0; i<ntosubmit; i++) {
3749 // Submit for a range of enumeration of runs.
3750 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
3752 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3753 runOutDir.ReplaceAll(".xml", "");
3755 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3757 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3758 printf("********* %s\n",query.Data());
3759 res = gGrid->Command(query);
3761 TString cjobId1 = res->GetKey(0,"jobId");
3762 if (!cjobId1.Length()) {
3766 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3769 Info("StartAnalysis", "\n_______________________________________________________________________ \
3770 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3771 \n_______________________________________________________________________",
3772 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
3773 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3774 fGridJobIDs.Append(cjobId1);
3775 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3776 fGridStages.Append("full");
3779 lastmaster = cjobId1.Atoi();
3780 if (!firstmaster) firstmaster = lastmaster;
3785 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3793 //______________________________________________________________________________
3794 void AliAnalysisAlien::WriteAnalysisFile()
3796 // Write current analysis manager into the file <analysisFile>
3797 TString analysisFile = fExecutable;
3798 analysisFile.ReplaceAll(".sh", ".root");
3799 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3800 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3801 if (!mgr || !mgr->IsInitialized()) {
3802 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3805 // Check analysis type
3807 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3808 handler = (TObject*)mgr->GetInputEventHandler();
3810 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3811 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3812 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3813 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3815 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3816 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3819 TDirectory *cdir = gDirectory;
3820 TFile *file = TFile::Open(analysisFile, "RECREATE");
3822 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3823 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3824 // Unless merging makes no sense
3825 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3828 // Enable termination for local jobs
3829 mgr->SetSkipTerminate(kFALSE);
3831 if (cdir) cdir->cd();
3832 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
3834 Bool_t copy = kTRUE;
3835 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3838 TString workdir = gGrid->GetHomeDirectory();
3839 workdir += fGridWorkingDir;
3840 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
3841 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3842 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3843 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3847 //______________________________________________________________________________
3848 void AliAnalysisAlien::WriteAnalysisMacro()
3850 // Write the analysis macro that will steer the analysis in grid mode.
3851 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3853 out.open(fAnalysisMacro.Data(), ios::out);
3855 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3858 Bool_t hasSTEERBase = kFALSE;
3859 Bool_t hasESD = kFALSE;
3860 Bool_t hasAOD = kFALSE;
3861 Bool_t hasANALYSIS = kFALSE;
3862 Bool_t hasOADB = kFALSE;
3863 Bool_t hasANALYSISalice = kFALSE;
3864 Bool_t hasCORRFW = kFALSE;
3865 TString func = fAnalysisMacro;
3866 TString type = "ESD";
3867 TString comment = "// Analysis using ";
3868 if (IsUseMCchain()) {
3872 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3873 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3878 if (type!="AOD" && fFriendChainName!="") {
3879 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3882 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3883 else comment += " data";
3884 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3885 func.ReplaceAll(".C", "");
3886 out << "void " << func.Data() << "()" << endl;
3888 out << comment.Data() << endl;
3889 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3890 out << " TStopwatch timer;" << endl;
3891 out << " timer.Start();" << endl << endl;
3892 // Change temp directory to current one
3893 if (!IsLocalTest()) {
3894 out << "// connect to AliEn and make the chain" << endl;
3895 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3897 out << "// Set temporary merging directory to current one" << endl;
3898 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3899 out << "// Set temporary compilation directory to current one" << endl;
3900 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3901 // Reset existing include path
3902 out << "// Reset existing include path and add current directory first in the search" << endl;
3903 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3904 if (!fExecutableCommand.Contains("aliroot")) {
3905 out << "// load base root libraries" << endl;
3906 out << " gSystem->Load(\"libTree\");" << endl;
3907 out << " gSystem->Load(\"libGeom\");" << endl;
3908 out << " gSystem->Load(\"libVMC\");" << endl;
3909 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3910 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3912 if (fAdditionalRootLibs.Length()) {
3913 // in principle libtree /lib geom libvmc etc. can go into this list, too
3914 out << "// Add aditional libraries" << endl;
3915 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3918 while((str=(TObjString*)next())) {
3919 if (str->GetString().Contains(".so"))
3920 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3922 if (list) delete list;
3924 out << "// Load analysis framework libraries" << endl;
3925 TString setupPar = "AliAnalysisAlien::SetupPar";
3927 if (!fExecutableCommand.Contains("aliroot")) {
3928 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3929 out << " gSystem->Load(\"libESD\");" << endl;
3930 out << " gSystem->Load(\"libAOD\");" << endl;
3932 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3933 out << " gSystem->Load(\"libOADB\");" << endl;
3934 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3935 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3937 TIter next(fPackages);
3940 while ((obj=next())) {
3941 pkgname = obj->GetName();
3942 if (pkgname == "STEERBase" ||
3943 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3944 if (pkgname == "ESD" ||
3945 pkgname == "ESD.par") hasESD = kTRUE;
3946 if (pkgname == "AOD" ||
3947 pkgname == "AOD.par") hasAOD = kTRUE;
3948 if (pkgname == "ANALYSIS" ||
3949 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3950 if (pkgname == "OADB" ||
3951 pkgname == "OADB.par") hasOADB = kTRUE;
3952 if (pkgname == "ANALYSISalice" ||
3953 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3954 if (pkgname == "CORRFW" ||
3955 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3957 if (hasANALYSISalice) setupPar = "SetupPar";
3958 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3959 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3960 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3961 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3962 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3963 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3964 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3965 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3966 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3967 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3968 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3969 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3970 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3971 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3972 out << "// Compile other par packages" << endl;
3974 while ((obj=next())) {
3975 pkgname = obj->GetName();
3976 if (pkgname == "STEERBase" ||
3977 pkgname == "STEERBase.par" ||
3979 pkgname == "ESD.par" ||
3981 pkgname == "AOD.par" ||
3982 pkgname == "ANALYSIS" ||
3983 pkgname == "ANALYSIS.par" ||
3984 pkgname == "OADB" ||
3985 pkgname == "OADB.par" ||
3986 pkgname == "ANALYSISalice" ||
3987 pkgname == "ANALYSISalice.par" ||
3988 pkgname == "CORRFW" ||
3989 pkgname == "CORRFW.par") continue;
3990 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3993 out << "// include path" << endl;
3994 // Get the include path from the interpreter and remove entries pointing to AliRoot
3995 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3996 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3997 out << " TIter nextpath(listpaths);" << endl;
3998 out << " TObjString *pname;" << endl;
3999 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4000 out << " TString current = pname->GetName();" << endl;
4001 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4002 out << " gSystem->AddIncludePath(current);" << endl;
4003 out << " }" << endl;
4004 out << " if (listpaths) delete listpaths;" << endl;
4005 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4006 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4007 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4008 if (fAdditionalLibs.Length()) {
4009 out << "// Add aditional AliRoot libraries" << endl;
4010 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4013 while((str=(TObjString*)next())) {
4014 if (str->GetString().Contains(".so"))
4015 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4016 if (str->GetString().Contains(".par"))
4017 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4019 if (list) delete list;
4022 out << "// analysis source to be compiled at runtime (if any)" << endl;
4023 if (fAnalysisSource.Length()) {
4024 TObjArray *list = fAnalysisSource.Tokenize(" ");
4027 while((str=(TObjString*)next())) {
4028 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4030 if (list) delete list;
4033 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4034 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
4035 if (fFastReadOption) {
4036 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4037 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4038 out << "// fast xrootd reading enabled" << endl;
4039 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4040 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4041 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4042 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4043 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4044 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4046 out << "// read the analysis manager from file" << endl;
4047 TString analysisFile = fExecutable;
4048 analysisFile.ReplaceAll(".sh", ".root");
4049 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4050 << analysisFile << "\");" << endl;
4051 out << " if (!mgr) return;" << endl;
4052 if (IsLocalTest()) {
4053 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4054 out << " plugin->SetRunMode(\"test\");" << endl;
4055 if (fFileForTestMode.IsNull())
4056 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4058 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4059 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4060 if (!fFriendChainName.IsNull())
4061 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\");" << endl;
4063 out << " plugin->SetUseMCchain();" << endl;
4064 out << " mgr->SetGridHandler(plugin);" << endl;
4065 if (AliAnalysisManager::GetAnalysisManager()) {
4066 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4067 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4069 out << " mgr->SetDebugLevel(10);" << endl;
4070 out << " mgr->SetNSysInfo(100);" << endl;
4073 out << " mgr->PrintStatus();" << endl;
4074 if (AliAnalysisManager::GetAnalysisManager()) {
4075 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4076 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4078 if (TestBit(AliAnalysisGrid::kTest))
4079 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4081 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4084 if (!IsLocalTest()) {
4085 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4086 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4088 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4090 out << " timer.Stop();" << endl;
4091 out << " timer.Print();" << endl;
4092 out << "}" << endl << endl;
4093 if (!IsLocalTest()) {
4094 out <<"//________________________________________________________________________________" << endl;
4095 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4097 out << "// Create a chain using url's from xml file" << endl;
4098 out << " TString filename;" << endl;
4099 out << " Int_t run = 0;" << endl;
4100 if (IsUseMCchain()) {
4101 out << " TString treename = \"TE\";" << endl;
4103 out << " TString treename = type;" << endl;
4104 out << " treename.ToLower();" << endl;
4105 out << " treename += \"Tree\";" << endl;
4107 out << " printf(\"***************************************\\n\");" << endl;
4108 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4109 out << " printf(\"***************************************\\n\");" << endl;
4110 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4111 out << " if (!coll) {" << endl;
4112 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4113 out << " return NULL;" << endl;
4114 out << " }" << endl;
4115 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4116 out << " TChain *chain = new TChain(treename);" << endl;
4117 if(fFriendChainName!="") {
4118 out << " TChain *chainFriend = new TChain(treename);" << endl;
4120 out << " coll->Reset();" << endl;
4121 out << " while (coll->Next()) {" << endl;
4122 out << " filename = coll->GetTURL("");" << endl;
4123 out << " if (mgr) {" << endl;
4124 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4125 out << " if (nrun && nrun != run) {" << endl;
4126 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4127 out << " mgr->SetRunFromPath(nrun);" << endl;
4128 out << " run = nrun;" << endl;
4129 out << " }" << endl;
4130 out << " }" << endl;
4131 out << " chain->Add(filename);" << endl;
4132 if(fFriendChainName!="") {
4133 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
4134 out << " if (fileFriend.Index(\"#\") > -1) fileFriend.Remove(fileFriend.Index(\"#\"));" << endl;
4135 out << " fileFriend = gSystem->DirName(fileFriend);" << endl;
4136 out << " fileFriend += \"/\";" << endl;
4137 out << " fileFriend += \"" << fFriendChainName << "\";";
4138 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4139 out << " if (file) {" << endl;
4140 out << " file->Close();" << endl;
4141 out << " chainFriend->Add(fileFriend.Data());" << endl;
4142 out << " } else {" << endl;
4143 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4144 out << " return 0;" << endl;
4145 out << " }" << endl;
4147 out << " }" << endl;
4148 out << " if (!chain->GetNtrees()) {" << endl;
4149 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4150 out << " return NULL;" << endl;
4151 out << " }" << endl;
4152 if(fFriendChainName!="") {
4153 out << " chain->AddFriend(chainFriend);" << endl;
4155 out << " return chain;" << endl;
4156 out << "}" << endl << endl;
4158 if (hasANALYSISalice) {
4159 out <<"//________________________________________________________________________________" << endl;
4160 out << "Bool_t SetupPar(const char *package) {" << endl;
4161 out << "// Compile the package and set it up." << endl;
4162 out << " TString pkgdir = package;" << endl;
4163 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4164 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4165 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4166 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4167 out << " // Check for BUILD.sh and execute" << endl;
4168 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4169 out << " printf(\"*******************************\\n\");" << endl;
4170 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4171 out << " printf(\"*******************************\\n\");" << endl;
4172 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4173 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4174 out << " gSystem->ChangeDirectory(cdir);" << endl;
4175 out << " return kFALSE;" << endl;
4176 out << " }" << endl;
4177 out << " } else {" << endl;
4178 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4179 out << " gSystem->ChangeDirectory(cdir);" << endl;
4180 out << " return kFALSE;" << endl;
4181 out << " }" << endl;
4182 out << " // Check for SETUP.C and execute" << endl;
4183 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4184 out << " printf(\"*******************************\\n\");" << endl;
4185 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4186 out << " printf(\"*******************************\\n\");" << endl;
4187 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4188 out << " } else {" << endl;
4189 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4190 out << " gSystem->ChangeDirectory(cdir);" << endl;
4191 out << " return kFALSE;" << endl;
4192 out << " }" << endl;
4193 out << " // Restore original workdir" << endl;
4194 out << " gSystem->ChangeDirectory(cdir);" << endl;
4195 out << " return kTRUE;" << endl;
4198 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
4200 Bool_t copy = kTRUE;
4201 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4204 TString workdir = gGrid->GetHomeDirectory();
4205 workdir += fGridWorkingDir;
4206 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4207 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4208 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
4209 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4210 Form("alien://%s/%s", workdir.Data(),
4211 fAnalysisMacro.Data()))) Fatal("","Terminating");
4215 //______________________________________________________________________________
4216 void AliAnalysisAlien::WriteMergingMacro()
4218 // Write a macro to merge the outputs per master job.
4219 if (!fMergeViaJDL) return;
4220 if (!fOutputFiles.Length()) {
4221 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4224 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4225 TString mergingMacro = fExecutable;
4226 mergingMacro.ReplaceAll(".sh","_merge.C");
4227 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4228 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4230 out.open(mergingMacro.Data(), ios::out);
4232 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4235 Bool_t hasSTEERBase = kFALSE;
4236 Bool_t hasESD = kFALSE;
4237 Bool_t hasAOD = kFALSE;
4238 Bool_t hasANALYSIS = kFALSE;
4239 Bool_t hasOADB = kFALSE;
4240 Bool_t hasANALYSISalice = kFALSE;
4241 Bool_t hasCORRFW = kFALSE;
4242 TString func = mergingMacro;
4244 func.ReplaceAll(".C", "");
4245 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4247 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4248 out << " TStopwatch timer;" << endl;
4249 out << " timer.Start();" << endl << endl;
4250 // Reset existing include path
4251 out << "// Reset existing include path and add current directory first in the search" << endl;
4252 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4253 if (!fExecutableCommand.Contains("aliroot")) {
4254 out << "// load base root libraries" << endl;
4255 out << " gSystem->Load(\"libTree\");" << endl;
4256 out << " gSystem->Load(\"libGeom\");" << endl;
4257 out << " gSystem->Load(\"libVMC\");" << endl;
4258 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4259 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4261 if (fAdditionalRootLibs.Length()) {
4262 // in principle libtree /lib geom libvmc etc. can go into this list, too
4263 out << "// Add aditional libraries" << endl;
4264 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4267 while((str=(TObjString*)next())) {
4268 if (str->GetString().Contains(".so"))
4269 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4271 if (list) delete list;
4273 out << "// Load analysis framework libraries" << endl;
4275 if (!fExecutableCommand.Contains("aliroot")) {
4276 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4277 out << " gSystem->Load(\"libESD\");" << endl;
4278 out << " gSystem->Load(\"libAOD\");" << endl;
4280 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4281 out << " gSystem->Load(\"libOADB\");" << endl;
4282 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4283 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4285 TIter next(fPackages);
4288 TString setupPar = "AliAnalysisAlien::SetupPar";
4289 while ((obj=next())) {
4290 pkgname = obj->GetName();
4291 if (pkgname == "STEERBase" ||
4292 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4293 if (pkgname == "ESD" ||
4294 pkgname == "ESD.par") hasESD = kTRUE;
4295 if (pkgname == "AOD" ||
4296 pkgname == "AOD.par") hasAOD = kTRUE;
4297 if (pkgname == "ANALYSIS" ||
4298 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4299 if (pkgname == "OADB" ||
4300 pkgname == "OADB.par") hasOADB = kTRUE;
4301 if (pkgname == "ANALYSISalice" ||
4302 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4303 if (pkgname == "CORRFW" ||
4304 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4306 if (hasANALYSISalice) setupPar = "SetupPar";
4307 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4308 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4309 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4310 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4311 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4312 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4313 out << " gSystem->Load(\"libOADB\");" << endl;
4314 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4315 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4316 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4317 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4318 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4319 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4320 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4321 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4322 out << "// Compile other par packages" << endl;
4324 while ((obj=next())) {
4325 pkgname = obj->GetName();
4326 if (pkgname == "STEERBase" ||
4327 pkgname == "STEERBase.par" ||
4329 pkgname == "ESD.par" ||
4331 pkgname == "AOD.par" ||
4332 pkgname == "ANALYSIS" ||
4333 pkgname == "ANALYSIS.par" ||
4334 pkgname == "OADB" ||
4335 pkgname == "OADB.par" ||
4336 pkgname == "ANALYSISalice" ||
4337 pkgname == "ANALYSISalice.par" ||
4338 pkgname == "CORRFW" ||
4339 pkgname == "CORRFW.par") continue;
4340 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4343 out << "// include path" << endl;
4344 // Get the include path from the interpreter and remove entries pointing to AliRoot
4345 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4346 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4347 out << " TIter nextpath(listpaths);" << endl;
4348 out << " TObjString *pname;" << endl;
4349 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4350 out << " TString current = pname->GetName();" << endl;
4351 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4352 out << " gSystem->AddIncludePath(current);" << endl;
4353 out << " }" << endl;
4354 out << " if (listpaths) delete listpaths;" << endl;
4355 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4356 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4357 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4358 if (fAdditionalLibs.Length()) {
4359 out << "// Add aditional AliRoot libraries" << endl;
4360 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4363 while((str=(TObjString*)next())) {
4364 if (str->GetString().Contains(".so"))
4365 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4367 if (list) delete list;
4370 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4371 if (fAnalysisSource.Length()) {
4372 TObjArray *list = fAnalysisSource.Tokenize(" ");
4375 while((str=(TObjString*)next())) {
4376 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4378 if (list) delete list;
4382 if (fFastReadOption) {
4383 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4384 out << "// fast xrootd reading enabled" << endl;
4385 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4386 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4387 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4388 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4389 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4390 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4392 // Change temp directory to current one
4393 out << "// Connect to AliEn" << endl;
4394 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4395 out << "// Set temporary merging directory to current one" << endl;
4396 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4397 out << "// Set temporary compilation directory to current one" << endl;
4398 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4399 out << " TString outputDir = dir;" << endl;
4400 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4401 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4402 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4403 out << " TIter *iter = new TIter(list);" << endl;
4404 out << " TObjString *str;" << endl;
4405 out << " TString outputFile;" << endl;
4406 out << " Bool_t merged = kTRUE;" << endl;
4407 TString analysisFile = fExecutable;
4408 analysisFile.ReplaceAll(".sh", ".root");
4409 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4410 << analysisFile << "\");" << endl;
4411 out << " if (!mgr) {" << endl;
4412 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4413 out << " return;" << endl;
4414 out << " }" << endl;
4415 if (IsLocalTest()) {
4416 out << " printf(\"===================================\n\");" << endl;
4417 out << " printf(\"Testing merging...\\n\");" << endl;
4418 out << " printf(\"===================================\n\");" << endl;
4420 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4421 out << " outputFile = str->GetString();" << endl;
4422 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4423 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4424 out << " if (index > 0) outputFile.Remove(index);" << endl;
4425 out << " // Skip already merged outputs" << endl;
4426 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4427 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4428 out << " continue;" << endl;
4429 out << " }" << endl;
4430 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4431 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4432 out << " if (!merged) {" << endl;
4433 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4434 out << " return;" << endl;
4435 out << " }" << endl;
4436 out << " }" << endl;
4437 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4438 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4439 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4441 out << " // all outputs merged, validate" << endl;
4442 out << " ofstream out;" << endl;
4443 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4444 out << " out.close();" << endl;
4445 out << " // read the analysis manager from file" << endl;
4446 if (IsLocalTest()) {
4447 out << " printf(\"===================================\n\");" << endl;
4448 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4449 out << " printf(\"===================================\n\");" << endl;
4451 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4453 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4454 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4455 out << " mgr->PrintStatus();" << endl;
4457 if (mgr->GetDebugLevel()>3) {
4458 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4460 if (TestBit(AliAnalysisGrid::kTest))
4461 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4463 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4466 out << " TTree *tree = NULL;" << endl;
4467 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4468 out << "}" << endl << endl;
4469 if (hasANALYSISalice) {
4470 out <<"//________________________________________________________________________________" << endl;
4471 out << "Bool_t SetupPar(const char *package) {" << endl;
4472 out << "// Compile the package and set it up." << endl;
4473 out << " TString pkgdir = package;" << endl;
4474 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4475 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4476 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4477 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4478 out << " // Check for BUILD.sh and execute" << endl;
4479 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4480 out << " printf(\"*******************************\\n\");" << endl;
4481 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4482 out << " printf(\"*******************************\\n\");" << endl;
4483 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4484 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4485 out << " gSystem->ChangeDirectory(cdir);" << endl;
4486 out << " return kFALSE;" << endl;
4487 out << " }" << endl;
4488 out << " } else {" << endl;
4489 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4490 out << " gSystem->ChangeDirectory(cdir);" << endl;
4491 out << " return kFALSE;" << endl;
4492 out << " }" << endl;
4493 out << " // Check for SETUP.C and execute" << endl;
4494 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4495 out << " printf(\"*******************************\\n\");" << endl;
4496 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4497 out << " printf(\"*******************************\\n\");" << endl;
4498 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4499 out << " } else {" << endl;
4500 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4501 out << " gSystem->ChangeDirectory(cdir);" << endl;
4502 out << " return kFALSE;" << endl;
4503 out << " }" << endl;
4504 out << " // Restore original workdir" << endl;
4505 out << " gSystem->ChangeDirectory(cdir);" << endl;
4506 out << " return kTRUE;" << endl;
4510 Bool_t copy = kTRUE;
4511 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4514 TString workdir = gGrid->GetHomeDirectory();
4515 workdir += fGridWorkingDir;
4516 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4517 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4518 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4519 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4520 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4524 //______________________________________________________________________________
4525 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4527 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4528 // Note that for loading the compiled library. The current directory should have precedence in
4530 TString pkgdir = package;
4531 pkgdir.ReplaceAll(".par","");
4532 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4533 TString cdir = gSystem->WorkingDirectory();
4534 gSystem->ChangeDirectory(pkgdir);
4535 // Check for BUILD.sh and execute
4536 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4537 printf("**************************************************\n");
4538 printf("*** Building PAR archive %s\n", package);
4539 printf("**************************************************\n");
4540 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4541 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4542 gSystem->ChangeDirectory(cdir);
4546 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4547 gSystem->ChangeDirectory(cdir);
4550 // Check for SETUP.C and execute
4551 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4552 printf("**************************************************\n");
4553 printf("*** Setup PAR archive %s\n", package);
4554 printf("**************************************************\n");
4555 gROOT->Macro("PROOF-INF/SETUP.C");
4556 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4558 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4559 gSystem->ChangeDirectory(cdir);
4562 // Restore original workdir
4563 gSystem->ChangeDirectory(cdir);
4567 //______________________________________________________________________________
4568 void AliAnalysisAlien::WriteExecutable()
4570 // Generate the alien executable script.
4571 // Patch executable with -x to catch error code
4572 if (fExecutableCommand.Contains("root") &&
4573 fExecutableCommand.Contains("-q") &&
4574 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4575 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4577 out.open(fExecutable.Data(), ios::out);
4579 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4582 out << "#!/bin/bash" << endl;
4583 // Make sure we can properly compile par files
4584 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4585 out << "echo \"=========================================\"" << endl;
4586 out << "echo \"############## PATH : ##############\"" << endl;
4587 out << "echo $PATH" << endl;
4588 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4589 out << "echo $LD_LIBRARY_PATH" << endl;
4590 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4591 out << "echo $ROOTSYS" << endl;
4592 out << "echo \"############## which root : ##############\"" << endl;
4593 out << "which root" << endl;
4594 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4595 out << "echo $ALICE_ROOT" << endl;
4596 out << "echo \"############## which aliroot : ##############\"" << endl;
4597 out << "which aliroot" << endl;
4598 out << "echo \"############## system limits : ##############\"" << endl;
4599 out << "ulimit -a" << endl;
4600 out << "echo \"############## memory : ##############\"" << endl;
4601 out << "free -m" << endl;
4602 out << "echo \"=========================================\"" << endl << endl;
4603 out << fExecutableCommand << " ";
4604 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4605 out << "RET=$?" << endl;
4606 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4607 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4608 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4609 out << " let sig=\"$RET - 128\""<<endl;
4610 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4611 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4612 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4613 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4614 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4615 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4617 out << " exit $RET"<< endl;
4618 out << "fi" << endl << endl ;
4619 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4620 out << "echo \"############## memory after: ##############\"" << endl;
4621 out << "free -m" << endl;
4623 Bool_t copy = kTRUE;
4624 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4627 TString workdir = gGrid->GetHomeDirectory();
4628 TString bindir = Form("%s/bin", workdir.Data());
4629 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4630 workdir += fGridWorkingDir;
4631 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
4632 if (FileExists(executable)) gGrid->Rm(executable);
4633 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4634 // TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
4635 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4636 executable.Data())) Fatal("","Terminating");
4640 //______________________________________________________________________________
4641 void AliAnalysisAlien::WriteMergeExecutable()
4643 // Generate the alien executable script for the merging job.
4644 if (!fMergeViaJDL) return;
4645 TString mergeExec = fExecutable;
4646 mergeExec.ReplaceAll(".sh", "_merge.sh");
4647 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4649 out.open(mergeExec.Data(), ios::out);
4651 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4654 out << "#!/bin/bash" << endl;
4655 // Make sure we can properly compile par files
4656 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4657 out << "echo \"=========================================\"" << endl;
4658 out << "echo \"############## PATH : ##############\"" << endl;
4659 out << "echo $PATH" << endl;
4660 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4661 out << "echo $LD_LIBRARY_PATH" << endl;
4662 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4663 out << "echo $ROOTSYS" << endl;
4664 out << "echo \"############## which root : ##############\"" << endl;
4665 out << "which root" << endl;
4666 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4667 out << "echo $ALICE_ROOT" << endl;
4668 out << "echo \"############## which aliroot : ##############\"" << endl;
4669 out << "which aliroot" << endl;
4670 out << "echo \"############## system limits : ##############\"" << endl;
4671 out << "ulimit -a" << endl;
4672 out << "echo \"############## memory : ##############\"" << endl;
4673 out << "free -m" << endl;
4674 out << "echo \"=========================================\"" << endl << endl;
4675 TString mergeMacro = fExecutable;
4676 mergeMacro.ReplaceAll(".sh", "_merge.C");
4677 if (IsOneStageMerging())
4678 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4680 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4681 out << fExecutableCommand << " " << "$ARG" << endl;
4682 out << "RET=$?" << endl;
4683 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4684 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4685 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4686 out << " let sig=\"$RET - 128\""<<endl;
4687 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4688 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4689 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4690 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4691 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4692 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4694 out << " exit $RET"<< endl;
4695 out << "fi" << endl << endl ;
4696 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4697 out << "echo \"############## memory after: ##############\"" << endl;
4698 out << "free -m" << endl;
4700 Bool_t copy = kTRUE;
4701 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4704 TString workdir = gGrid->GetHomeDirectory();
4705 TString bindir = Form("%s/bin", workdir.Data());
4706 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4707 workdir += fGridWorkingDir;
4708 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4709 if (FileExists(executable)) gGrid->Rm(executable);
4710 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4711 // TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4712 if (!copyLocal2Alien("WriteMergeExecutable",
4713 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4717 //______________________________________________________________________________
4718 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4720 // Write the production file to be submitted by LPM manager. The format is:
4721 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4722 // Next lines: full_path_to_dataset XXX (XXX is a string)
4723 // To submit, one has to: submit jdl XXX for all lines
4725 out.open(filename, ios::out);
4727 Error("WriteProductionFile", "Bad file name: %s", filename);
4731 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4732 workdir = gGrid->GetHomeDirectory();
4733 workdir += fGridWorkingDir;
4734 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4735 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4736 out << locjdl << " " << njobspermaster << endl;
4737 Int_t nmasterjobs = fInputFiles->GetEntries();
4738 for (Int_t i=0; i<nmasterjobs; i++) {
4739 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4740 runOutDir.ReplaceAll(".xml", "");
4742 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4744 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4747 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4748 if (FileExists(filename)) gGrid->Rm(filename);
4749 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4750 if (!copyLocal2Alien("WriteProductionFile", filename,
4751 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4755 //______________________________________________________________________________
4756 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4758 // Generate the alien validation script.
4759 // Generate the validation script
4761 if (fValidationScript.IsNull()) {
4762 fValidationScript = fExecutable;
4763 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4765 TString validationScript = fValidationScript;
4766 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4768 Error("WriteValidationScript", "Alien connection required");
4771 if (!fTerminateFiles.IsNull()) {
4772 fTerminateFiles.Strip();
4773 fTerminateFiles.ReplaceAll(" ",",");
4775 TString outStream = "";
4776 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4777 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4779 out.open(validationScript, ios::out);
4780 out << "#!/bin/bash" << endl;
4781 out << "##################################################" << endl;
4782 out << "validateout=`dirname $0`" << endl;
4783 out << "validatetime=`date`" << endl;
4784 out << "validated=\"0\";" << endl;
4785 out << "error=0" << endl;
4786 out << "if [ -z $validateout ]" << endl;
4787 out << "then" << endl;
4788 out << " validateout=\".\"" << endl;
4789 out << "fi" << endl << endl;
4790 out << "cd $validateout;" << endl;
4791 out << "validateworkdir=`pwd`;" << endl << endl;
4792 out << "echo \"*******************************************************\"" << outStream << endl;
4793 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4795 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4796 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4797 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4798 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4799 out << "ls -la ./" << outStream << endl;
4800 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4801 out << "##################################################" << endl;
4804 out << "if [ ! -f stderr ] ; then" << endl;
4805 out << " error=1" << endl;
4806 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4807 out << " echo \"Error = $error\" " << outStream << endl;
4808 out << "fi" << endl;
4810 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4811 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4812 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4813 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4816 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4817 out << " error=1" << endl;
4818 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4819 out << " echo \"$parArch\" " << outStream << endl;
4820 out << " echo \"Error = $error\" " << outStream << endl;
4821 out << "fi" << endl;
4823 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4824 out << " error=1" << endl;
4825 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4826 out << " echo \"$segViol\" " << outStream << endl;
4827 out << " echo \"Error = $error\" " << outStream << endl;
4828 out << "fi" << endl;
4830 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4831 out << " error=1" << endl;
4832 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4833 out << " echo \"$segFault\" " << outStream << endl;
4834 out << " echo \"Error = $error\" " << outStream << endl;
4835 out << "fi" << endl;
4837 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4838 out << " error=1" << endl;
4839 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4840 out << " echo \"$glibcErr\" " << outStream << endl;
4841 out << " echo \"Error = $error\" " << outStream << endl;
4842 out << "fi" << endl;
4844 // Part dedicated to the specific analyses running into the train
4846 TString outputFiles = fOutputFiles;
4847 if (merge && !fTerminateFiles.IsNull()) {
4849 outputFiles += fTerminateFiles;
4851 TObjArray *arr = outputFiles.Tokenize(",");
4854 while (!merge && (os=(TObjString*)next1())) {
4855 // No need to validate outputs produced by merging since the merging macro does this
4856 outputFile = os->GetString();
4857 Int_t index = outputFile.Index("@");
4858 if (index > 0) outputFile.Remove(index);
4859 if (fTerminateFiles.Contains(outputFile)) continue;
4860 if (outputFile.Contains("*")) continue;
4861 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4862 out << " error=1" << endl;
4863 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4864 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4865 out << "fi" << endl;
4868 out << "if ! [ -f outputs_valid ] ; then" << endl;
4869 out << " error=1" << endl;
4870 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4871 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4872 out << "fi" << endl;
4874 out << "if [ $error = 0 ] ; then" << endl;
4875 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4876 if (!IsKeepLogs()) {
4877 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4879 out << " rm -f std*" << endl;
4881 out << "fi" << endl;
4883 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4884 out << "echo \"*******************************************************\"" << outStream << endl;
4885 out << "cd -" << endl;
4886 out << "exit $error" << endl;
4888 Bool_t copy = kTRUE;
4889 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4892 TString workdir = gGrid->GetHomeDirectory();
4893 workdir += fGridWorkingDir;
4894 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4895 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4896 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
4897 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
4898 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");