1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
39 #include "TGridResult.h"
40 #include "TGridCollection.h"
42 #include "TGridJobStatusList.h"
43 #include "TGridJobStatus.h"
44 #include "TFileMerger.h"
45 #include "AliAnalysisManager.h"
46 #include "AliAnalysisTaskCfg.h"
47 #include "AliVEventHandler.h"
48 #include "AliAnalysisDataContainer.h"
49 #include "AliMultiInputEventHandler.h"
55 ClassImp(AliAnalysisAlien)
// Copy a local file to an AliEn destination using TFile::Cp.
//   where - location label handed to Warning() when a failure is reported
//   loc   - local file path; prefixed with "file:" to build the source URL
//   rem   - AliEn path; prefixed with "alien://" to build the target URL
// NOTE(review): the condition guarding the Warning and the return statement are
// not visible in this view; presumably the result of TFile::Cp is returned - confirm.
61 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
63 TString sl(Form("file:%s", loc));
64 TString sr(Form("alien://%s", rem));
65 Bool_t ret = TFile::Cp(sl, sr);
67 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
73 //______________________________________________________________________________
// Default constructor.
// NOTE(review): only a fragment of the member-initializer list is visible in this
// view. The visible entries set the integer settings to 0 and default-construct
// fAdditionalRootLibs.
74 AliAnalysisAlien::AliAnalysisAlien()
80 fSplitMaxInputFileNumber(0),
82 fMasterResubmitThreshold(0),
95 fNproofWorkersPerSlave(0),
105 fAdditionalRootLibs(),
150 //______________________________________________________________________________
// Named constructor: forwards the name to the AliAnalysisGrid base class.
// NOTE(review): only a fragment of the member-initializer list is visible in this
// view. The visible entries set the integer settings to 0 and default-construct
// the string members.
151 AliAnalysisAlien::AliAnalysisAlien(const char *name)
152 :AliAnalysisGrid(name),
157 fSplitMaxInputFileNumber(0),
159 fMasterResubmitThreshold(0),
172 fNproofWorkersPerSlave(0),
176 fExecutableCommand(),
182 fAdditionalRootLibs(),
227 //______________________________________________________________________________
// Copy constructor.
// Scalar and string settings are copied member by member in the initializer
// list; the JDL objects are created anew and the object arrays are copied
// element by element in the body.
// NOTE(review): some initializer entries and body lines are not visible in this view.
228 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
229 :AliAnalysisGrid(other),
232 fPrice(other.fPrice),
234 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
235 fMaxInitFailed(other.fMaxInitFailed),
236 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
237 fNtestFiles(other.fNtestFiles),
238 fNrunsPerMaster(other.fNrunsPerMaster),
239 fMaxMergeFiles(other.fMaxMergeFiles),
240 fMaxMergeStages(other.fMaxMergeStages),
241 fNsubmitted(other.fNsubmitted),
242 fProductionMode(other.fProductionMode),
243 fOutputToRunNo(other.fOutputToRunNo),
244 fMergeViaJDL(other.fMergeViaJDL),
245 fFastReadOption(other.fFastReadOption),
246 fOverwriteMode(other.fOverwriteMode),
247 fNreplicas(other.fNreplicas),
248 fNproofWorkers(other.fNproofWorkers),
249 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
250 fProofReset(other.fProofReset),
251 fRunNumbers(other.fRunNumbers),
252 fExecutable(other.fExecutable),
253 fExecutableCommand(other.fExecutableCommand),
254 fArguments(other.fArguments),
255 fExecutableArgs(other.fExecutableArgs),
256 fAnalysisMacro(other.fAnalysisMacro),
257 fAnalysisSource(other.fAnalysisSource),
258 fValidationScript(other.fValidationScript),
259 fAdditionalRootLibs(other.fAdditionalRootLibs),
260 fAdditionalLibs(other.fAdditionalLibs),
261 fSplitMode(other.fSplitMode),
262 fAPIVersion(other.fAPIVersion),
263 fROOTVersion(other.fROOTVersion),
264 fAliROOTVersion(other.fAliROOTVersion),
265 fExternalPackages(other.fExternalPackages),
267 fGridWorkingDir(other.fGridWorkingDir),
268 fGridDataDir(other.fGridDataDir),
269 fDataPattern(other.fDataPattern),
270 fGridOutputDir(other.fGridOutputDir),
271 fOutputArchive(other.fOutputArchive),
272 fOutputFiles(other.fOutputFiles),
273 fInputFormat(other.fInputFormat),
274 fDatasetName(other.fDatasetName),
275 fJDLName(other.fJDLName),
276 fTerminateFiles(other.fTerminateFiles),
277 fMergeExcludes(other.fMergeExcludes),
278 fRegisterExcludes(other.fRegisterExcludes),
279 fIncludePath(other.fIncludePath),
280 fCloseSE(other.fCloseSE),
281 fFriendChainName(other.fFriendChainName),
282 fJobTag(other.fJobTag),
283 fOutputSingle(other.fOutputSingle),
284 fRunPrefix(other.fRunPrefix),
285 fProofCluster(other.fProofCluster),
286 fProofDataSet(other.fProofDataSet),
287 fFileForTestMode(other.fFileForTestMode),
288 fAliRootMode(other.fAliRootMode),
289 fProofProcessOpt(other.fProofProcessOpt),
290 fMergeDirName(other.fMergeDirName),
295 fDropToShell(other.fDropToShell),
296 fGridJobIDs(other.fGridJobIDs),
297 fGridStages(other.fGridStages),
298 fFriendLibs(other.fFriendLibs)
// The JDL objects are not copied from 'other': fresh TAlienJDL instances are
// created through the interpreter.
301 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
302 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
303 fRunRange[0] = other.fRunRange[0];
304 fRunRange[1] = other.fRunRange[1];
// Copy of the input file list: each entry is re-created as a TObjString holding
// the source entry's name, and the new array owns its entries.
305 if (other.fInputFiles) {
306 fInputFiles = new TObjArray();
307 TIter next(other.fInputFiles);
309 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
310 fInputFiles->SetOwner();
// Same name-based copy for the package list.
312 if (other.fPackages) {
313 fPackages = new TObjArray();
314 TIter next(other.fPackages);
316 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
317 fPackages->SetOwner();
// Modules are copy-constructed one by one into a new owning array.
// NOTE(review): the line adding 'mod' to fModules is not visible in this view.
319 if (other.fModules) {
320 fModules = new TObjArray();
321 fModules->SetOwner();
322 TIter next(other.fModules);
323 AliAnalysisTaskCfg *mod, *crt;
324 while ((crt=(AliAnalysisTaskCfg*)next())) {
325 mod = new AliAnalysisTaskCfg(*crt);
331 //______________________________________________________________________________
// Destructor.
// NOTE(review): most of the body is not visible in this view; the only visible
// statement calls DeleteAll() on fProofParam.
332 AliAnalysisAlien::~AliAnalysisAlien()
340 fProofParam.DeleteAll();
343 //______________________________________________________________________________
// Assignment operator.
// Guards against self-assignment, assigns the base class, then copies the
// settings member by member and copies the object arrays element by element.
// NOTE(review): no assignment of fRunRange is visible in this view, whereas the
// copy constructor copies both entries - confirm whether this is intended.
344 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
347 if (this != &other) {
348 AliAnalysisGrid::operator=(other);
// Fresh JDL objects are created through the interpreter.
// NOTE(review): no release of the previously held fGridJDL/fMergingJDL is
// visible before they are overwritten - possible leak, confirm.
349 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
350 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
351 fPrice = other.fPrice;
353 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
354 fMaxInitFailed = other.fMaxInitFailed;
355 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
356 fNtestFiles = other.fNtestFiles;
357 fNrunsPerMaster = other.fNrunsPerMaster;
358 fMaxMergeFiles = other.fMaxMergeFiles;
359 fMaxMergeStages = other.fMaxMergeStages;
360 fNsubmitted = other.fNsubmitted;
361 fProductionMode = other.fProductionMode;
362 fOutputToRunNo = other.fOutputToRunNo;
363 fMergeViaJDL = other.fMergeViaJDL;
364 fFastReadOption = other.fFastReadOption;
365 fOverwriteMode = other.fOverwriteMode;
366 fNreplicas = other.fNreplicas;
367 fNproofWorkers = other.fNproofWorkers;
368 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
369 fProofReset = other.fProofReset;
370 fRunNumbers = other.fRunNumbers;
371 fExecutable = other.fExecutable;
372 fExecutableCommand = other.fExecutableCommand;
373 fArguments = other.fArguments;
374 fExecutableArgs = other.fExecutableArgs;
375 fAnalysisMacro = other.fAnalysisMacro;
376 fAnalysisSource = other.fAnalysisSource;
377 fValidationScript = other.fValidationScript;
378 fAdditionalRootLibs = other.fAdditionalRootLibs;
379 fAdditionalLibs = other.fAdditionalLibs;
380 fSplitMode = other.fSplitMode;
381 fAPIVersion = other.fAPIVersion;
382 fROOTVersion = other.fROOTVersion;
383 fAliROOTVersion = other.fAliROOTVersion;
384 fExternalPackages = other.fExternalPackages;
386 fGridWorkingDir = other.fGridWorkingDir;
387 fGridDataDir = other.fGridDataDir;
388 fDataPattern = other.fDataPattern;
389 fGridOutputDir = other.fGridOutputDir;
390 fOutputArchive = other.fOutputArchive;
391 fOutputFiles = other.fOutputFiles;
392 fInputFormat = other.fInputFormat;
393 fDatasetName = other.fDatasetName;
394 fJDLName = other.fJDLName;
395 fTerminateFiles = other.fTerminateFiles;
396 fMergeExcludes = other.fMergeExcludes;
397 fRegisterExcludes = other.fRegisterExcludes;
398 fIncludePath = other.fIncludePath;
399 fCloseSE = other.fCloseSE;
400 fFriendChainName = other.fFriendChainName;
401 fJobTag = other.fJobTag;
402 fOutputSingle = other.fOutputSingle;
403 fRunPrefix = other.fRunPrefix;
404 fProofCluster = other.fProofCluster;
405 fProofDataSet = other.fProofDataSet;
406 fFileForTestMode = other.fFileForTestMode;
407 fAliRootMode = other.fAliRootMode;
408 fProofProcessOpt = other.fProofProcessOpt;
409 fMergeDirName = other.fMergeDirName;
410 fDropToShell = other.fDropToShell;
411 fGridJobIDs = other.fGridJobIDs;
412 fGridStages = other.fGridStages;
413 fFriendLibs = other.fFriendLibs;
// Input files are re-created as TObjString entries in a new owning array.
// NOTE(review): the array previously pointed to by fInputFiles is not visibly
// deleted, and the pointer is left untouched when other.fInputFiles is null.
// The same applies to fPackages and fModules below - confirm.
414 if (other.fInputFiles) {
415 fInputFiles = new TObjArray();
416 TIter next(other.fInputFiles);
418 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
419 fInputFiles->SetOwner();
// Same name-based copy for the package list.
421 if (other.fPackages) {
422 fPackages = new TObjArray();
423 TIter next(other.fPackages);
425 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
426 fPackages->SetOwner();
// Modules are copy-constructed one by one into a new owning array.
// NOTE(review): the line adding 'mod' to fModules is not visible in this view.
428 if (other.fModules) {
429 fModules = new TObjArray();
430 fModules->SetOwner();
431 TIter next(other.fModules);
432 AliAnalysisTaskCfg *mod, *crt;
433 while ((crt=(AliAnalysisTaskCfg*)next())) {
434 mod = new AliAnalysisTaskCfg(*crt);
442 //______________________________________________________________________________
443 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
445 // Add a single additional library to be loaded. Extension must be present.
// The library is appended to fAdditionalLibs, a blank-separated list.
// NOTE(review): the declaration of 'lib' is not visible in this view;
// presumably it is a TString built from 'name' - confirm.
// A name without any "." is reported as an error.
447 if (!lib.Contains(".")) {
448 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
// The duplicate check is a substring search on the accumulated list.
451 if (fAdditionalLibs.Contains(name)) {
452 Warning("AddAdditionalLibrary", "Library %s already added.", name);
// Insert the separator only when the list already has content.
455 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
456 fAdditionalLibs += lib;
459 //______________________________________________________________________________
460 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
462 // Adding a module. Checks if already existing. Becomes owned by this.
// A module whose name matches an already registered one is reported as an error.
464 if (GetModule(module->GetName())) {
465 Error("AddModule", "A module having the same name %s already added", module->GetName());
// The module array is created as an owning array.
// NOTE(review): the condition guarding this creation is not visible in this
// view; presumably it only runs when fModules is still null - confirm.
469 fModules = new TObjArray();
470 fModules->SetOwner();
472 fModules->Add(module);
475 //______________________________________________________________________________
476 void AliAnalysisAlien::AddModules(TObjArray *list)
478 // Adding a list of modules. Checks if already existing. Becomes owned by this.
// Each entry is cast to AliAnalysisTaskCfg* and handed to AddModule().
// NOTE(review): the declaration of the iterator 'next' is not visible in this
// view; presumably it iterates over 'list' - confirm.
480 AliAnalysisTaskCfg *module;
481 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
484 //______________________________________________________________________________
485 Bool_t AliAnalysisAlien::CheckDependencies()
487 // Check if all dependencies are satisfied. Reorder modules if needed.
// NOTE(review): several conditions, declarations and return statements of this
// function are not visible in this view; the comments below describe only the
// visible statements.
488 Int_t nmodules = GetNmodules();
490 Warning("CheckDependencies", "No modules added yet to check their dependencies");
493 AliAnalysisTaskCfg *mod = 0;
494 AliAnalysisTaskCfg *dep = 0;
// Outer loop over the registered modules, inner loop over the dependencies
// each module declares.
497 for (i=0; i<nmodules; i++) {
498 mod = (AliAnalysisTaskCfg*) fModules->At(i);
499 Int_t ndeps = mod->GetNdeps();
501 for (j=0; j<ndeps; j++) {
// Dependencies are resolved by name among the registered modules.
502 depname = mod->GetDependency(j);
503 dep = GetModule(depname);
505 Error("CheckDependencies","Dependency %s not added for module %s",
506 depname.Data(), mod->GetName());
// Two modules that each need the other are reported as a circular dependency.
509 if (dep->NeedsDependency(mod->GetName())) {
510 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
511 mod->GetName(), dep->GetName());
514 Int_t idep = fModules->IndexOf(dep);
515 // The dependency task must come first
517 // Remove at idep and move all objects below up one slot
518 // down to index i included.
519 fModules->RemoveAt(idep);
520 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
// The dependency takes slot i; i is advanced so that it keeps designating 'mod'.
521 fModules->AddAt(dep, i++);
523 //Redo from istart if dependencies were inserted
524 if (i>istart) i=istart-1;
530 //______________________________________________________________________________
531 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
533 // Create the analysis manager and optionally execute the macro in filename.
//   name     - name given to the newly created AliAnalysisManager
//   filename - macro path; only processed when it is a non-empty string
// NOTE(review): the handling of an already existing manager and the return
// statement are not visible in this view.
534 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
536 mgr = new AliAnalysisManager(name);
// Register this plugin as the grid handler of the manager.
537 mgr->SetGridHandler((AliAnalysisGrid*)this);
538 if (strlen(filename)) {
// NOTE(review): the const char* overload of TSystem::ExpandPathName is
// documented to return a newly allocated string; copying it into a TString
// does not release it - possible leak, confirm against the ROOT documentation.
539 TString line = gSystem->ExpandPathName(filename);
// The resulting line is handed to the interpreter.
541 gROOT->ProcessLine(line.Data());
546 //______________________________________________________________________________
547 Int_t AliAnalysisAlien::GetNmodules() const
549 // Get number of modules.
550 if (!fModules) return 0;
551 return fModules->GetEntries();
554 //______________________________________________________________________________
555 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
557 // Get a module by name.
558 if (!fModules) return 0;
559 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
562 //______________________________________________________________________________
563 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
565 // Load a given module.
// Dependencies are loaded first (recursively), then the module libraries, then
// the module macro and its dependency configuration macro are executed, and the
// module libraries are appended to fAdditionalLibs.
// NOTE(review): the conditions guarding several Error calls and the return
// statements are not visible in this view.
// A module already flagged as loaded is not processed again.
566 if (mod->IsLoaded()) return kTRUE;
567 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
569 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
572 Int_t ndeps = mod->GetNdeps();
// Resolve every declared dependency by name and load it before this module.
574 for (Int_t j=0; j<ndeps; j++) {
575 depname = mod->GetDependency(j);
576 AliAnalysisTaskCfg *dep = GetModule(depname);
578 Error("LoadModule","Dependency %s not existing for module %s",
579 depname.Data(), mod->GetName());
582 if (!LoadModule(dep)) {
583 Error("LoadModule","Dependency %s for module %s could not be loaded",
584 depname.Data(), mod->GetName());
588 // Load libraries for the module
589 if (!mod->CheckLoadLibraries()) {
590 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
593 // Check if a custom file name was requested
594 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
596 // Check if a custom terminate file name was requested
// Terminate file names are accumulated as a comma-separated list.
597 if (strlen(mod->GetTerminateFileName())) {
598 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
599 fTerminateFiles += mod->GetTerminateFileName();
// A negative value returned by the module macro is treated as a failure.
603 if (mod->ExecuteMacro()<0) {
604 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
605 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
608 // Configure dependencies
609 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
610 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
611 mod->GetConfigMacro()->GetTitle(), mod->GetName());
614 // Adjust extra libraries
// Each module library name is expanded to "lib<name>.so" and appended to the
// blank-separated fAdditionalLibs list unless the list already contains it.
615 Int_t nlibs = mod->GetNlibs();
617 for (Int_t i=0; i<nlibs; i++) {
618 lib = mod->GetLibrary(i);
619 lib = Form("lib%s.so", lib.Data());
620 if (fAdditionalLibs.Contains(lib)) continue;
621 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
622 fAdditionalLibs += lib;
627 //______________________________________________________________________________
628 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
630 // Generate the full train.
//   name - base name used for the generated macro (<name>.C), executable
//          (<name>.sh) and validation script (<name>_validation.sh)
// NOTE(review): the statements between the setters and the restore section
// (the actual file generation) are not visible in this view.
// The additional library list is reset before the modules are loaded.
631 fAdditionalLibs = "";
632 if (!LoadModules()) return kFALSE;
633 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
634 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so that they can be restored after generation.
637 Int_t productionMode = fProductionMode;
639 TString macro = fAnalysisMacro;
640 TString executable = fExecutable;
641 TString validation = fValidationScript;
642 TString execCommand = fExecutableCommand;
// Temporarily switch to file names derived from 'name'.
643 SetAnalysisMacro(Form("%s.C", name));
644 SetExecutable(Form("%s.sh", name));
645 // SetExecutableCommand("aliroot -b -q ");
646 SetValidationScript(Form("%s_validation.sh", name));
// Restore the settings saved above.
648 SetProductionMode(productionMode);
649 fAnalysisMacro = macro;
650 fExecutable = executable;
651 fExecutableCommand = execCommand;
652 fValidationScript = validation;
656 //______________________________________________________________________________
657 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
659 // Generate test macros for a single module or for the full train.
//   name    - base name used for the generated macro, executable and validation script
//   modname - module to test; when empty, all modules are loaded instead
// NOTE(review): some statements of this function are not visible in this view.
660 fAdditionalLibs = "";
// Single-module case: check dependencies, then load only the requested module
// and the friend libraries.
661 if (strlen(modname)) {
662 if (!CheckDependencies()) return kFALSE;
663 AliAnalysisTaskCfg *mod = GetModule(modname);
665 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
668 if (!LoadModule(mod)) return kFALSE;
669 if (!LoadFriendLibs()) return kFALSE;
670 } else if (!LoadModules()) return kFALSE;
671 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
672 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so that they can be restored after generation.
676 Int_t productionMode = fProductionMode;
678 TString macro = fAnalysisMacro;
679 TString executable = fExecutable;
680 TString validation = fValidationScript;
681 TString execCommand = fExecutableCommand;
682 SetAnalysisMacro(Form("%s.C", name));
683 SetExecutable(Form("%s.sh", name));
// The output file list is taken from GetListOfFiles("outaod").
684 fOutputFiles = GetListOfFiles("outaod");
685 // Add extra files registered to the analysis manager
686 TString extra = GetListOfFiles("ext");
687 if (!extra.IsNull()) {
// Extra files are registered with a wildcard in front of the ".root" extension.
688 extra.ReplaceAll(".root", "*.root");
689 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
690 fOutputFiles += extra;
692 // SetExecutableCommand("aliroot -b -q ");
693 SetValidationScript(Form("%s_validation.sh", name));
// Write the generated files.
695 WriteAnalysisMacro();
697 WriteValidationScript();
699 WriteMergeExecutable();
700 WriteValidationScript(kTRUE);
// Leave local-test mode and restore the settings saved above.
701 SetLocalTest(kFALSE);
702 SetProductionMode(productionMode);
703 fAnalysisMacro = macro;
704 fExecutable = executable;
705 fExecutableCommand = execCommand;
706 fValidationScript = validation;
710 //______________________________________________________________________________
711 Bool_t AliAnalysisAlien::LoadModules()
713 // Load all modules by executing the AddTask macros. Checks first the dependencies.
// Returns kFALSE as soon as the dependency check or one module fails; otherwise
// returns the result of LoadFriendLibs().
// NOTE(review): the conditions guarding the Warning/Error calls and their
// return statements are not visible in this view.
714 fAdditionalLibs = "";
715 Int_t nmodules = GetNmodules();
717 Warning("LoadModules", "No module to be loaded");
720 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
722 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
725 if (!CheckDependencies()) return kFALSE;
// The module count is read again after the dependency check.
726 nmodules = GetNmodules();
727 AliAnalysisTaskCfg *mod;
728 for (Int_t imod=0; imod<nmodules; imod++) {
729 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
730 if (!LoadModule(mod)) return kFALSE;
732 // Load additional friend libraries
733 return LoadFriendLibs();
736 //______________________________________________________________________________
737 Bool_t AliAnalysisAlien::LoadFriendLibs() const
739 // Load libraries required for reading friends.
// fFriendLibs is split on "," when it contains a comma, otherwise on blanks.
// NOTE(review): the declarations of 'lib' and 'list', the failure condition,
// the return statements and any release of 'list' are not visible in this view.
740 if (fFriendLibs.Length()) {
743 if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
744 else list = fFriendLibs.Tokenize(" ");
745 for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
// Normalize the entry: drop the ".so" extension, blanks and a leading "lib".
746 lib = list->At(ilib)->GetName();
747 lib.ReplaceAll(".so","");
748 lib.ReplaceAll(" ","");
749 if (lib.BeginsWith("lib")) lib.Remove(0, 3);
// A non-empty GetLibraries() answer is taken as "already loaded"; otherwise
// the library is loaded through gSystem.
751 Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
752 if (!loaded) loaded = gSystem->Load(lib);
// NOTE(review): the location label below reads "LoadModules" although this is
// LoadFriendLibs - confirm whether that is intended.
754 Error("LoadModules", "Cannot load library for friends %s", lib.Data());
763 //______________________________________________________________________________
764 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
766 // Set the run number format. Can be a prefix or a format like "%09d"
// NOTE(review): the statement storing 'prefix' into fRunPrefix is not visible
// in this view - confirm.
// When the stored value contains no "%" conversion, "%d" is appended to it.
768 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
771 //______________________________________________________________________________
772 void AliAnalysisAlien::AddIncludePath(const char *path)
774 // Add include path in the remote analysis macro.
// The path is appended to fIncludePath followed by a blank; "-I" is prepended
// unless "-I" already occurs in the tested string.
// NOTE(review): the declaration of 'p' is not visible in this view; presumably
// it is a TString built from 'path' - confirm. The test is a substring search,
// so "-I" anywhere in the string suppresses the prefix.
776 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
777 else fIncludePath += Form("-I%s ", path);
780 //______________________________________________________________________________
781 void AliAnalysisAlien::AddRunNumber(Int_t run)
783 // Add a run number to the list of runs to be processed.
784 if (fRunNumbers.Length()) fRunNumbers += " ";
785 fRunNumbers += Form(fRunPrefix.Data(), run);
788 //______________________________________________________________________________
789 void AliAnalysisAlien::AddRunList(const char* runList)
791 // Add several runs into the list of runs; they are expected to be separated by a blank character.
// Each token is converted with Atoi() and passed to AddRunNumber(Int_t).
// NOTE(review): the end of this function is not visible in this view, so
// whether the array returned by Tokenize is released cannot be told from here.
792 TString sList = runList;
793 TObjArray *list = sList.Tokenize(" ");
794 Int_t n = list->GetEntries();
795 for (Int_t i = 0; i < n; i++) {
796 TObjString *os = (TObjString*)list->At(i);
797 AddRunNumber(os->GetString().Atoi());
802 //______________________________________________________________________________
803 void AliAnalysisAlien::AddRunNumber(const char* run)
805 // Add a run number to the list of runs to be processed.
// The input is split on blanks; every token is appended to fRunNumbers as text.
// NOTE(review): the declarations of 'runs', 'prefix', 'os' and the iterator
// 'next' are not visible in this view; presumably 'runs' holds 'run' and 'next'
// iterates over 'arr' - confirm.
808 TObjArray *arr = runs.Tokenize(" ");
// Only the part of fRunPrefix located before "%d" is used as the textual prefix.
811 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
812 while ((os=(TObjString*)next())){
813 if (fRunNumbers.Length()) fRunNumbers += " ";
814 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
819 //______________________________________________________________________________
820 void AliAnalysisAlien::AddDataFile(const char *lfn)
822 // Adds a data file to the input to be analysed. The file should be a valid LFN
823 // or point to an existing file in the alien workdir.
824 if (!fInputFiles) fInputFiles = new TObjArray();
825 fInputFiles->Add(new TObjString(lfn));
828 //______________________________________________________________________________
829 void AliAnalysisAlien::AddExternalPackage(const char *package)
831 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
832 if (fExternalPackages) fExternalPackages += " ";
833 fExternalPackages += package;
836 //______________________________________________________________________________
837 Bool_t AliAnalysisAlien::Connect()
839 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Returns kTRUE right away when a grid connection already exists or when the
// plugin runs in production mode.
// NOTE(review): the condition guarding the connection attempt and the final
// return statements are not visible in this view.
840 if (gGrid && gGrid->IsConnected()) return kTRUE;
841 if (fProductionMode) return kTRUE;
843 Info("Connect", "Trying to connect to AliEn ...");
844 TGrid::Connect("alien://");
// The outcome is judged from the global gGrid after the attempt.
846 if (!gGrid || !gGrid->IsConnected()) {
847 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// The analysis user is taken from the grid connection.
850 fUser = gGrid->GetUser();
851 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
855 //______________________________________________________________________________
856 void AliAnalysisAlien::CdWork()
858 // Check validity of alien workspace. Create directory if possible.
// NOTE(review): the connection check and the branch taken when the working
// directory already exists are not visible in this view.
860 Error("CdWork", "Alien connection required");
// The working directory is the grid home directory followed by fGridWorkingDir.
863 TString homedir = gGrid->GetHomeDirectory();
864 TString workdir = homedir + fGridWorkingDir;
865 if (DirectoryExists(workdir)) {
869 // Work directory not existing - create it
871 if (gGrid->Mkdir(workdir, "-p")) {
872 gGrid->Cd(fGridWorkingDir);
873 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
// On failure the working directory setting is cleared, so the home directory
// is used instead (as stated in the warning).
875 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
876 workdir.Data(), homedir.Data());
877 fGridWorkingDir = "";
881 //______________________________________________________________________________
882 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
884 // Check if file copying is possible.
// A small local test file is created, copied to 'alienpath' and then removed
// both locally and on the grid.
//   alienpath - AliEn directory used as copy destination; must not contain blanks
// NOTE(review): several conditions and return statements are not visible in
// this view.
// Nothing is checked in production mode.
885 if (fProductionMode) return kTRUE;
886 TString salienpath(alienpath);
887 if (salienpath.Contains(" ")) {
888 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
892 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
895 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
896 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
897 // Check if alien_CLOSE_SE is defined
898 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
899 if (!closeSE.IsNull()) {
900 Info("CheckFileCopy", "Your current close storage is pointing to: \
901 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
903 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
905 // Check if grid directory exists.
906 if (!DirectoryExists(alienpath)) {
907 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// The local test file is created in the current working directory.
910 TString stest = "plugin_test_copy";
911 TFile f(stest, "RECREATE");
912 // User may not have write permissions to current directory
914 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
915 gSystem->WorkingDirectory());
// A leftover remote test file is removed before the copy is attempted.
// NOTE(review): this removal passes an "alien://" URL to gGrid->Rm while the
// final cleanup below passes a plain path - confirm both forms are accepted.
919 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
920 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
921 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
922 \n# 1. Make sure you have write permissions there. If this is the case: \
923 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
924 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
925 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
926 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
// The local test file is removed on the failure path as well.
927 gSystem->Unlink(stest.Data());
// Success path: remove the local and the remote test file.
930 gSystem->Unlink(stest.Data());
931 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
932 Info("CheckFileCopy", "### ...SUCCESS ###");
936 //______________________________________________________________________________
937 Bool_t AliAnalysisAlien::CheckInputData()
939 // Check validity of input data. If necessary, create xml files.
// Three input sources are handled in turn: a bare base data directory, the
// declared input files, and run numbers or a run range.
// NOTE(review): many conditions, declarations and return statements of this
// function are not visible in this view; the comments describe only the
// visible statements.
// Nothing is checked in production mode.
940 if (fProductionMode) return kTRUE;
// Case 1: no input files, no run numbers and no run range start are set.
941 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
942 if (!fGridDataDir.Length()) {
// NOTE(review): the location label is spelled "CkeckInputData" here and in the
// second occurrence further down - looks like a typo, confirm.
943 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
947 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
950 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
// In test mode, a data pattern containing "tag" switches on the kUseTags bit.
951 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
952 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
955 // Process declared files
956 Bool_t isCollection = kFALSE;
957 Bool_t isXml = kFALSE;
958 Bool_t useTags = kFALSE;
959 Bool_t checked = kFALSE;
// Outside test mode the alien working directory is validated or created first.
960 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
962 TString workdir = gGrid->GetHomeDirectory();
963 workdir += fGridWorkingDir;
// Case 2: loop over the declared input files.
966 TIter next(fInputFiles);
967 while ((objstr=(TObjString*)next())) {
970 file += objstr->GetString();
971 // Store full lfn path
972 if (FileExists(file)) objstr->SetString(file);
974 file = objstr->GetName();
975 if (!FileExists(objstr->GetName())) {
976 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
977 objstr->GetName(), workdir.Data());
// The type of each file is determined and compared with the flags recorded
// earlier; a mismatch is reported as a conflict.
981 Bool_t iscoll, isxml, usetags;
982 CheckDataType(file, iscoll, isxml, usetags);
985 isCollection = iscoll;
988 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
990 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
991 Error("CheckInputData", "Some conflict was found in the types of inputs");
997 // Process requested run numbers
998 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
999 // Check validity of alien data directory
1000 if (!fGridDataDir.Length()) {
1001 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
1004 if (!DirectoryExists(fGridDataDir)) {
1005 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
1009 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
1013 if (checked && !isXml) {
1014 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
1017 // Check validity of run number(s)
1022 TString schunk, schunk2;
// Tag usage is derived from the data pattern and must agree with the value
// already established.
1026 useTags = fDataPattern.Contains("tag");
1027 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1029 if (useTags != fDataPattern.Contains("tag")) {
1030 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Case 3a: explicit run numbers take precedence over the run range.
// NOTE(review): the Info calls in this section use the location label
// "CheckDataType" although they are issued from CheckInputData - confirm.
1033 if (fRunNumbers.Length()) {
1034 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1035 arr = fRunNumbers.Tokenize(" ");
1037 while ((os=(TObjString*)next())) {
// NOTE(review): the format below leaves a trailing blank in the checked path - confirm.
1038 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1039 if (!DirectoryExists(path)) {
1040 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1043 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1044 TString msg = "\n##### file: ";
1046 msg += " type: xml_collection;";
1047 if (useTags) msg += " using_tags: Yes";
1048 else msg += " using_tags: No";
1049 Info("CheckDataType", "%s", msg.Data());
// With fewer than two runs per master job, one xml file is registered per run.
1050 if (fNrunsPerMaster<2) {
1051 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Otherwise runs are grouped: the chunk name starts with the first run of a
// group and is completed as "<first>_<last>.xml" at the end of the group or at
// the last run.
1054 if (((nruns-1)%fNrunsPerMaster) == 0) {
1055 schunk = os->GetString();
1057 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1058 schunk += Form("_%s.xml", os->GetString().Data());
1059 AddDataFile(schunk);
// Case 3b: run range; run names are rendered with fRunPrefix as the format.
1064 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1065 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1066 format = Form("%%s/%s ", fRunPrefix.Data());
1067 path = Form(format.Data(), fGridDataDir.Data(), irun);
1068 if (!DirectoryExists(path)) {
1071 format = Form("%%s/%s.xml", fRunPrefix.Data());
1072 path = Form(format.Data(), workdir.Data(),irun);
1073 TString msg = "\n##### file: ";
1075 msg += " type: xml_collection;";
1076 if (useTags) msg += " using_tags: Yes";
1077 else msg += " using_tags: No";
1078 Info("CheckDataType", "%s", msg.Data());
1079 if (fNrunsPerMaster<2) {
1080 format = Form("%s.xml", fRunPrefix.Data());
1081 AddDataFile(Form(format.Data(),irun));
// Same grouping of runs into chunks as for explicit run numbers.
1084 if (((nruns-1)%fNrunsPerMaster) == 0) {
1085 schunk = Form(fRunPrefix.Data(),irun);
1087 format = Form("_%s.xml", fRunPrefix.Data());
1088 schunk2 = Form(format.Data(), irun);
1089 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1091 AddDataFile(schunk);
1096 AddDataFile(schunk);
1102 //______________________________________________________________________________
1103 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1105 // Copy data from the given grid directory according a pattern and make a local
1107 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
//   griddir     - grid directory searched with the "find" command
//   pattern     - file pattern passed to "find"
//   nfiles      - limit passed to "find" through its -l option
//   output      - local text file receiving one line per copied file
//   archivefile - optional archive name(s), semicolon-separated
//   outputdir   - local directory created and used as copy destination
// NOTE(review): several conditions, declarations and the return statements are
// not visible in this view.
1109 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1112 if (!DirectoryExists(griddir)) {
1113 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
// Query the grid catalogue for matching files.
1116 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1117 printf("Running command: %s\n", command.Data());
1118 TGridResult *res = gGrid->Command(command);
1119 Int_t nfound = res->GetEntries();
1121 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1124 printf("... found %d files. Copying locally ...\n", nfound);
// A semicolon-separated archive list is split: the first entry becomes the main
// archive, the remaining ones stay in additionalArchives.
1127 TObjArray* additionalArchives = 0;
1128 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1129 additionalArchives = TString(archivefile).Tokenize(";");
1130 archivefile = additionalArchives->At(0)->GetName();
1131 additionalArchives->RemoveAt(0);
1132 additionalArchives->Compress();
1135 // Copy files locally
1137 out.open(output, ios::out);
1139 TString turl, dirname, filename, temp;
// The current directory is remembered and restored after the copy loop.
1140 TString cdir = gSystem->WorkingDirectory();
1141 gSystem->MakeDirectory(outputdir);
1142 gSystem->ChangeDirectory(outputdir);
1144 for (Int_t i=0; i<nfound; i++) {
1145 map = (TMap*)res->At(i);
1146 turl = map->GetValue("turl")->GetName();
// Each file goes into a local sub-directory named after the last directory
// component of its turl.
1147 filename = gSystem->BaseName(turl.Data());
1148 dirname = gSystem->DirName(turl.Data());
1149 dirname = gSystem->BaseName(dirname.Data());
1150 gSystem->MakeDirectory(dirname);
1152 TString source(turl);
1153 TString targetFileName(filename);
1155 if (strlen(archivefile) > 0) {
1156 // TODO here the archive in which the file resides should be determined
1157 // however whereis returns only a guid, and guid2lfn does not work
1158 // Therefore we use the one provided as argument for now
1159 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1160 targetFileName = archivefile;
1162 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1163 Bool_t success = kTRUE;
// Additional archives are copied from the same grid directory; 'success'
// accumulates the result of every copy.
1164 if (additionalArchives) {
1165 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1167 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1168 gSystem->MakeDirectory(gSystem->DirName(target));
1169 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
// When an archive was copied, the entry written to the output file is
// "<archive>#<file name>".
1174 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1175 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1180 gSystem->ChangeDirectory(cdir);
1182 delete additionalArchives;
1186 //______________________________________________________________________________
1187 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
// NOTE(review): the line numbers embedded in this listing skip values (e.g. 1187 -> 1189),
// so some original lines are not visible here. The comments added in this function describe
// only the statements that are visible.
1189 // Create dataset for the grid data directory + run number.
// Three cases are visible: no run list and no run range (a single collection made from
// fGridDataDir), an explicit blank-separated list in fRunNumbers, and the inclusive range
// fRunRange[0]..fRunRange[1].
// 'pattern' is not referenced on any visible line - presumably it is appended to the find
// command on a line that is not shown; TODO confirm.
// Limit given to 'find -l' outside test mode; a result count equal to it is handled below
// as a possibly truncated result that needs merging.
1190 const Int_t gMaxEntries = 15000;
// Nothing is created in production mode or when the kOffline bit is set.
1191 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1193 Error("CreateDataset", "Cannot create dataset with no grid connection");
1198 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// AliEn work directory = grid home directory + fGridWorkingDir.
1199 TString workdir = gGrid->GetHomeDirectory();
1200 workdir += fGridWorkingDir;
1202 // Compose the 'find' command arguments
1205 TString options = "-x collection ";
// The number of returned entries is limited to fNtestFiles in test mode, gMaxEntries otherwise.
1206 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1207 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1208 TString conditions = "";
1215 TString schunk, schunk2;
1216 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither a run list nor a run range start is set.
1217 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Input files were already registered in fInputFiles: report success without creating anything.
1218 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1219 // Make a single data collection from data directory.
1220 path = fGridDataDir;
1221 if (!DirectoryExists(path)) {
1222 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// Local collection name: "wn.xml" in test mode, otherwise <last component of path>.xml.
1226 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1227 else file = Form("%s.xml", gSystem->BaseName(path));
// Run 'find' when the local file cannot be accessed, in test mode, or in overwrite mode.
1231 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
// 'nstart' is passed as the -o option of find.
1233 command += Form("%s -o %d ",options.Data(), nstart);
1237 command += conditions;
1238 printf("command: %s\n", command.Data());
// The returned result object is not used; the output is taken from gGrid->Stdout() below.
1239 TGridResult *res = gGrid->Command(command);
1240 if (res) delete res;
1241 // Write standard output to file
1242 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1243 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1244 Bool_t nullFile = kFALSE;
1246 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// The number of lines containing "/event" is written to __tmp__; a non-zero exit status
// of the command marks the find result as empty.
1248 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1250 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
// NOTE(review): the glob removes every file whose name starts with __tmp in the current
// directory, not only the files written by this function.
1251 gSystem->Exec("rm -f __tmp*");
1259 gSystem->Exec("rm -f __tmp__");
// 'line' presumably holds the count read back from __tmp__ on a line not shown - TODO confirm.
1260 ncount = line.Atoi();
// A count equal to the find limit: build a collection from this stage's temporary file.
// NOTE(review): the message hard-codes "15K" while the limit is gMaxEntries.
1263 if (ncount == gMaxEntries) {
1264 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1265 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1266 if (!cbase) cbase = cadd;
1274 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1275 printf("... please wait - TAlienCollection::Add() scales badly...\n");
// Export the merged collection to the local file and release it.
1278 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1279 delete cbase; cbase = 0;
// Copy the temporary file of this stage to the final local collection name.
1281 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1283 gSystem->Exec("rm -f __tmp*");
1284 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// Outside test mode, upload when the file is not yet there or when overwriting.
1288 Bool_t fileExists = FileExists(file);
1289 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1290 // Copy xml file to alien space
1291 if (fileExists) gGrid->Rm(file);
1292 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
// The upload is verified by checking that the file now exists.
1293 if (!FileExists(file)) {
1294 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1297 // Update list of files to be processed.
1299 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
// Stays kTRUE until one of the runs handled below produces a non-empty result.
1303 Bool_t nullResult = kTRUE;
// Case 2: explicit list of runs, tokenized on blanks.
1304 if (fRunNumbers.Length()) {
1305 TObjArray *arr = fRunNumbers.Tokenize(" ");
1308 while ((os=(TObjString*)next())) {
// Data directory of this run; runs without such a directory are skipped.
1311 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
1312 if (!DirectoryExists(path)) continue;
// Local collection name: "wn.xml" in test mode, otherwise <run>.xml.
1314 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1315 else file = Form("%s.xml", os->GetString().Data());
1316 // If local collection file does not exist, create it via 'find' command.
1320 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1322 command += Form("%s -o %d ",options.Data(), nstart);
1325 command += conditions;
1326 TGridResult *res = gGrid->Command(command);
1327 if (res) delete res;
1328 // Write standard output to file
1329 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1330 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1331 Bool_t nullFile = kFALSE;
1333 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// Same emptiness check as in the single-collection case: non-zero exit status => empty.
1335 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
// An empty run is only a warning here (it is an error in the single-collection case).
1337 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1338 gSystem->Exec("rm -f __tmp*");
// The empty run is removed from the run list.
// NOTE(review): ReplaceAll works on substrings, so a run number that is contained in a
// longer run number of the list would alter that entry as well - verify.
1339 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1347 gSystem->Exec("rm -f __tmp__");
1348 ncount = line.Atoi();
1350 nullResult = kFALSE;
// A count equal to the find limit needs merging, which is refused for more than one run per master.
1352 if (ncount == gMaxEntries) {
1353 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1354 if (fNrunsPerMaster > 1) {
1355 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1356 file.Data(),gMaxEntries);
1359 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1360 if (!cbase) cbase = cadd;
1367 if (cbase && fNrunsPerMaster<2) {
1368 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1369 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1372 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1373 delete cbase; cbase = 0;
1375 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1377 gSystem->Exec("rm -f __tmp*");
1378 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the processing stops after the first run that reaches this point.
1382 if (TestBit(AliAnalysisGrid::kTest)) break;
1383 // Check if there is one run per master job.
1384 if (fNrunsPerMaster<2) {
1385 if (FileExists(file)) {
1386 if (fOverwriteMode) gGrid->Rm(file);
1388 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1392 // Copy xml file to alien space
1393 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1394 if (!FileExists(file)) {
1395 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of each group of fNrunsPerMaster runs starts
// a new base collection and gives the chunk name its first part.
1401 if (((nruns-1)%fNrunsPerMaster) == 0) {
1402 schunk = os->GetString();
1403 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1405 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1406 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
// Condition is true while the group is not complete and this is not the last run of the list.
1410 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// Chunk name: <first run>_<last run>.xml
1413 schunk += Form("_%s.xml", os->GetString().Data());
1414 if (FileExists(schunk)) {
// NOTE(review): the existence check is on 'schunk' but 'file' is removed; the run-range
// branch of this function removes 'schunk' at the corresponding place - likely intended here too.
1415 if (fOverwriteMode) gGrid->Rm(file);
1417 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1421 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1422 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1423 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1424 if (!FileExists(schunk)) {
1425 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1433 Error("CreateDataset", "No valid dataset corresponding to the query!");
// Case 3: run range, both ends included.
1437 // Process a full run range.
1438 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is used as a printf-style format for the run number; the resulting format
// is "<grid data dir>/<formatted run> ".
1439 format = Form("%%s/%s ", fRunPrefix.Data());
1442 path = Form(format.Data(), fGridDataDir.Data(), irun);
1443 if (!DirectoryExists(path)) continue;
// Local collection name: "wn.xml" in test mode, otherwise <formatted run>.xml.
1445 format = Form("%s.xml", fRunPrefix.Data());
1446 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1447 else file = Form(format.Data(), irun);
1448 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1449 if (fOverwriteMode) gGrid->Rm(file);
1451 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1455 // If local collection file does not exist, create it via 'find' command.
1459 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1461 command += Form("%s -o %d ",options.Data(), nstart);
1464 command += conditions;
1465 TGridResult *res = gGrid->Command(command);
1466 if (res) delete res;
1467 // Write standard output to file
1468 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1469 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1470 Bool_t nullFile = kFALSE;
1472 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// Same emptiness check as in the other cases: non-zero exit status => empty result.
1474 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1476 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1477 gSystem->Exec("rm -f __tmp*");
1485 gSystem->Exec("rm -f __tmp__");
1486 ncount = line.Atoi();
1488 nullResult = kFALSE;
1490 if (ncount == gMaxEntries) {
1491 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1492 if (fNrunsPerMaster > 1) {
1493 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1494 file.Data(),gMaxEntries);
1497 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1498 if (!cbase) cbase = cadd;
1505 if (cbase && fNrunsPerMaster<2) {
1506 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1507 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1510 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1511 delete cbase; cbase = 0;
1513 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1515 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the processing stops after the first run that reaches this point.
1519 if (TestBit(AliAnalysisGrid::kTest)) break;
1520 // Check if there is one run per master job.
1521 if (fNrunsPerMaster<2) {
1522 if (FileExists(file)) {
1523 if (fOverwriteMode) gGrid->Rm(file);
1525 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1529 // Copy xml file to alien space
1530 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1531 if (!FileExists(file)) {
1532 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1537 // Check if the collection for the chunk exist locally.
// Index of the chunk this run belongs to; it is used to look up the chunk name in fInputFiles.
// NOTE(review): fInputFiles and the entry at 'nchunk' are dereferenced without a visible
// null check - confirm they are always filled before this point.
1538 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1539 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1540 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1543 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
// The first run of each group starts a new base collection and the chunk name.
1544 if (((nruns-1)%fNrunsPerMaster) == 0) {
1545 schunk = Form(fRunPrefix.Data(), irun);
1546 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1548 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
// Candidate chunk name: <first run>_<current run>.xml
1552 format = Form("%%s_%s.xml", fRunPrefix.Data());
1553 schunk2 = Form(format.Data(), schunk.Data(), irun);
// Condition is true while the group is incomplete, the range end is not reached and the
// candidate name differs from the last entry of fInputFiles.
1554 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1558 if (FileExists(schunk)) {
1559 if (fOverwriteMode) gGrid->Rm(schunk);
1561 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1565 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1566 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
// The existence of the chunk is checked a second time, right before the upload.
1567 if (FileExists(schunk)) {
1568 if (fOverwriteMode) gGrid->Rm(schunk);
1570 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1574 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1575 if (!FileExists(schunk)) {
1576 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1582 Error("CreateDataset", "No valid dataset corresponding to the query!");
1589 //______________________________________________________________________________
1590 Bool_t AliAnalysisAlien::CreateJDL()
// NOTE(review): the line numbers embedded in this listing skip values (e.g. 1590 -> 1592),
// so some original lines are not visible here. The comments added in this function describe
// only the statements that are visible.
1592 // Generate a JDL file according to current settings. The name of the file is
1593 // specified by fJDLName.
// Visible flow: check the settings (output files, AliEn output directory), fill fGridJDL
// and fMergingJDL (executable, arguments, TTL, split options, packages, input sandbox,
// outputs, price, validation command), then copy the JDL files, the additional libraries
// that are not ".so" files and the fPackages entries to AliEn. The exact nesting of these
// sections is not visible in this listing.
1594 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1595 Bool_t error = kFALSE;
// 'copy' is switched off in production, offline and test mode; 'generate' is switched off
// in test and submit mode.
1597 Bool_t copy = kTRUE;
1598 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1599 Bool_t generate = kTRUE;
1600 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1602 Error("CreateJDL", "Alien connection required");
1605 // Check validity of alien workspace
// The grid home directory is used as prefix unless in production mode or when
// fGridWorkingDir already starts with "/alice".
1607 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1608 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1609 workdir += fGridWorkingDir;
1613 Error("CreateJDL()", "Define some input files for your analysis.");
1616 // Compose list of input files
1617 // Check if output files were defined
1618 if (!fOutputFiles.Length()) {
1619 Error("CreateJDL", "You must define at least one output file");
1622 // Check if an output directory was defined and valid
1623 if (!fGridOutputDir.Length()) {
1624 Error("CreateJDL", "You must define AliEn output directory");
1627 if (!fProductionMode) {
// An output directory without any "/" is taken relative to the work directory.
1628 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1629 if (!DirectoryExists(fGridOutputDir)) {
1630 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1631 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1633 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1637 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1642 // Exit if any error up to now
1643 if (error) return kFALSE;
// The user name, when set, goes into both JDLs as a quoted value.
1645 if (!fUser.IsNull()) {
1646 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1647 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1649 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
// Merging executable: <name>.sh -> <name>_merge.sh. The second replacement turns the same
// string into <name>_merge.C, which is added to the merging input sandbox.
1650 TString mergeExec = fExecutable;
1651 mergeExec.ReplaceAll(".sh", "_merge.sh");
1652 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1653 mergeExec.ReplaceAll(".sh", ".C");
1654 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1655 if (!fArguments.IsNull())
1656 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// Merging job arguments: the output directory for one-stage merging, otherwise the
// collection name plus the stage parameter ($4 in production mode, $2 otherwise).
1657 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1659 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1660 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// fTTL is written as is; the description shows fTTL/60 labelled as minutes.
1663 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1664 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1665 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1666 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1668 if (fMaxInitFailed > 0) {
1669 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1670 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1672 if (fSplitMaxInputFileNumber > 0) {
1673 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1674 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
// For merging in stages the merging job is split using fMaxMergeFiles.
1676 if (!IsOneStageMerging()) {
1677 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1678 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1680 if (fSplitMode.Length()) {
1681 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1682 fGridJDL->SetDescription("Split", "We split per SE or file");
1684 fMergingJDL->SetValue("Split", "\"se\"");
1685 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested packages are added to both JDLs.
1686 if (!fAliROOTVersion.IsNull()) {
1687 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1688 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1690 if (!fROOTVersion.IsNull()) {
1691 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1692 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1694 if (!fAPIVersion.IsNull()) {
1695 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1696 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: blank-separated entries, each split at "::" into name and version.
// NOTE(review): no visible check that "::" is present in an entry - confirm how an entry
// without it behaves (Index() then returns a negative value).
1698 if (!fExternalPackages.IsNull()) {
1699 arr = fExternalPackages.Tokenize(" ");
1701 while ((os=(TObjString*)next())) {
1702 TString pkgname = os->GetString();
1703 Int_t index = pkgname.Index("::");
1704 TString pkgversion = pkgname(index+2, pkgname.Length());
1705 pkgname.Remove(index);
1706 fGridJDL->AddToPackages(pkgname, pkgversion);
1707 fMergingJDL->AddToPackages(pkgname, pkgversion);
1711 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1712 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1713 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1714 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
// Input sandbox: the analysis macro (analysis JDL only) and <executable name>.root (both JDLs).
1715 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1716 TString analysisFile = fExecutable;
1717 analysisFile.ReplaceAll(".sh", ".root");
1718 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1719 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// Additional libraries: entries containing ".so" are not put in the sandbox.
1720 if (fAdditionalLibs.Length()) {
1721 arr = fAdditionalLibs.Tokenize(" ");
1723 while ((os=(TObjString*)next())) {
1724 if (os->GetString().Contains(".so")) continue;
1725 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1726 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages is added to the input sandbox of both JDLs.
1731 TIter next(fPackages);
1733 while ((obj=next())) {
1734 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1735 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1738 const char *comment = "List of output files and archives";
1739 if (fOutputArchive.Length()) {
// Work on a copy of the archive definition with the names in fRegisterExcludes removed
// (first with a trailing comma, then without).
1740 TString outputArchive = fOutputArchive;
1741 if (!fRegisterExcludes.IsNull()) {
1742 arr = fRegisterExcludes.Tokenize(" ");
1744 while ((os=(TObjString*)next1())) {
1745 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1746 outputArchive.ReplaceAll(os->GetString(),"");
1750 arr = outputArchive.Tokenize(" ");
1752 Bool_t first = kTRUE;
1753 while ((os=(TObjString*)next())) {
// Entries without an explicit "@<storage>" get fCloseSE appended when it is set.
1754 if (!os->GetString().Contains("@") && fCloseSE.Length())
1755 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1757 fGridJDL->AddToSet("Output", os->GetString());
1758 if (first) fGridJDL->AddToSetDescription("Output", comment);
1762 // Output archive for the merging jdl
1763 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1764 outputArchive = "log_archive.zip:std*@disk=1 ";
1765 // Add normal output files, extra files + terminate files
1766 TString files = GetListOfFiles("outextter");
1767 // Do not register files in fRegisterExcludes
1768 if (!fRegisterExcludes.IsNull()) {
1769 arr = fRegisterExcludes.Tokenize(" ");
1771 while ((os=(TObjString*)next1())) {
1772 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1773 files.ReplaceAll(os->GetString(),"");
// ".root" becomes "*.root" so that each output name acts as a wildcard pattern.
1777 files.ReplaceAll(".root", "*.root");
// With throughput collection enabled, the file info log is registered next to the root archive.
1779 if (mgr->IsCollectThroughput())
1780 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1782 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1784 TString files = fOutputArchive;
1785 files.ReplaceAll(".root", "*.root"); // nreplicas etc. should already be attached by the user
1786 outputArchive = files;
1788 arr = outputArchive.Tokenize(" ");
1791 while ((os=(TObjString*)next2())) {
1792 TString currentfile = os->GetString();
1793 if (!currentfile.Contains("@") && fCloseSE.Length())
1794 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1796 fMergingJDL->AddToSet("Output", currentfile);
1797 if (first) fMergingJDL->AddToSetDescription("Output", comment);
// Plain output files: comma-separated entries of fOutputFiles.
1802 arr = fOutputFiles.Tokenize(",");
1804 Bool_t first = kTRUE;
1805 while ((os=(TObjString*)next())) {
1806 // Ignore outputs in the JDL that are also in the output archive
// 'sout' is the entry reduced to its bare name: no "*", no ".root", no "@..." suffix.
1807 TString sout = os->GetString();
1808 sout.ReplaceAll("*", "");
1809 sout.ReplaceAll(".root", "");
1810 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1811 if (fOutputArchive.Contains(sout)) continue;
1812 // Ignore fRegisterExcludes
1813 if (fRegisterExcludes.Contains(sout)) continue;
1814 if (!first) comment = NULL;
1815 if (!os->GetString().Contains("@") && fCloseSE.Length())
1816 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1818 fGridJDL->AddToSet("Output", os->GetString());
1819 if (first) fGridJDL->AddToSetDescription("Output", comment);
// Files listed in fMergeExcludes are not registered as outputs of the merging job.
1820 if (fMergeExcludes.Contains(sout)) continue;
1821 if (!os->GetString().Contains("@") && fCloseSE.Length())
1822 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1824 fMergingJDL->AddToSet("Output", os->GetString());
1825 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1829 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1830 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// Validation scripts: <name>.sh for the analysis job, <name>_merge.sh for the merging job.
1831 TString validationScript = fValidationScript;
1832 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1833 validationScript.ReplaceAll(".sh", "_merge.sh");
1834 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
// The threshold is written as a quoted percentage, e.g. "50%".
1835 if (fMasterResubmitThreshold) {
1836 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1837 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1839 // Write a jdl with 2 input parameters: collection name and output dir name.
1842 // Copy jdl to grid workspace
// NOTE(review): the output directory checks below repeat the ones made near the top of
// this function; presumably they serve the path where JDL generation was skipped - confirm.
1844 // Check if an output directory was defined and valid
1845 if (!fGridOutputDir.Length()) {
1846 Error("CreateJDL", "You must define AliEn output directory");
1849 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1850 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1851 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1852 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1854 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1860 if (TestBit(AliAnalysisGrid::kSubmit)) {
// Remote JDL locations: inside the output directory, or inside the work directory in
// production mode. The merging JDL is named <executable name>_merge.jdl.
1861 TString mergeJDLName = fExecutable;
1862 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1863 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1864 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1865 if (fProductionMode) {
1866 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1867 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Existing remote copies are removed before uploading; a failed upload is fatal.
1869 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1870 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1871 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1872 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1873 Fatal("","Terminating");
1874 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1876 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1877 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1878 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1879 Fatal("","Terminating");
// Upload the additional libraries that are not ".so" files to the work directory.
1882 if (fAdditionalLibs.Length()) {
1883 arr = fAdditionalLibs.Tokenize(" ");
1886 while ((os=(TObjString*)next())) {
1887 if (os->GetString().Contains(".so")) continue;
1888 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1889 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1890 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1891 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1892 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1893 Fatal("","Terminating");
// Upload every entry of fPackages to the work directory.
1898 TIter next(fPackages);
1900 while ((obj=next())) {
1901 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1902 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1903 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1904 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1905 Form("%s/%s", workdir.Data(), obj->GetName())))
1906 Fatal("","Terminating");
1913 //______________________________________________________________________________
1914 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1916 // Writes one or more JDL's corresponding to findex. If findex is negative,
1917 // all run numbers are considered in one go (jdl). For non-negative indices
1918 // they correspond to the indices in the array fInputFiles.
1919 if (!fInputFiles) return kFALSE;
1922 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1923 workdir += fGridWorkingDir;
1924 TString stageName = "$2";
1925 if (fProductionMode) stageName = "$4";
1926 if (!fMergeDirName.IsNull()) {
1927 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1928 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1930 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1931 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1933 if (fProductionMode) {
1934 TIter next(fInputFiles);
1935 while ((os=next())) {
1936 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1938 if (!fOutputToRunNo)
1939 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1941 fGridJDL->SetOutputDirectory(fGridOutputDir);
1943 if (!fRunNumbers.Length() && !fRunRange[0]) {
1944 // One jdl with no parameters in case input data is specified by name.
1945 TIter next(fInputFiles);
1947 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1948 if (!fOutputSingle.IsNull())
1949 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1951 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1952 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1955 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1956 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1957 if (!fOutputSingle.IsNull()) {
1958 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1959 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1961 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1966 // Generate the JDL as a string
1967 TString sjdl = fGridJDL->Generate();
1968 TString sjdl1 = fMergingJDL->Generate();
1970 if (!fMergeDirName.IsNull()) {
1971 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1972 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1974 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1975 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1977 TString sjdl2 = fMergingJDL->Generate();
1978 Int_t index, index1;
1979 sjdl.ReplaceAll("\",\"", "\",\n \"");
1980 sjdl.ReplaceAll("(member", "\n (member");
1981 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1982 sjdl.ReplaceAll("{", "{\n ");
1983 sjdl.ReplaceAll("};", "\n};");
1984 sjdl.ReplaceAll("{\n \n", "{\n");
1985 sjdl.ReplaceAll("\n\n", "\n");
1986 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1987 sjdl1.ReplaceAll("\",\"", "\",\n \"");
1988 sjdl1.ReplaceAll("(member", "\n (member");
1989 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1990 sjdl1.ReplaceAll("{", "{\n ");
1991 sjdl1.ReplaceAll("};", "\n};");
1992 sjdl1.ReplaceAll("{\n \n", "{\n");
1993 sjdl1.ReplaceAll("\n\n", "\n");
1994 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1995 sjdl2.ReplaceAll("\",\"", "\",\n \"");
1996 sjdl2.ReplaceAll("(member", "\n (member");
1997 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1998 sjdl2.ReplaceAll("{", "{\n ");
1999 sjdl2.ReplaceAll("};", "\n};");
2000 sjdl2.ReplaceAll("{\n \n", "{\n");
2001 sjdl2.ReplaceAll("\n\n", "\n");
2002 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
2003 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2004 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
2005 index = sjdl.Index("JDLVariables");
2006 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
2007 sjdl += "Workdirectorysize = {\"5000MB\"};";
2008 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2009 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2010 index = fJobTag.Index(":");
2011 if (index < 0) index = fJobTag.Length();
2012 TString jobTag = fJobTag;
2013 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
2014 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
2015 if (fProductionMode) {
2016 sjdl1.Prepend("# Generated merging jdl (production mode) \
2017 \n# $1 = full alien path to output directory to be merged \
2018 \n# $2 = train number \
2019 \n# $3 = production (like LHC10b) \
2020 \n# $4 = merging stage \
2021 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2022 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2023 sjdl2.Prepend("# Generated merging jdl \
2024 \n# $1 = full alien path to output directory to be merged \
2025 \n# $2 = train number \
2026 \n# $3 = production (like LHC10b) \
2027 \n# $4 = merging stage \
2028 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2030 sjdl1.Prepend("# Generated merging jdl \
2031 \n# $1 = full alien path to output directory to be merged \
2032 \n# $2 = merging stage \
2033 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2034 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2035 sjdl2.Prepend("# Generated merging jdl \
2036 \n# $1 = full alien path to output directory to be merged \
2037 \n# $2 = merging stage \
2038 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2040 index = sjdl1.Index("JDLVariables");
2041 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2042 index = sjdl2.Index("JDLVariables");
2043 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2044 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2045 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2046 index = sjdl2.Index("Split =");
2048 index1 = sjdl2.Index("\n", index);
2049 sjdl2.Remove(index, index1-index+1);
2051 index = sjdl2.Index("SplitMaxInputFileNumber");
2053 index1 = sjdl2.Index("\n", index);
2054 sjdl2.Remove(index, index1-index+1);
2056 index = sjdl2.Index("InputDataCollection");
2058 index1 = sjdl2.Index(";", index);
2059 sjdl2.Remove(index, index1-index+1);
2061 index = sjdl2.Index("InputDataListFormat");
2063 index1 = sjdl2.Index("\n", index);
2064 sjdl2.Remove(index, index1-index+1);
2066 index = sjdl2.Index("InputDataList");
2068 index1 = sjdl2.Index("\n", index);
2069 sjdl2.Remove(index, index1-index+1);
2071 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2072 // Write jdl to file
2074 out.open(fJDLName.Data(), ios::out);
2076 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2079 out << sjdl << endl;
2081 TString mergeJDLName = fExecutable;
2082 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2085 out1.open(mergeJDLName.Data(), ios::out);
2087 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2090 out1 << sjdl1 << endl;
2093 TString finalJDL = mergeJDLName;
2094 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2095 out2.open(finalJDL.Data(), ios::out);
2097 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2100 out2 << sjdl2 << endl;
2104 // Copy jdl to grid workspace
2106 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2108 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2109 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2110 TString finalJDL = mergeJDLName;
2111 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2112 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2113 if (fProductionMode) {
2114 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2115 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2116 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2118 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2119 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2120 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2121 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2122 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2123 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2124 Fatal("","Terminating");
2126 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2127 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2128 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2129 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2130 Fatal("","Terminating");
2131 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2132 Fatal("","Terminating");
2138 //______________________________________________________________________________
// Tell whether the grid file <lfn> exists, based on a gGrid->Ls() listing.
//   lfn - file name to look up; any "alien://" prefix is removed before listing.
// Returns kFALSE when there is no grid connection or when Ls() gives no result.
// NOTE(review): this listing skips original lines (2140, 2143, 2148-2151, 2154+),
// so the declaration of slfn and the remaining result handling are not visible here.
2139 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2141 // Returns true if file exists.
2142 if (!gGrid) return kFALSE;
// Strip the protocol prefix before querying the grid.
2144 slfn.ReplaceAll("alien://","");
2145 TGridResult *res = gGrid->Ls(slfn);
2146 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
2147 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The "name" value of that entry has to be a non-empty TObjString.
2152 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2153 if (!objs || !objs->GetString().Length()) {
2161 //______________________________________________________________________________
// Tell whether <dirname> exists on the grid by listing its parent directory
// and comparing the entry names with the last path component.
//   dirname - directory name or full path; surrounding blanks and trailing
//             slashes are stripped before it is split.
// Returns kFALSE when there is no grid connection or when Ls() gives no result.
// NOTE(review): original lines 2171, 2175-2177, 2180 and 2182+ are missing from
// this listing (iterator/variable declarations and the return statements).
2162 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2164 // Returns true if directory exists. Can be also a path.
2165 if (!gGrid) return kFALSE;
2166 // Check if dirname is a path
2167 TString dirstripped = dirname;
// Remove trailing blanks first, then any trailing '/'.
2168 dirstripped = dirstripped.Strip();
2169 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Split into last component (dir) and parent (path).
2170 TString dir = gSystem->BaseName(dirstripped);
2172 TString path = gSystem->DirName(dirstripped);
// List the parent directory with the "-F" option.
2173 TGridResult *res = gGrid->Ls(path, "-F");
2174 if (!res) return kFALSE;
// Walk the listing and compare each entry's "name" with dir.
2178 while ((map=dynamic_cast<TMap*>(next()))) {
2179 obj = map->GetValue("name");
2181 if (dir == obj->GetName()) {
2190 //______________________________________________________________________________
// Inspect the input <lfn> and report its kind through Info("CheckDataType", ...).
//   lfn          - grid file name to classify
//   isCollection - output; reset to kFALSE, then set from IsCollection(lfn)
//   isXml        - output; set when the file name contains ".xml"
//   useTags      - output; set when the inspected file name contains ".tag"
// Three cases are visible: a raw grid collection, an xml collection, and a
// plain file (reported as "root file" or "unknown file").
// NOTE(review): many original lines are missing from this listing (e.g. 2195-2197,
// 2203-2204, 2226-2229), including the branch conditions and the declaration of
// slfn, so the exact control flow between the three cases cannot be confirmed here.
2191 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2193 // Check input data type.
2194 isCollection = kFALSE;
2198 Error("CheckDataType", "No connection to grid");
2201 isCollection = IsCollection(lfn);
// msg accumulates the one-line report printed at the end of each case.
2202 TString msg = "\n##### file: ";
2205 msg += " type: raw_collection;";
2206 // special treatment for collections
2208 // check for tag files in the collection
2209 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2211 msg += " using_tags: No (unknown)";
2212 Info("CheckDataType", "%s", msg.Data());
// The original LFN of the first file in the collection decides on tag usage.
2215 const char* typeStr = res->GetKey(0, "origLFN");
2216 if (!typeStr || !strlen(typeStr)) {
2217 msg += " using_tags: No (unknown)";
2218 Info("CheckDataType", "%s", msg.Data());
2221 TString file = typeStr;
2222 useTags = file.Contains(".tag");
2223 if (useTags) msg += " using_tags: Yes";
2224 else msg += " using_tags: No";
2225 Info("CheckDataType", "%s", msg.Data());
2230 isXml = slfn.Contains(".xml");
2232 // Open xml collection and check if there are tag files inside
2233 msg += " type: xml_collection;";
// The collection is opened through the interpreter (gROOT->ProcessLine) rather than a direct call.
2234 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2236 msg += " using_tags: No (unknown)";
2237 Info("CheckDataType", "%s", msg.Data());
// Look at the first entry of the xml collection only.
2240 TMap *map = coll->Next();
2242 msg += " using_tags: No (unknown)";
2243 Info("CheckDataType", "%s", msg.Data());
2246 map = (TMap*)map->GetValue("");
2248 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2249 useTags = file.Contains(".tag");
2251 if (useTags) msg += " using_tags: Yes";
2252 else msg += " using_tags: No";
2253 Info("CheckDataType", "%s", msg.Data());
// Plain file: decide from the file name itself.
2256 useTags = slfn.Contains(".tag");
2257 if (slfn.Contains(".root")) msg += " type: root file;";
2258 else msg += " type: unknown file;";
2259 if (useTags) msg += " using_tags: Yes";
2260 else msg += " using_tags: No";
2261 Info("CheckDataType", "%s", msg.Data());
2264 //______________________________________________________________________________
// Register a .par package for use by the plugin.
//   package - package name, with or without the ".par" extension.
// Sets the AliAnalysisGrid::kUsePars bit and appends the package name to
// fPackages (an owning TObjArray of TObjString).
// NOTE(review): original lines 2270, 2273-2274, 2278 and 2281 are missing from
// this listing; presumably they restore the extension and guard the creation of
// fPackages - confirm against the full source.
2265 void AliAnalysisAlien::EnablePackage(const char *package)
2267 // Enables a par file supposed to exist in the current directory.
2268 TString pkg(package);
2269 pkg.ReplaceAll(".par", "");
// A true result from AccessPathName() is treated as "package not found".
2271 if (gSystem->AccessPathName(pkg)) {
2272 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce par usage only the first time the bit gets set.
2275 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2276 Info("EnablePackage", "AliEn plugin will use .par packages");
2277 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
2279 fPackages = new TObjArray();
2280 fPackages->SetOwner();
2282 fPackages->Add(new TObjString(pkg));
2285 //______________________________________________________________________________
// Build a TChain from the data files listed in the text file fFileForTestMode.
//   treeName - name of the tree to chain; replaced by "TE" when IsUseMCchain() is true.
// Empty lines and lines starting with '#' are skipped, and reading stops once
// fNtestFiles entries have been counted. When fFriendChainName is set, a
// friend chain is built from the file of that name located in the directory
// of each data file and attached to the main chain.
// NOTE(review): several original lines are missing from this listing (e.g. 2292-2293,
// 2296-2299, 2309-2311, 2340-2344, 2346+), among them the stream/line declarations,
// the read loop header and all return statements.
2286 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2288 // Make a tree from files having the location specified in fFileForTestMode.
2289 // Inspired from JF's CreateESDChain.
2290 if (fFileForTestMode.IsNull()) {
2291 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
// A true result from AccessPathName() is reported as "File not found".
2294 if (gSystem->AccessPathName(fFileForTestMode)) {
2295 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2300 in.open(fFileForTestMode);
2302 // Read the input list of files and add them to the chain
2304 TString streeName(treeName);
2305 if (IsUseMCchain()) streeName = "TE";
2306 TChain *chain = new TChain(streeName);
2307 TChain *chainFriend = 0;
// The friend chain uses the same tree name as the main chain.
2308 if (!fFriendChainName.IsNull()) chainFriend = new TChain(streeName);
2312 if (line.IsNull() || line.BeginsWith("#")) continue;
2313 if (count++ == fNtestFiles) break;
2314 TString esdFile(line);
// Each file is opened once to check that it is readable before being chained.
2315 TFile *file = TFile::Open(esdFile);
2316 if (file && !file->IsZombie()) {
2317 chain->Add(esdFile);
2319 if (!fFriendChainName.IsNull()) {
// Drop everything from the first '#' on, then swap the file name for the friend file name.
2320 if (esdFile.Index("#") > -1)
2321 esdFile.Remove(esdFile.Index("#"));
2322 esdFile = gSystem->DirName(esdFile);
2323 esdFile += "/" + fFriendChainName;
2324 file = TFile::Open(esdFile);
2325 if (file && !file->IsZombie()) {
2327 chainFriend->Add(esdFile);
2329 Fatal("GetChainForTestMode", "Cannot open friend file: %s", esdFile.Data());
2334 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
// An empty chain means none of the listed files could be opened.
2338 if (!chain->GetListOfFiles()->GetEntries()) {
2339 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2345 if (!fFriendChainName.IsNull()) chain->AddFriend(chainFriend);
2349 //______________________________________________________________________________
// Scan the job list returned by gGrid->Ps("") and classify the jobs by status.
//   jobidstart - jobs with a queue id below this value are ignored
//   lastid     - queue id of the job whose status string is copied to the result
//   nrunning, nwaiting, nerror, ndone - per-status counters passed by reference
// Returns a pointer to a function-local static 20-character buffer, so the
// result is overwritten by the next call.
// NOTE(review): original lines 2351, 2354-2358, 2363, 2370, 2373, 2375, 2379 and
// 2381+ are missing from this listing; the counter updates inside the switch and
// the counter initialisation are therefore not visible.
2350 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2352 // Get job status for all jobs with jobid>=jobidstart.
2353 static char mstatus[20];
2359 TGridJobStatusList *list = gGrid->Ps("");
2360 if (!list) return mstatus;
2361 Int_t nentries = list->GetSize();
2362 TGridJobStatus *status;
2364 for (Int_t ijob=0; ijob<nentries; ijob++) {
2365 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, using the TAlienJobStatus type by name only.
2366 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2367 if (pid<jobidstart) continue;
2368 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format and written into the
// 20-byte buffer without a length limit - confirm status strings are short and '%'-free.
2369 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2371 switch (status->GetStatus()) {
2372 case TGridJobStatus::kWAITING:
2374 case TGridJobStatus::kRUNNING:
// kABORTED, kFAIL and kUNKNOWN share one case body.
2376 case TGridJobStatus::kABORTED:
2377 case TGridJobStatus::kFAIL:
2378 case TGridJobStatus::kUNKNOWN:
2380 case TGridJobStatus::kDONE:
2389 //______________________________________________________________________________
// Ask the grid for the type of <lfn> and tell whether it is a collection.
//   lfn - grid file name passed to the "type -z" command.
// Returns kTRUE only when the reported type is exactly "collection"; a missing
// result or an empty type string gives kFALSE.
// NOTE(review): original lines 2394, 2396-2397 and 2403+ are missing from this
// listing (the grid-connection test and the final return).
2390 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2392 // Returns true if file is a collection. Functionality duplicated from
2393 // TAlien::Type() because we don't want to directly depend on TAlien.
2395 Error("IsCollection", "No connection to grid");
2398 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2399 if (!res) return kFALSE;
// Only the "type" key of the first result entry is examined.
2400 const char* typeStr = res->GetKey(0, "type");
2401 if (!typeStr || !strlen(typeStr)) return kFALSE;
2402 if (!strcmp(typeStr, "collection")) return kTRUE;
2407 //______________________________________________________________________________
// Returns kTRUE when fOutputSingle is a non-empty string.
2408 Bool_t AliAnalysisAlien::IsSingleOutput() const
2410 // Check if single-output option is on.
2411 return (!fOutputSingle.IsNull());
2414 //______________________________________________________________________________
// Print the current plugin configuration to standard output with printf.
// The Option_t argument is unnamed and unused in the visible code.
// Two groups of printouts are visible: one entered when the analysis manager
// reports PROOF mode, and the grid settings that follow it.
// NOTE(review): original lines such as 2416, 2427, 2429, 2433, 2438, 2449-2450,
// 2453, 2456-2457, 2462, 2469, 2478 and 2531+ are missing from this listing, so
// some conditions, else branches and local declarations (obj, list) are not visible.
2415 void AliAnalysisAlien::Print(Option_t *) const
2417 // Print current plugin settings.
2418 printf("### AliEn analysis plugin current settings ###\n");
2419 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
// --- PROOF-mode settings ---
2420 if (mgr && mgr->IsProofMode()) {
2421 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
// The kTest bit switches the header to the "PROOF LITE" wording.
2422 if (TestBit(AliAnalysisGrid::kTest))
2423 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2424 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2425 if (!fProofDataSet.IsNull())
2426 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2428 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2430 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2431 if (!fROOTVersion.IsNull())
2432 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2434 printf("= ROOT version requested________________________ default\n");
2435 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2436 if (!fAliRootMode.IsNull())
2437 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2439 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2440 if (fNproofWorkersPerSlave)
2441 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2442 if (TestSpecialBit(kClearPackages))
2443 printf("= ClearPackages requested...\n");
// NOTE(review): this tests the pointer returned by Data(), not the string length;
// presumably always true for a TString - confirm whether Length() was intended.
2444 if (fIncludePath.Data())
2445 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2446 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2447 if (fPackages && fPackages->GetEntries()) {
2448 TIter next(fPackages);
// Package names are appended to list one after another.
2451 while ((obj=next())) list += obj->GetName();
2452 printf("= Par files to be used: ________________________ %s\n", list.Data());
2454 if (TestSpecialBit(kProofConnectGrid))
2455 printf("= Requested PROOF connection to grid\n");
// --- Grid settings (presumably reached only outside PROOF mode; the end of the
// PROOF branch is on lines not shown in this listing) ---
2458 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2459 if (fOverwriteMode) {
2460 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2461 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2463 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2464 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
2465 printf("= Production mode:______________________________ %d\n", fProductionMode);
2466 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2467 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2468 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2470 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2471 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2472 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2473 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
2474 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
2475 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or a list of input files.
2476 if (fRunNumbers.Length())
2477 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2479 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
2480 if (!fRunRange[0] && !fRunNumbers.Length()) {
2481 TIter next(fInputFiles);
2484 while ((obj=next())) list += obj->GetName();
2485 printf("= Input files to be processed: _________________ %s\n", list.Data());
2487 if (TestBit(AliAnalysisGrid::kTest))
2488 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2489 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2490 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2491 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2492 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2493 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2494 printf("=====================================================================\n");
// Job and splitting parameters; optional values are printed only when positive or non-empty.
2495 printf("= Job price: ___________________________________ %d\n", fPrice);
2496 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2497 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2498 if (fMaxInitFailed>0)
2499 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2500 if (fMasterResubmitThreshold>0)
2501 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2502 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2503 if (fNrunsPerMaster>0)
2504 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2505 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2506 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2507 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2508 if (fArguments.Length())
2509 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2510 if (fExecutableArgs.Length())
2511 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2512 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2513 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2514 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2515 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2517 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2518 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same pointer test as in the PROOF section above - confirm intent.
2519 if (fIncludePath.Data())
2520 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2521 if (fCloseSE.Length())
2522 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2523 if (fFriendChainName.Length())
2524 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2525 if (fPackages && fPackages->GetEntries()) {
2526 TIter next(fPackages);
2529 while ((obj=next())) list += obj->GetName();
2530 printf("= Par files to be used: ________________________ %s\n", list.Data());
2534 //______________________________________________________________________________
// Reset the plugin configuration to its default values and create fresh JDL objects.
// NOTE(review): original lines 2541-2542, 2544, 2546-2550, 2553, 2556, 2561-2563,
// 2577-2578 and 2581+ are missing from this listing, so further defaults are set
// on lines not shown here.
2535 void AliAnalysisAlien::SetDefaults()
2537 // Set default values for everything. What cannot be filled will be left empty.
// The JDL objects are created through the interpreter, using the TAlienJDL type by name only.
// NOTE(review): only fGridJDL is deleted in the visible lines before both pointers are
// reassigned; an existing fMergingJDL may leak - confirm against the full source.
2538 if (fGridJDL) delete fGridJDL;
2539 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2540 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting and merging defaults.
2543 fSplitMaxInputFileNumber = 100;
2545 fMasterResubmitThreshold = 0;
2551 fNrunsPerMaster = 1;
2552 fMaxMergeFiles = 100;
// Names of the generated script and macro, and the command used to run it.
2554 fExecutable = "analysis.sh";
2555 fExecutableCommand = "root -b -q -x";
2557 fExecutableArgs = "";
2558 fAnalysisMacro = "myAnalysis.C";
2559 fAnalysisSource = "";
2560 fAdditionalLibs = "";
2564 fAliROOTVersion = "";
2565 fUser = ""; // Your alien user name
2566 fGridWorkingDir = "";
2567 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2568 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2569 fFriendChainName = "";
2570 fGridOutputDir = "output";
2571 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2572 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2573 fInputFormat = "xml-single";
2574 fJDLName = "analysis.jdl";
2575 fJobTag = "Automatically generated analysis JDL";
2576 fMergeExcludes = "";
// Copy checking and default outputs are switched on by default.
2579 SetCheckCopy(kTRUE);
2580 SetDefaultOutputs(kTRUE);
2584 //______________________________________________________________________________
// Store the friend-chain file name and the libraries that go with it.
//   name     - file name of the friend chain, stored in fFriendChainName
//   libnames - library names, stored in fFriendLibs; a non-empty value that
//              does not contain ".so" triggers Fatal().
2585 void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
2587 // Set file name for the chain of friends and optionally additional libs to be loaded.
2588 // Libs should be separated by blanks.
2589 fFriendChainName = name;
2590 fFriendLibs = libnames;
// The check only requires ".so" to appear somewhere in the list, not in every entry.
2591 if (fFriendLibs.Length() && !fFriendLibs.Contains(".so")) {
2592 Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
2596 //______________________________________________________________________________
// Deprecated setter kept for old macros: always emits a warning, then forwards
// to SetROOTVersion(version) unless fROOTVersion is already set, in which case
// an error is reported and the existing value is kept.
//   version - ROOT version string to forward.
2597 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2599 // Obsolete method. Use SetROOTVersion instead
2600 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2601 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2602 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2605 //______________________________________________________________________________
// Drive one step of staged merging on the grid.
//   filename  - name of the final merged file looked for in aliendir
//   aliendir  - grid output directory holding the Stage_<n>.xml collections
//   nperchunk - chunk size; fewer files than this marks the stage as the last one
//   jdl       - merging JDL to submit; the final stage uses "<jdl>_final.jdl"
// Visible steps: stop if the final file already exists, find the first missing
// Stage_<n>.xml, create it with a grid "find -x", count its entries, copy it to
// aliendir, then submit the partial or final merging job and record its id in
// fGridJobIDs and its label in fGridStages.
// Returns kFALSE when the job submission gives no job id.
// NOTE(review): original lines 2607, 2612-2613, 2615-2616, 2618-2619, 2621, 2629,
// 2633-2636, 2638, 2640-2642, 2644-2645, 2647, 2656-2657, 2663, 2667, 2669 and 2675+
// are missing from this listing (stage loop header, declarations, several returns).
2606 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2608 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2609 // First check if the result is already in the output directory.
2610 if (FileExists(Form("%s/%s",aliendir,filename))) {
2611 printf("Final merged results found. Not merging again.\n");
2614 // Now check the last stage done.
2617 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2620 // Next stage of merging
// Stage 1 collects the job archives; later stages collect the archives of the previous stage.
2622 TString pattern = "*root_archive.zip";
2623 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2624 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2625 if (res) delete res;
2626 // Write standard output to file
2627 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2628 // Count the number of files inside
2630 ifile.open(Form("Stage_%d.xml",stage));
2631 if (!ifile.good()) {
2632 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2637 while (!ifile.eof()) {
// Every line containing "/event" counts as one file of the collection.
2639 if (line.Contains("/event")) nfiles++;
2643 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2646 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2648 // Copy the file in the output directory
2649 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2650 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2651 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2652 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2653 // Check if this is the last stage to be done.
2654 Bool_t laststage = (nfiles<nperchunk);
// A non-zero fMaxMergeStages caps the number of stages regardless of the file count.
2655 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2658 printf("### Submiting final merging stage %d\n", stage);
2659 TString finalJDL = jdl;
2660 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2661 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2662 jobId = SubmitSingleJob(query);
2664 printf("### Submiting merging stage %d\n", stage);
2665 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2666 jobId = SubmitSingleJob(query);
2668 if (!jobId) return kFALSE;
// Job ids and stage labels are kept as blank-separated lists.
2670 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2671 fGridJobIDs.Append(Form("%d", jobId));
2672 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2673 fGridStages.Append(Form("%s_merge_stage%d",
2674 laststage ? "final" : "partial", stage));
2679 //______________________________________________________________________________
// Read an AliAnalysisManager object from the ROOT file <fname>.
//   fname - path of the file to open with TFile::Open().
// The keys of the file are scanned and every key whose class name is
// "AliAnalysisManager" is read with file->Get().
// NOTE(review): original lines 2681, 2684, 2686-2687, 2690, 2694-2695 and 2697+ are
// missing from this listing, so the failure conditions and the return statements
// are not visible; with no break visible, the last matching key presumably wins.
2680 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2682 // Load the analysis manager from a file.
2683 TFile *file = TFile::Open(fname);
2685 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2688 TIter nextkey(file->GetListOfKeys());
2689 AliAnalysisManager *mgr = 0;
2691 while ((key=(TKey*)nextkey())) {
2692 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2693 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2696 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2700 //______________________________________________________________________________
// Send <query> to the grid as a command and extract the job id from the reply.
//   query - full command string; callers in this file pass "submit <jdl> <args...>".
// Returns 0 when there is no grid connection; otherwise the "jobId" value of
// the first result entry converted with TString::Atoi().
// NOTE(review): original lines 2702, 2707, 2709, 2712-2713, 2715-2716 and 2719+ are
// missing from this listing (result checks, cleanup and the return statements).
2701 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2703 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2704 if (!gGrid) return 0;
// The query is echoed first; the outcome is printed after it further down.
2705 printf("=> %s ------> ",query);
2706 TGridResult *res = gGrid->Command(query);
2708 TString jobId = res->GetKey(0,"jobId");
// An empty job id is reported as a failed submission.
2710 if (jobId.IsNull()) {
2711 printf("submission failed. Reason:\n");
2714 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2717 Int_t ijobId = jobId.Atoi();
2718 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2722 //______________________________________________________________________________
// Concatenate the files named <output> found next to each entry of an xml collection.
//   output     - base name of the file to fetch and of the merged local result
//   collection - xml collection name; anything without ".xml" returns kFALSE.
// Each remote file is copied locally as "<ifile>_<output>" and appended to a
// running file called lastmerged with shell commands; the result is finally
// copied to <output> and the temporaries are removed.
// NOTE(review): original lines 2724, 2729, 2731-2732, 2734, 2736, 2739-2740,
// 2744-2747, 2749-2750, 2753-2754 and 2757+ are missing from this listing
// (declarations of ifile/outtmp, the first-file test and the return statements).
// NOTE(review): <output> is pasted unquoted into shell command lines, and the
// temporary names lastmerged/tempmerged are fixed - confirm callers pass plain
// file names and do not run concurrently in one directory.
2723 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2725 // Merges a collection of output files using concatenation.
2726 TString scoll(collection);
2727 if (!scoll.Contains(".xml")) return kFALSE;
// The collection is opened through the interpreter, using the TAlienCollection type by name only.
2728 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2730 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2733 // Iterate grid collection
2735 Bool_t merged = kFALSE;
2737 while (coll->Next()) {
// Start from the directory of the current collection entry.
2738 TString fname = gSystem->DirName(coll->GetTURL());
2741 outtmp = Form("%d_%s", ifile, output);
2742 if (!TFile::Cp(fname, outtmp)) {
2743 ::Error("MergeInfo", "Could not copy %s", fname.Data());
// Seed the running result with a copy of the fetched file.
2748 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
// Append the fetched file to the running result via a temporary file.
2751 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2752 gSystem->Exec("cp tempmerged lastmerged");
// Publish the result and remove the temporaries together with the fetched copies.
2755 gSystem->Exec(Form("cp lastmerged %s", output));
2756 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2762 //______________________________________________________________________________
2763 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2765 // Merge given output files from basedir. Basedir can be an alien output directory
2766 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2767 // files in a group (ignored for xml input). Merging can be done in stages:
2768 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2769 // stage=1 : works with an xml of all root_archive.zip in the output directory
2770 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2771 TString outputFile = output;
2773 TString outputChunk;
2774 TString previousChunk = "";
2775 TObjArray *listoffiles = new TObjArray();
2776 // listoffiles->SetOwner();
2777 Int_t countChunk = 0;
2778 Int_t countZero = nmaxmerge;
2779 Bool_t merged = kTRUE;
2780 Bool_t isGrid = kTRUE;
2781 Int_t index = outputFile.Index("@");
2782 if (index > 0) outputFile.Remove(index);
2783 TString inputFile = outputFile;
2784 TString sbasedir = basedir;
2785 if (sbasedir.Contains(".xml")) {
2786 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2787 nmaxmerge = 9999999;
2788 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2790 ::Error("MergeOutput", "Input XML collection empty.");
2793 // Iterate grid collection
2794 while (coll->Next()) {
2795 TString fname = gSystem->DirName(coll->GetTURL());
2798 listoffiles->Add(new TNamed(fname.Data(),""));
2800 } else if (sbasedir.Contains(".txt")) {
2801 // The file having the .txt extension is expected to contain a list of
2802 // folders where the output files will be looked. For alien folders,
2803 // the full folder LFN is expected (starting with alien://)
2804 // Assume lfn's on each line
2809 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
2815 if (line.IsNull() || line.BeginsWith("#")) continue;
2817 if (!line.Contains("alien:")) isGrid = kFALSE;
2821 listoffiles->Add(new TNamed(line.Data(),""));
2825 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
2830 command = Form("find %s/ *%s", basedir, inputFile.Data());
2831 printf("command: %s\n", command.Data());
2832 TGridResult *res = gGrid->Command(command);
2834 ::Error("MergeOutput","No result for the find command\n");
2840 while ((map=(TMap*)nextmap())) {
2841 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2842 if (!objs || !objs->GetString().Length()) {
2843 // Nothing found - skip this output
2848 listoffiles->Add(new TNamed(objs->GetName(),""));
2852 if (!listoffiles->GetEntries()) {
2853 ::Error("MergeOutput","No result for the find command\n");
2858 TFileMerger *fm = 0;
2859 TIter next0(listoffiles);
2860 TObjArray *listoffilestmp = new TObjArray();
2861 listoffilestmp->SetOwner();
2864 // Keep only the files at upper level
2865 Int_t countChar = 0;
2866 while ((nextfile=next0())) {
2867 snextfile = nextfile->GetName();
2868 Int_t crtCount = snextfile.CountChar('/');
2869 if (nextfile == listoffiles->First()) countChar = crtCount;
2870 if (crtCount < countChar) countChar = crtCount;
2873 while ((nextfile=next0())) {
2874 snextfile = nextfile->GetName();
2875 Int_t crtCount = snextfile.CountChar('/');
2876 if (crtCount > countChar) {
2880 listoffilestmp->Add(nextfile);
2883 listoffiles = listoffilestmp; // Now contains 'good' files
2884 listoffiles->Print();
2885 TIter next(listoffiles);
2886 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2887 outputChunk = outputFile;
2888 outputChunk.ReplaceAll(".root", "_*.root");
2889 // Check for existent temporary merge files
2890 // Check overwrite mode and remove previous partial results if needed
2891 // Preserve old merging functionality for stage 0.
2893 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2895 // Skip as many input files as in a chunk
2896 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2899 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2903 snextfile = nextfile->GetName();
2905 outputChunk = outputFile;
2906 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2908 if (gSystem->AccessPathName(outputChunk)) continue;
2909 // Merged file with chunks up to <countChunk> found
2910 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2911 previousChunk = outputChunk;
2915 countZero = nmaxmerge;
2917 while ((nextfile=next())) {
2918 snextfile = nextfile->GetName();
2919 // Loop 'find' results and get next LFN
2920 if (countZero == nmaxmerge) {
2921 // First file in chunk - create file merger and add previous chunk if any.
2922 fm = new TFileMerger(isGrid);
2923 fm->SetFastMethod(kTRUE);
2924 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2925 outputChunk = outputFile;
2926 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2928 // If last file found, put merged results in the output file
2929 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2930 // Add file to be merged and decrement chunk counter.
2931 fm->AddFile(snextfile);
2933 if (countZero==0 || nextfile == listoffiles->Last()) {
2934 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2935 // Nothing found - skip this output
2936 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2940 fm->OutputFile(outputChunk);
2941 // Merge the outputs, then go to next chunk
2943 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2947 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2948 gSystem->Unlink(previousChunk);
2950 if (nextfile == listoffiles->Last()) break;
2952 countZero = nmaxmerge;
2953 previousChunk = outputChunk;
2960 // Merging stage different than 0.
2961 // Move to the beginning of the requested chunk.
2962 fm = new TFileMerger(isGrid);
2963 fm->SetFastMethod(kTRUE);
2964 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2966 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2967 // Nothing found - skip this output
2968 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2972 fm->OutputFile(outputFile);
2973 // Merge the outputs
2975 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2979 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2985 //______________________________________________________________________________
2986 Bool_t AliAnalysisAlien::MergeOutputs()
2988 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE right away in test mode and kFALSE in offline mode. Otherwise the
// comma-separated names in fOutputFiles are merged one by one through MergeOutput(),
// or merging jobs are submitted to the grid through SubmitMerging().
2989 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2990 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2992 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2996 if (!TestBit(AliAnalysisGrid::kMerge)) {
2997 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
3000 if (fProductionMode) {
3001 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
3004 Info("MergeOutputs", "Submitting merging JDL");
3005 if (!SubmitMerging()) return kFALSE;
3006 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
3007 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
3010 // Get the output path
// A value without any '/' is completed to <grid home>/<fGridWorkingDir>/<fGridOutputDir>.
3011 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3012 if (!DirectoryExists(fGridOutputDir)) {
3013 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
3016 if (!fOutputFiles.Length()) {
3017 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3020 // Check if fast read option was requested
3021 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
3022 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
3023 if (fFastReadOption) {
3024 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
3025 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Fast-read mode: set the XNet.* timeout, redirect and connection-retry values.
3026 gEnv->SetValue("XNet.ConnectTimeout",50);
3027 gEnv->SetValue("XNet.RequestTimeout",50);
3028 gEnv->SetValue("XNet.MaxRedirectCount",2);
3029 gEnv->SetValue("XNet.ReconnectTimeout",50);
3030 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
3032 // Make sure we change the temporary directory
3033 gSystem->Setenv("TMPDIR", gSystem->pwd());
3034 // Set temporary compilation directory to current one
3035 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// One token per output file name; fOutputFiles is comma separated.
3036 TObjArray *list = fOutputFiles.Tokenize(",");
3040 Bool_t merged = kTRUE;
3041 while((str=(TObjString*)next())) {
3042 outputFile = str->GetString();
// Drop everything from '@' on (not done when '@' is the very first character).
3043 Int_t index = outputFile.Index("@");
3044 if (index > 0) outputFile.Remove(index);
// Shell pattern matching the partial merge files <name>_*.root of this output.
3045 TString outputChunk = outputFile;
3046 outputChunk.ReplaceAll(".root", "_*.root");
3047 // Skip already merged outputs
// AccessPathName() yields kFALSE when the file exists, hence the negation.
3048 if (!gSystem->AccessPathName(outputFile)) {
3049 if (fOverwriteMode) {
3050 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3051 gSystem->Unlink(outputFile);
// "ls" exits with 0 when at least one partial file matches the pattern.
3052 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3053 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3054 outputChunk.Data());
3055 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3058 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
3062 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
// NOTE(review): this message and the one above use the tag "MergeOutput" rather than "MergeOutputs".
3063 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3064 outputChunk.Data());
3065 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Outputs named in fMergeExcludes or fRegisterExcludes are not merged
// (substring match on the exclusion strings).
3068 if (fMergeExcludes.Contains(outputFile.Data()) ||
3069 fRegisterExcludes.Contains(outputFile.Data())) continue;
3070 // Perform a 'find' command in the output directory, looking for registered outputs
3071 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3073 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in gROOT's list of open files.
3077 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3078 if (fileOpened) fileOpened->Close();
3084 //______________________________________________________________________________
3085 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3087 // Use the output files connected to output containers from the analysis manager
3088 // rather than the files defined by SetOutputFiles
3089 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3090 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3091 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3094 //______________________________________________________________________________
3095 void AliAnalysisAlien::SetOutputFiles(const char *list)
3097 // Manually set the output files list.
3098 // Removes duplicates. Not allowed if default outputs are not disabled.
// <list> is a blank-separated set of file names. Each accepted name is appended
// to the comma-separated member fOutputFiles.
3099 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3100 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3103 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3105 TString slist = list;
3106 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3107 TObjArray *arr = slist.Tokenize(" ");
3111 while ((os=(TObjString*)next())) {
3112 sout = os->GetString();
// Cut the name at '@' (only when '@' is not the first character).
3113 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// Duplicate detection is a substring test: a name contained in an already
// stored, longer name is skipped as well.
3114 if (fOutputFiles.Contains(sout)) continue;
3115 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3116 fOutputFiles += sout;
3121 //______________________________________________________________________________
3122 void AliAnalysisAlien::SetOutputArchive(const char *list)
3124 // Manually set the output archive list. Free text - you are on your own...
3125 // Not allowed if default outputs are not disabled.
3126 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3127 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3130 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3131 fOutputArchive = list;
3134 //______________________________________________________________________________
3135 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3137 // Setting a prefered output SE is not allowed anymore.
3138 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3141 //______________________________________________________________________________
3142 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3144 // Set some PROOF special parameter.
// fProofParam maps parameter names to values, both stored as TObjString.
// An entry already registered under <pname> is looked up first.
3145 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// Key and value pointers are read from the pair before the entry is removed from the map.
3147 TObject *old = pair->Key();
3148 TObject *val = pair->Value();
3149 fProofParam.Remove(old);
// Register the new name/value pair.
3153 fProofParam.Add(new TObjString(pname), new TObjString(value));
3156 //______________________________________________________________________________
3157 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3159 // Returns a special PROOF parameter.
3160 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3161 if (!pair) return 0;
3162 return pair->Value()->GetName();
3165 //______________________________________________________________________________
3166 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3168 // Start remote grid analysis.
// Both arguments are unused (their names are commented out in the signature).
// Requires an initialized analysis manager. In PROOF mode the session is opened
// and packages, libraries, sources and (in test mode) a test dataset are set up.
// Otherwise the grid files (analysis file, macro, validation script, JDL) are
// written and, depending on the plugin mode, the job is run locally or submitted.
3169 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3170 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3171 if (!mgr || !mgr->IsInitialized()) {
3172 Error("StartAnalysis", "You need an initialized analysis manager for this");
3175 // Are we in PROOF mode ?
3176 if (mgr->IsProofMode()) {
3177 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3178 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3179 if (fProofCluster.IsNull()) {
3180 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
3183 if (fProofDataSet.IsNull() && !testMode) {
3184 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3187 // Set the needed environment
3188 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3189 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3190 if (fProofReset && !testMode) {
3191 if (fProofReset==1) {
3192 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3193 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3195 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3196 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3198 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3203 // Check if there is an old active session
// The PROOF calls go through the interpreter (gROOT->ProcessLine); the value of
// the interpreted expression is what ProcessLine() hands back.
3204 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3206 Error("StartAnalysis","You have to reset your old session first\n");
3210 // Do we need to change the ROOT version ? The success of this cannot be checked.
3211 if (!fROOTVersion.IsNull() && !testMode) {
3212 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3213 fProofCluster.Data(), fROOTVersion.Data()));
3215 // Connect to PROOF and check the status
// Worker request string: "workers=<n>x" when a per-slave count is set,
// else "workers=<n>" when a total count is set.
3218 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3219 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3221 if (!sworkers.IsNull())
3222 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3224 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
3226 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3228 Error("StartAnalysis", "Could not start PROOF in test mode");
3233 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// SetParallel is sent only when the product of both worker settings is positive.
3236 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3237 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3238 // Set proof special parameters if any
3239 TIter nextpp(&fProofParam);
3240 TObject *proofparam;
3241 while ((proofparam=nextpp())) {
// The value is pasted unquoted into the interpreted call, so a string value
// has to carry its own quotes.
3242 TString svalue = GetProofParameter(proofparam->GetName());
3243 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3245 // Is dataset existing ?
// The dataset name is cut at the first '#'.
3247 TString dataset = fProofDataSet;
3248 Int_t index = dataset.Index("#");
3249 if (index>=0) dataset.Remove(index);
3250 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3251 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3254 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3256 // Is ClearPackages() needed ?
3257 if (TestSpecialBit(kClearPackages)) {
3258 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3259 gROOT->ProcessLine("gProof->ClearPackages();");
3261 // Is a given aliroot mode requested ?
3264 if (!fAliRootMode.IsNull()) {
// The mode "default" is forwarded as an empty ALIROOT_MODE value.
3265 TString alirootMode = fAliRootMode;
3266 if (alirootMode == "default") alirootMode = "";
3267 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
3268 optionsList.SetOwner();
3269 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3270 // Check the additional libs to be loaded
3272 Bool_t parMode = kFALSE;
3273 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3274 // Parse the extra libs for .so
3275 if (fAdditionalLibs.Length()) {
3276 TString additionalLibs = fAdditionalLibs;
// NOTE(review): the value returned by Strip() is not used here - confirm whether
// TString::Strip changes the string in place or only returns the stripped text.
3277 additionalLibs.Strip();
3278 if (additionalLibs.Length() && fFriendLibs.Length())
3279 additionalLibs += " ";
3280 additionalLibs += fFriendLibs;
3281 TObjArray *list = additionalLibs.Tokenize(" ");
3284 while((str=(TObjString*)next())) {
3285 if (str->GetString().Contains(".so")) {
3287 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Library entries are reduced to their bare name: leading "lib" and ".so" removed.
3290 TString stmp = str->GetName();
3291 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3292 stmp.ReplaceAll(".so","");
3293 if (!extraLibs.IsNull()) extraLibs += ":";
3297 if (str->GetString().Contains(".par")) {
3298 // The first par file found in the list will not allow any further .so
// Par file names are collected in a ':'-separated list.
3300 if (!parLibs.IsNull()) parLibs += ":";
3301 parLibs += str->GetName();
3305 if (list) delete list;
3307 if (!extraLibs.IsNull()) {
3308 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3309 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3311 // Check extra includes
3312 if (!fIncludePath.IsNull()) {
// Turn the compiler-style include string into a ':'-separated list: blanks become
// ':', the "$ALICE_ROOT/" / "${ALICE_ROOT}/" prefixes and the "-I" flags are removed.
3313 TString includePath = fIncludePath;
3314 includePath.ReplaceAll(" ",":");
3315 includePath.ReplaceAll("$ALICE_ROOT/","");
3316 includePath.ReplaceAll("${ALICE_ROOT}/","");
3317 includePath.ReplaceAll("-I","");
3318 includePath.Remove(TString::kTrailing, ':');
3319 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3320 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3322 // Check if connection to grid is requested
3323 if (TestSpecialBit(kProofConnectGrid))
3324 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3325 // Enable AliRoot par
3327 // Enable proof lite package
3328 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Print the name/title of every option handed to the package.
3329 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3330 TNamed *obj = (TNamed*)optionsList.At(i);
3331 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// The address of optionsList is formatted into the interpreted call ("%p").
// A zero result from both calls is treated as success.
3333 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3334 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3335 Info("StartAnalysis", "AliRootProofLite enabled");
3337 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3341 if ( ! fAliROOTVersion.IsNull() ) {
// A non-zero result of EnablePackage is reported as an error.
3342 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3343 fAliROOTVersion.Data(), &optionsList))) {
3344 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3349 // Enable first par files from fAdditionalLibs
3350 if (!parLibs.IsNull()) {
3351 TObjArray *list = parLibs.Tokenize(":");
3353 TObjString *package;
3354 while((package=(TObjString*)next())) {
// Remove the local path named like the package without ".par" before uploading.
3355 TString spkg = package->GetName();
3356 spkg.ReplaceAll(".par", "");
3357 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3358 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
// The second EnablePackage argument is kFALSE in test mode and kTRUE otherwise.
3359 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3360 if (gROOT->ProcessLine(enablePackage)) {
3361 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3365 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3369 if (list) delete list;
3372 if (fAdditionalLibs.Contains(".so") && !testMode) {
3373 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3374 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3378 // Enable par files if requested
3379 if (fPackages && fPackages->GetEntries()) {
3380 TIter next(fPackages);
3382 while ((package=next())) {
3383 // Skip packages already enabled
3384 if (parLibs.Contains(package->GetName())) continue;
3385 TString spkg = package->GetName();
3386 spkg.ReplaceAll(".par", "");
3387 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3388 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3389 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3390 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3394 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3399 // Do we need to load analysis source files ?
3400 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3401 if (fAnalysisSource.Length()) {
3402 TObjArray *list = fAnalysisSource.Tokenize(" ");
3405 while((str=(TObjString*)next())) {
// Each source is loaded through gProof->Load with "+g" appended to its name.
3406 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3408 if (list) delete list;
3411 // Register dataset to proof lite.
// NOTE(review): the two errors below are tagged "GetChainForTestMode" although
// they are issued from StartAnalysis.
3412 if (fFileForTestMode.IsNull()) {
3413 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3416 if (gSystem->AccessPathName(fFileForTestMode)) {
3417 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// The collection is filled from the text file and registered as "test_collection";
// its address is passed to the interpreter.
3420 TFileCollection *coll = new TFileCollection();
3421 coll->AddFromFile(fFileForTestMode);
3422 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3423 gROOT->ProcessLine("gProof->ShowDataSets()");
3428 // Check if output files have to be taken from the analysis manager
3429 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3430 // Add output files and AOD files
3431 fOutputFiles = GetListOfFiles("outaod");
3432 // Add extra files registered to the analysis manager
3433 TString extra = GetListOfFiles("ext");
3434 if (!extra.IsNull()) {
// Extra files are listed with a wildcard: "<name>.root" becomes "<name>*.root".
3435 extra.ReplaceAll(".root", "*.root");
3436 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3437 fOutputFiles += extra;
3439 // Compose the output archive.
// Two archives: log_archive.zip with std* files and root_archive.zip with the
// output files plus *.stat, the latter using fNreplicas in its "@disk=" suffix.
3440 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3441 if (mgr->IsCollectThroughput())
3442 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3444 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3446 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Announce the selected plugin mode.
3447 if (TestBit(AliAnalysisGrid::kOffline)) {
3448 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3449 \n there nor any job run. You can revise the JDL and analysis \
3450 \n macro then run the same in \"submit\" mode.");
3451 } else if (TestBit(AliAnalysisGrid::kTest)) {
3452 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3454 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3455 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3456 \n space and job submitted.");
3457 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3458 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3459 if (fMergeViaJDL) CheckInputData();
3462 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3467 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3470 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3471 if (!CheckInputData()) {
3472 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3475 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the configuration item to fix: data directory, run numbers
// or run range, followed by the data pattern.
3477 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3478 if (fRunNumbers.Length()) serror = "run numbers";
3479 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3480 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3481 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Write the files needed by the job.
3484 WriteAnalysisFile();
3485 WriteAnalysisMacro();
3487 WriteValidationScript();
3489 WriteMergingMacro();
3490 WriteMergeExecutable();
3491 WriteValidationScript(kTRUE);
3493 if (!CreateJDL()) return kFALSE;
// Offline mode stops here with kFALSE.
3494 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3496 // Locally testing the analysis
3497 Info("StartAnalysis", "\n_______________________________________________________________________ \
3498 \n Running analysis script in a daughter shell as on a worker node \
3499 \n_______________________________________________________________________");
3500 TObjArray *list = fOutputFiles.Tokenize(",");
3504 while((str=(TObjString*)next())) {
3505 outputFile = str->GetString();
3506 Int_t index = outputFile.Index("@");
3507 if (index > 0) outputFile.Remove(index);
// Remove an existing local copy of the output before the test run.
3508 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the executable with its error stream redirected to the file "stderr",
// then run the validation script.
3511 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3512 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3513 // gSystem->Exec("cat stdout");
3516 // Check if submitting is managed by LPM manager
3517 if (fProductionMode) {
3518 //TString prodfile = fJDLName;
3519 //prodfile.ReplaceAll(".jdl", ".prod");
3520 //WriteProductionFile(prodfile);
3521 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3524 // Submit AliEn job(s)
3525 gGrid->Cd(fGridOutputDir);
3530 if (!fRunNumbers.Length() && !fRunRange[0]) {
3531 // Submit a given xml or a set of runs
3532 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3533 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from the "jobId" key of the first result entry.
3535 const char *cjobId = res->GetKey(0,"jobId");
3539 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3542 Info("StartAnalysis", "\n_______________________________________________________________________ \
3543 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3544 \n_______________________________________________________________________",
3545 fJDLName.Data(), cjobId);
// Record the job id and its stage ("full") in the blank-separated bookkeeping strings.
3548 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3549 fGridJobIDs.Append(jobID);
3550 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3551 fGridStages.Append("full");
3556 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3560 // Submit for a range of enumeration of runs.
3561 if (!Submit()) return kFALSE;
3562 jobID = fGridJobIDs;
3566 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3567 \n You may exit at any time and terminate the job later using the option <terminate> \
3568 \n ##################################################################################", jobID.Data());
// Hand control to an interactive AliEn shell; execution resumes when it exits.
3569 gSystem->Exec("aliensh");
3571 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3572 \n Remember to terminate the job later using the option <terminate> \
3573 \n ##################################################################################", jobID.Data());
3578 //______________________________________________________________________________
3579 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3581 // Get a comma-separated list of output files of the requested type.
3582 // Type can be (case insensitive):
3583 // aod - list of aod files (std, extensions and filters)
3584 // out - list of output files connected to containers (but not aod's or extras)
3585 // ext - list of extra files registered to the manager
3586 // ter - list of files produced in terminate
// The keywords are searched as substrings of <type>, so they can be combined
// (StartAnalysis() passes "outaod").
// The returned pointer refers to a function-static TString shared by all calls:
// copy the result before calling this method again.
3587 static TString files;
3589 TString stype = type;
3591 TString aodfiles, extra;
3592 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3594 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3595 return files.Data();
// AOD files: the output handler's file name followed by its extra outputs.
3597 if (mgr->GetOutputEventHandler()) {
3598 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3599 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3600 if (!extraaod.IsNull()) {
3602 aodfiles += extraaod;
3605 if (stype.Contains("aod")) {
3607 if (stype == "aod") return files.Data();
3609 // Add output files that are not in the list of AOD files
3610 TString outputfiles = "";
3611 TIter next(mgr->GetOutputs());
3612 AliAnalysisDataContainer *output;
3613 const char *filename = 0;
3614 while ((output=(AliAnalysisDataContainer*)next())) {
3615 filename = output->GetFileName();
// Containers whose file name is exactly "default" are ignored. Names already
// collected, or found in the AOD list, are skipped (substring match).
3616 if (!(strcmp(filename, "default"))) continue;
3617 if (outputfiles.Contains(filename)) continue;
3618 if (aodfiles.Contains(filename)) continue;
3619 if (!outputfiles.IsNull()) outputfiles += ",";
3620 outputfiles += filename;
3622 if (stype.Contains("out")) {
3623 if (!files.IsNull()) files += ",";
3624 files += outputfiles;
3625 if (stype == "out") return files.Data();
3627 // Add extra files registered to the analysis manager
3629 extra = mgr->GetExtraFiles();
3630 if (!extra.IsNull()) {
// Blank separators are turned into commas before tokenizing.
3632 extra.ReplaceAll(" ", ",");
3633 TObjArray *fextra = extra.Tokenize(",");
3634 TIter nextx(fextra);
3636 while ((obj=nextx())) {
3637 if (aodfiles.Contains(obj->GetName())) continue;
3638 if (outputfiles.Contains(obj->GetName())) continue;
3639 if (sextra.Contains(obj->GetName())) continue;
3640 if (!sextra.IsNull()) sextra += ",";
3641 sextra += obj->GetName();
3644 if (stype.Contains("ext")) {
3645 if (!files.IsNull()) files += ",";
3649 if (stype == "ext") return files.Data();
// Files produced in terminate: same filtering, additionally against the extra files.
3651 if (!fTerminateFiles.IsNull()) {
// NOTE(review): the value returned by Strip() is not used - confirm whether
// TString::Strip changes the string in place. ReplaceAll() below does modify
// the member fTerminateFiles itself.
3652 fTerminateFiles.Strip();
3653 fTerminateFiles.ReplaceAll(" ",",");
3654 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3655 TIter nextx(fextra);
3657 while ((obj=nextx())) {
3658 if (aodfiles.Contains(obj->GetName())) continue;
3659 if (outputfiles.Contains(obj->GetName())) continue;
3660 if (termfiles.Contains(obj->GetName())) continue;
3661 if (sextra.Contains(obj->GetName())) continue;
3662 if (!termfiles.IsNull()) termfiles += ",";
3663 termfiles += obj->GetName();
3667 if (stype.Contains("ter")) {
3668 if (!files.IsNull() && !termfiles.IsNull()) {
3673 return files.Data();
3676 //______________________________________________________________________________
3677 Bool_t AliAnalysisAlien::Submit()
3679 // Submit all master jobs.
// There is one master job per entry of fInputFiles. Returns kFALSE as soon as a
// call to SubmitNext() fails.
3680 Int_t nmasterjobs = fInputFiles->GetEntries();
3681 Long_t tshoot = gSystem->Now();
// The first submission happens immediately, and only if nothing was submitted yet.
3682 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3683 while (fNsubmitted < nmasterjobs) {
3684 Long_t now = gSystem->Now();
// SubmitNext() is retried only once the clock has advanced by more than 30000
// units since tshoot (presumably milliseconds, i.e. 30 s - TODO confirm).
3685 if ((now-tshoot)>30000) {
3687 if (!SubmitNext()) return kFALSE;
3693 //______________________________________________________________________________
3694 Bool_t AliAnalysisAlien::SubmitMerging()
3696 // Submit all merging jobs.
// For each entry of fInputFiles an output directory is derived and
// CheckMergedFiles() is called for it with the merging JDL name.
// A grid output dir without any '/' is first completed to
// <grid home>/<fGridWorkingDir>/<fGridOutputDir>.
3697 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3698 gGrid->Cd(fGridOutputDir);
// The merging JDL is named after the executable: "<name>.sh" -> "<name>_merge.jdl".
3699 TString mergeJDLName = fExecutable;
3700 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3702 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3705 Int_t ntosubmit = fInputFiles->GetEntries();
3706 for (Int_t i=0; i<ntosubmit; i++) {
// Start from the input file's base name without the ".xml" extension.
3707 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3708 runOutDir.ReplaceAll(".xml", "");
3709 if (fOutputToRunNo) {
3710 // The output directory is the run number
3711 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3712 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3714 if (!fRunNumbers.Length() && !fRunRange[0]) {
3715 // The output directory is the grid outdir
3716 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3717 runOutDir = fGridOutputDir;
3719 // The output directory is the master number in 3 digits format
3720 printf("### Submitting merging job for master <%03d>\n", i);
3721 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3724 // Check now the number of merging stages.
// Pick the first output file that appears in neither exclusion list. Its name
// is the one passed to CheckMergedFiles().
3725 TObjArray *list = fOutputFiles.Tokenize(",");
3729 while((str=(TObjString*)next())) {
3730 outputFile = str->GetString();
3731 Int_t index = outputFile.Index("@");
3732 if (index > 0) outputFile.Remove(index);
3733 if (!fMergeExcludes.Contains(outputFile) &&
3734 !fRegisterExcludes.Contains(outputFile)) break;
3737 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A failure is returned only when it concerns the last entry of the loop.
3738 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range a single pass is enough.
3739 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3741 if (!ntosubmit) return kTRUE;
// NOTE(review): the two messages below are tagged "StartAnalysis" although they
// are issued from SubmitMerging.
3743 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3744 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3745 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3746 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3747 \n ################################################################################################################");
3748 gSystem->Exec("aliensh");
3750 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3751 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3752 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3753 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3754 \n ################################################################################################################");
3759 //______________________________________________________________________________
3760 Bool_t AliAnalysisAlien::SubmitNext()
3762 // Submit next bunch of master jobs if the queue is free. The first master job is
3763 // submitted right away, while the next will not be unless the previous was split.
3764 // The plugin will not submit new master jobs if there are more than 500 jobs in
// the waiting state (this matches the nwaiting>500 test below).
// Every early exit visible in this excerpt returns kTRUE after clearing the
// 'iscalled' guard.
// The function-local statics keep state between successive calls:
//   iscalled    - guard flag: the function returns at once while it is set
//                 (presumably raised right after the check below - that line
//                 is not visible in this excerpt, TODO confirm)
//   firstmaster - job id of the first master job submitted from here
//   lastmaster  - job id of the most recently submitted master job
//   npermaster  - average number of jobs per already submitted master job
3766 static Bool_t iscalled = kFALSE;
3767 static Int_t firstmaster = 0;
3768 static Int_t lastmaster = 0;
3769 static Int_t npermaster = 0;
3770 if (iscalled) return kTRUE;
// Job counters handed to GetJobStatus() further down
3772 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3773 Int_t ntosubmit = 0;
// One master job per entry of fInputFiles
3776 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without the submit policy all master jobs are submitted in one go
3779 if (!IsUseSubmitPolicy()) {
3781 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3782 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3783 ntosubmit = nmasterjobs;
// Status of the masters submitted so far; GetJobStatus is expected to fill the four counters
3786 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3787 printf("=== master %d: %s\n", lastmaster, status.Data());
3788 // If last master not split, just return
3789 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3790 // No more than 500 waiting jobs
3791 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of jobs per submitted master.
// NOTE(review): divides by fNsubmitted - confirm the enclosing branch (not visible here) guarantees it is non-zero.
3792 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Number of masters fitting in what is left of the 500 waiting-jobs budget, but at least one
3793 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3794 if (!ntosubmit) ntosubmit = 1;
3795 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3796 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3798 for (Int_t i=0; i<ntosubmit; i++) {
3799 // Submit for a range of enumeration of runs.
// Stop as soon as every master job has been submitted
3800 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Output directory name taken from the base name of the input XML, without the extension
3802 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3803 runOutDir.ReplaceAll(".xml", "");
// Two forms of the submit command: the third argument is either the XML-derived name
// or the 3-digit master index (the condition selecting between them is not visible in this excerpt)
3805 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3807 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3808 printf("********* %s\n",query.Data());
3809 res = gGrid->Command(query);
// An empty "jobId" key in the grid result is treated as a failed submission
3811 TString cjobId1 = res->GetKey(0,"jobId");
3812 if (!cjobId1.Length()) {
// NOTE(review): the messages below are reported under the "StartAnalysis" location although this is SubmitNext - confirm this is intended
3816 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3819 Info("StartAnalysis", "\n_______________________________________________________________________ \
3820 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3821 \n_______________________________________________________________________",
3822 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Book-keeping: space separated lists of job ids and of the matching stage labels
3823 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3824 fGridJobIDs.Append(cjobId1);
3825 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3826 fGridStages.Append("full");
// Remember the id range of the masters submitted from here for the next status query
3829 lastmaster = cjobId1.Atoi();
3830 if (!firstmaster) firstmaster = lastmaster;
3835 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3843 //______________________________________________________________________________
3844 void AliAnalysisAlien::WriteAnalysisFile()
3846 // Write current analysis manager into the file <analysisFile>
// The file name is derived from fExecutable by replacing ".sh" with ".root".
// The manager related part is skipped when the kSubmit bit is set. The 'copy'
// flag computed near the end is false in production, offline and test modes;
// the statement consuming the flag is not visible in this excerpt.
3847 TString analysisFile = fExecutable;
3848 analysisFile.ReplaceAll(".sh", ".root");
3849 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3850 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3851 if (!mgr || !mgr->IsInitialized()) {
3852 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3855 // Check analysis type
// The kUseMC / kUseESD / kUseAOD bits of this object are set according to the handlers of the manager
3857 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3858 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the type of its first input handler decides
3860 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3861 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3862 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3863 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3865 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3866 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory: opening the file changes gDirectory, it is restored below
3869 TDirectory *cdir = gDirectory;
3870 TFile *file = TFile::Open(analysisFile, "RECREATE");
3872 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3873 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3874 // Unless merging makes no sense
3875 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3878 // Enable termination for local jobs
3879 mgr->SetSkipTerminate(kFALSE);
3881 if (cdir) cdir->cd();
3882 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No copy to the AliEn workspace in production, offline or test mode
3884 Bool_t copy = kTRUE;
3885 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination directory: AliEn home directory followed by fGridWorkingDir
3888 TString workdir = gGrid->GetHomeDirectory();
3889 workdir += fGridWorkingDir;
3890 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous version of the file before copying the new one
3891 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
// The remote path is given without scheme: copyLocal2Alien() prepends "alien://" itself. A failed copy is fatal.
3892 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3893 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3897 //______________________________________________________________________________
3898 void AliAnalysisAlien::WriteAnalysisMacro()
3900 // Write the analysis macro that will steer the analysis in grid mode.
// Unless the kSubmit bit is set, the macro text is streamed to the local file
// fAnalysisMacro. The generated macro loads the needed libraries / par
// packages, sets up the include path, compiles the analysis sources, reads
// the analysis manager back from the file named after fExecutable (".sh"
// replaced by ".root") and starts the analysis. A CreateChain() helper is
// appended when not in local test mode and a SetupPar() helper when
// ANALYSISalice is among fPackages.
// The local file is finally copied to the AliEn working directory; the 'copy'
// flag computed near the end is false in production, offline and test modes
// (the statement consuming the flag is not visible in this excerpt).
3901 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3903 out.open(fAnalysisMacro.Data(), ios::out);
3905 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework libraries are provided as par packages (filled from fPackages below)
3908 Bool_t hasSTEERBase = kFALSE;
3909 Bool_t hasESD = kFALSE;
3910 Bool_t hasAOD = kFALSE;
3911 Bool_t hasANALYSIS = kFALSE;
3912 Bool_t hasOADB = kFALSE;
3913 Bool_t hasANALYSISalice = kFALSE;
3914 Bool_t hasCORRFW = kFALSE;
// func becomes the name of the generated function (macro name without ".C"),
// type the value of 'anatype' in the generated macro, comment its header line
3915 TString func = fAnalysisMacro;
3916 TString type = "ESD";
3917 TString comment = "// Analysis using ";
3918 if (IsUseMCchain()) {
3922 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3923 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3928 if (type!="AOD" && fFriendChainName!="") {
3929 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3932 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3933 else comment += " data";
3934 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3935 func.ReplaceAll(".C", "");
3936 out << "void " << func.Data() << "()" << endl;
3938 out << comment.Data() << endl;
3939 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3940 out << " TStopwatch timer;" << endl;
3941 out << " timer.Start();" << endl << endl;
3942 // Change temp directory to current one
3943 if (!IsLocalTest()) {
3944 out << "// connect to AliEn and make the chain" << endl;
3945 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3947 out << "// Set temporary merging directory to current one" << endl;
3948 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3949 out << "// Set temporary compilation directory to current one" << endl;
3950 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3951 // Reset existing include path
3952 out << "// Reset existing include path and add current directory first in the search" << endl;
3953 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// The base ROOT libraries are loaded explicitly only when the executable command is not aliroot
3954 if (!fExecutableCommand.Contains("aliroot")) {
3955 out << "// load base root libraries" << endl;
3956 out << " gSystem->Load(\"libTree\");" << endl;
3957 out << " gSystem->Load(\"libGeom\");" << endl;
3958 out << " gSystem->Load(\"libVMC\");" << endl;
3959 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3960 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3962 if (fAdditionalRootLibs.Length()) {
3963 // in principle libtree /lib geom libvmc etc. can go into this list, too
3964 out << "// Add aditional libraries" << endl;
3965 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3968 while((str=(TObjString*)next())) {
3969 if (str->GetString().Contains(".so"))
3970 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3972 if (list) delete list;
3974 out << "// Load analysis framework libraries" << endl;
// Name of the par set-up function called by the generated macro (may be replaced below)
3975 TString setupPar = "AliAnalysisAlien::SetupPar";
3977 if (!fExecutableCommand.Contains("aliroot")) {
3978 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3979 out << " gSystem->Load(\"libESD\");" << endl;
3980 out << " gSystem->Load(\"libAOD\");" << endl;
3982 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3983 out << " gSystem->Load(\"libOADB\");" << endl;
3984 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3985 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over the packages: find which framework libraries come as par files
3987 TIter next(fPackages);
3990 while ((obj=next())) {
3991 pkgname = obj->GetName();
3992 if (pkgname == "STEERBase" ||
3993 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3994 if (pkgname == "ESD" ||
3995 pkgname == "ESD.par") hasESD = kTRUE;
3996 if (pkgname == "AOD" ||
3997 pkgname == "AOD.par") hasAOD = kTRUE;
3998 if (pkgname == "ANALYSIS" ||
3999 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4000 if (pkgname == "OADB" ||
4001 pkgname == "OADB.par") hasOADB = kTRUE;
4002 if (pkgname == "ANALYSISalice" ||
4003 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4004 if (pkgname == "CORRFW" ||
4005 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// With ANALYSISalice given as par file the macro-local SetupPar() written at the
// end of the generated macro is called instead of AliAnalysisAlien::SetupPar
4007 if (hasANALYSISalice) setupPar = "SetupPar";
// Each framework library is either loaded or built from its par file, in dependency order
4008 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4009 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4010 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4011 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4012 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4013 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4014 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4015 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4016 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4017 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4018 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4019 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4020 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4021 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4022 out << "// Compile other par packages" << endl;
// Second pass: every package that is not one of the framework libraries handled above
4024 while ((obj=next())) {
4025 pkgname = obj->GetName();
4026 if (pkgname == "STEERBase" ||
4027 pkgname == "STEERBase.par" ||
4029 pkgname == "ESD.par" ||
4031 pkgname == "AOD.par" ||
4032 pkgname == "ANALYSIS" ||
4033 pkgname == "ANALYSIS.par" ||
4034 pkgname == "OADB" ||
4035 pkgname == "OADB.par" ||
4036 pkgname == "ANALYSISalice" ||
4037 pkgname == "ANALYSISalice.par" ||
4038 pkgname == "CORRFW" ||
4039 pkgname == "CORRFW.par") continue;
4040 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4043 out << "// include path" << endl;
4044 // Get the include path from the interpreter and remove entries pointing to AliRoot
4045 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4046 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4047 out << " TIter nextpath(listpaths);" << endl;
4048 out << " TObjString *pname;" << endl;
4049 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4050 out << " TString current = pname->GetName();" << endl;
4051 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4052 out << " gSystem->AddIncludePath(current);" << endl;
4053 out << " }" << endl;
4054 out << " if (listpaths) delete listpaths;" << endl;
4055 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4056 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4057 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4058 if (fAdditionalLibs.Length()) {
4059 out << "// Add aditional AliRoot libraries" << endl;
// The friend libraries are appended to the additional ones; ".so" entries are loaded, ".par" entries are set up
4060 TString additionalLibs = fAdditionalLibs;
4061 additionalLibs.Strip();
4062 if (additionalLibs.Length() && fFriendLibs.Length())
4063 additionalLibs += " ";
4064 additionalLibs += fFriendLibs;
4065 TObjArray *list = additionalLibs.Tokenize(" ");
4068 while((str=(TObjString*)next())) {
4069 if (str->GetString().Contains(".so"))
4070 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4071 if (str->GetString().Contains(".par"))
4072 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4074 if (list) delete list;
4077 out << "// analysis source to be compiled at runtime (if any)" << endl;
4078 if (fAnalysisSource.Length()) {
4079 TObjArray *list = fAnalysisSource.Tokenize(" ");
4082 while((str=(TObjString*)next())) {
4083 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4085 if (list) delete list;
4088 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4089 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
4090 if (fFastReadOption) {
4091 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4092 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4093 out << "// fast xrootd reading enabled" << endl;
4094 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4095 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4096 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4097 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4098 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4099 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4101 out << "// read the analysis manager from file" << endl;
// Same file name as the one derived in WriteAnalysisFile(): executable name with ".sh" -> ".root"
4102 TString analysisFile = fExecutable;
4103 analysisFile.ReplaceAll(".sh", ".root");
4104 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4105 << analysisFile << "\");" << endl;
4106 out << " if (!mgr) return;" << endl;
// In local test mode the generated macro creates its own plugin configured for "test" running
4107 if (IsLocalTest()) {
4108 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4109 out << " plugin->SetRunMode(\"test\");" << endl;
4110 if (fFileForTestMode.IsNull())
4111 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4113 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4114 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4115 if (!fFriendChainName.IsNull())
4116 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\");" << endl;
4118 out << " plugin->SetUseMCchain();" << endl;
4119 out << " mgr->SetGridHandler(plugin);" << endl;
// Propagate the debug settings of the current manager when there is one
4120 if (AliAnalysisManager::GetAnalysisManager()) {
4121 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4122 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4124 out << " mgr->SetDebugLevel(10);" << endl;
4125 out << " mgr->SetNSysInfo(100);" << endl;
4128 out << " mgr->PrintStatus();" << endl;
4129 if (AliAnalysisManager::GetAnalysisManager()) {
4130 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4131 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4133 if (TestBit(AliAnalysisGrid::kTest))
4134 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4136 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4139 if (!IsLocalTest()) {
4140 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4141 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4143 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4145 out << " timer.Stop();" << endl;
4146 out << " timer.Print();" << endl;
4147 out << "}" << endl << endl;
// The CreateChain() helper is only needed (and only called above) outside local test mode
4148 if (!IsLocalTest()) {
4149 out <<"//________________________________________________________________________________" << endl;
4150 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4152 out << "// Create a chain using url's from xml file" << endl;
4153 out << " TString filename;" << endl;
4154 out << " Int_t run = 0;" << endl;
4155 if (IsUseMCchain()) {
4156 out << " TString treename = \"TE\";" << endl;
4158 out << " TString treename = type;" << endl;
4159 out << " treename.ToLower();" << endl;
4160 out << " treename += \"Tree\";" << endl;
4162 out << " printf(\"***************************************\\n\");" << endl;
4163 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4164 out << " printf(\"***************************************\\n\");" << endl;
4165 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4166 out << " if (!coll) {" << endl;
4167 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4168 out << " return NULL;" << endl;
4169 out << " }" << endl;
4170 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4171 out << " TChain *chain = new TChain(treename);" << endl;
4172 if(fFriendChainName!="") {
4173 out << " TChain *chainFriend = new TChain(treename);" << endl;
4175 out << " coll->Reset();" << endl;
4176 out << " while (coll->Next()) {" << endl;
// The quotes must be escaped: left unescaped, the two adjacent literals were merged
// and the argument was silently dropped from the generated call
4177 out << " filename = coll->GetTURL(\"\");" << endl;
4178 out << " if (mgr) {" << endl;
4179 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4180 out << " if (nrun && nrun != run) {" << endl;
4181 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4182 out << " mgr->SetRunFromPath(nrun);" << endl;
4183 out << " run = nrun;" << endl;
4184 out << " }" << endl;
4185 out << " }" << endl;
4186 out << " chain->Add(filename);" << endl;
4187 if(fFriendChainName!="") {
4188 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
4189 out << " if (fileFriend.Index(\"#\") > -1) fileFriend.Remove(fileFriend.Index(\"#\"));" << endl;
4190 out << " fileFriend = gSystem->DirName(fileFriend);" << endl;
4191 out << " fileFriend += \"/\";" << endl;
// End the generated statement with a newline, like every other generated line
4192 out << " fileFriend += \"" << fFriendChainName << "\";" << endl;
4193 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4194 out << " if (file) {" << endl;
4195 out << " file->Close();" << endl;
4196 out << " chainFriend->Add(fileFriend.Data());" << endl;
4197 out << " } else {" << endl;
4198 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4199 out << " return 0;" << endl;
4200 out << " }" << endl;
4202 out << " }" << endl;
4203 out << " if (!chain->GetNtrees()) {" << endl;
4204 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4205 out << " return NULL;" << endl;
4206 out << " }" << endl;
4207 if(fFriendChainName!="") {
4208 out << " chain->AddFriend(chainFriend);" << endl;
4210 out << " return chain;" << endl;
4211 out << "}" << endl << endl;
// Macro-local SetupPar(): written only when ANALYSISalice is given as par package,
// matching the setupPar name chosen above
4213 if (hasANALYSISalice) {
4214 out <<"//________________________________________________________________________________" << endl;
4215 out << "Bool_t SetupPar(const char *package) {" << endl;
4216 out << "// Compile the package and set it up." << endl;
4217 out << " TString pkgdir = package;" << endl;
4218 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4219 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4220 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4221 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4222 out << " // Check for BUILD.sh and execute" << endl;
4223 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4224 out << " printf(\"*******************************\\n\");" << endl;
4225 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4226 out << " printf(\"*******************************\\n\");" << endl;
4227 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4228 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4229 out << " gSystem->ChangeDirectory(cdir);" << endl;
4230 out << " return kFALSE;" << endl;
4231 out << " }" << endl;
4232 out << " } else {" << endl;
4233 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4234 out << " gSystem->ChangeDirectory(cdir);" << endl;
4235 out << " return kFALSE;" << endl;
4236 out << " }" << endl;
4237 out << " // Check for SETUP.C and execute" << endl;
4238 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4239 out << " printf(\"*******************************\\n\");" << endl;
4240 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4241 out << " printf(\"*******************************\\n\");" << endl;
4242 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4243 out << " } else {" << endl;
4244 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4245 out << " gSystem->ChangeDirectory(cdir);" << endl;
4246 out << " return kFALSE;" << endl;
4247 out << " }" << endl;
4248 out << " // Restore original workdir" << endl;
4249 out << " gSystem->ChangeDirectory(cdir);" << endl;
4250 out << " return kTRUE;" << endl;
4253 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// No copy to the AliEn workspace in production, offline or test mode
4255 Bool_t copy = kTRUE;
4256 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination directory: AliEn home directory followed by fGridWorkingDir
4259 TString workdir = gGrid->GetHomeDirectory();
4260 workdir += fGridWorkingDir;
// Remove a previous version of the macro before copying the new one
4261 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4262 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4263 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// copyLocal2Alien() prepends the "alien://" scheme itself, so the bare catalogue path
// is passed (same convention as in WriteAnalysisFile); passing the scheme here as well
// produced a destination URL with a doubled scheme. A failed copy is fatal.
4264 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4265 Form("%s/%s", workdir.Data(),
4266 fAnalysisMacro.Data()))) Fatal("","Terminating");
4270 //______________________________________________________________________________
4271 void AliAnalysisAlien::WriteMergingMacro()
4273 // Write a macro to merge the outputs per master job.
4274 if (!fMergeViaJDL) return;
4275 if (!fOutputFiles.Length()) {
4276 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4279 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4280 TString mergingMacro = fExecutable;
4281 mergingMacro.ReplaceAll(".sh","_merge.C");
4282 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4283 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4285 out.open(mergingMacro.Data(), ios::out);
4287 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4290 Bool_t hasSTEERBase = kFALSE;
4291 Bool_t hasESD = kFALSE;
4292 Bool_t hasAOD = kFALSE;
4293 Bool_t hasANALYSIS = kFALSE;
4294 Bool_t hasOADB = kFALSE;
4295 Bool_t hasANALYSISalice = kFALSE;
4296 Bool_t hasCORRFW = kFALSE;
4297 TString func = mergingMacro;
4299 func.ReplaceAll(".C", "");
4300 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4302 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4303 out << " TStopwatch timer;" << endl;
4304 out << " timer.Start();" << endl << endl;
4305 // Reset existing include path
4306 out << "// Reset existing include path and add current directory first in the search" << endl;
4307 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4308 if (!fExecutableCommand.Contains("aliroot")) {
4309 out << "// load base root libraries" << endl;
4310 out << " gSystem->Load(\"libTree\");" << endl;
4311 out << " gSystem->Load(\"libGeom\");" << endl;
4312 out << " gSystem->Load(\"libVMC\");" << endl;
4313 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4314 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4316 if (fAdditionalRootLibs.Length()) {
4317 // in principle libtree /lib geom libvmc etc. can go into this list, too
4318 out << "// Add aditional libraries" << endl;
4319 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4322 while((str=(TObjString*)next())) {
4323 if (str->GetString().Contains(".so"))
4324 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4326 if (list) delete list;
4328 out << "// Load analysis framework libraries" << endl;
4330 if (!fExecutableCommand.Contains("aliroot")) {
4331 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4332 out << " gSystem->Load(\"libESD\");" << endl;
4333 out << " gSystem->Load(\"libAOD\");" << endl;
4335 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4336 out << " gSystem->Load(\"libOADB\");" << endl;
4337 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4338 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4340 TIter next(fPackages);
4343 TString setupPar = "AliAnalysisAlien::SetupPar";
4344 while ((obj=next())) {
4345 pkgname = obj->GetName();
4346 if (pkgname == "STEERBase" ||
4347 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4348 if (pkgname == "ESD" ||
4349 pkgname == "ESD.par") hasESD = kTRUE;
4350 if (pkgname == "AOD" ||
4351 pkgname == "AOD.par") hasAOD = kTRUE;
4352 if (pkgname == "ANALYSIS" ||
4353 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4354 if (pkgname == "OADB" ||
4355 pkgname == "OADB.par") hasOADB = kTRUE;
4356 if (pkgname == "ANALYSISalice" ||
4357 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4358 if (pkgname == "CORRFW" ||
4359 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4361 if (hasANALYSISalice) setupPar = "SetupPar";
4362 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4363 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4364 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4365 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4366 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4367 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4368 out << " gSystem->Load(\"libOADB\");" << endl;
4369 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4370 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4371 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4372 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4373 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4374 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4375 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4376 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4377 out << "// Compile other par packages" << endl;
4379 while ((obj=next())) {
4380 pkgname = obj->GetName();
4381 if (pkgname == "STEERBase" ||
4382 pkgname == "STEERBase.par" ||
4384 pkgname == "ESD.par" ||
4386 pkgname == "AOD.par" ||
4387 pkgname == "ANALYSIS" ||
4388 pkgname == "ANALYSIS.par" ||
4389 pkgname == "OADB" ||
4390 pkgname == "OADB.par" ||
4391 pkgname == "ANALYSISalice" ||
4392 pkgname == "ANALYSISalice.par" ||
4393 pkgname == "CORRFW" ||
4394 pkgname == "CORRFW.par") continue;
4395 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4398 out << "// include path" << endl;
4399 // Get the include path from the interpreter and remove entries pointing to AliRoot
4400 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4401 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4402 out << " TIter nextpath(listpaths);" << endl;
4403 out << " TObjString *pname;" << endl;
4404 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4405 out << " TString current = pname->GetName();" << endl;
4406 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4407 out << " gSystem->AddIncludePath(current);" << endl;
4408 out << " }" << endl;
4409 out << " if (listpaths) delete listpaths;" << endl;
4410 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4411 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4412 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4413 if (fAdditionalLibs.Length()) {
4414 out << "// Add aditional AliRoot libraries" << endl;
4415 TString additionalLibs = fAdditionalLibs;
4416 additionalLibs.Strip();
4417 if (additionalLibs.Length() && fFriendLibs.Length())
4418 additionalLibs += " ";
4419 additionalLibs += fFriendLibs;
4420 TObjArray *list = additionalLibs.Tokenize(" ");
4423 while((str=(TObjString*)next())) {
4424 if (str->GetString().Contains(".so"))
4425 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4427 if (list) delete list;
4430 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4431 if (fAnalysisSource.Length()) {
4432 TObjArray *list = fAnalysisSource.Tokenize(" ");
4435 while((str=(TObjString*)next())) {
4436 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4438 if (list) delete list;
4442 if (fFastReadOption) {
4443 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4444 out << "// fast xrootd reading enabled" << endl;
4445 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4446 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4447 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4448 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4449 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4450 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4452 // Change temp directory to current one
4453 out << "// Connect to AliEn" << endl;
4454 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4455 out << "// Set temporary merging directory to current one" << endl;
4456 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4457 out << "// Set temporary compilation directory to current one" << endl;
4458 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4459 out << " TString outputDir = dir;" << endl;
4460 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4461 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4462 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4463 out << " TIter *iter = new TIter(list);" << endl;
4464 out << " TObjString *str;" << endl;
4465 out << " TString outputFile;" << endl;
4466 out << " Bool_t merged = kTRUE;" << endl;
4467 TString analysisFile = fExecutable;
4468 analysisFile.ReplaceAll(".sh", ".root");
4469 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4470 << analysisFile << "\");" << endl;
4471 out << " if (!mgr) {" << endl;
4472 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4473 out << " return;" << endl;
4474 out << " }" << endl;
4475 if (IsLocalTest()) {
4476 out << " printf(\"===================================\n\");" << endl;
4477 out << " printf(\"Testing merging...\\n\");" << endl;
4478 out << " printf(\"===================================\n\");" << endl;
4480 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4481 out << " outputFile = str->GetString();" << endl;
4482 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4483 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4484 out << " if (index > 0) outputFile.Remove(index);" << endl;
4485 out << " // Skip already merged outputs" << endl;
4486 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4487 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4488 out << " continue;" << endl;
4489 out << " }" << endl;
4490 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4491 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4492 out << " if (!merged) {" << endl;
4493 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4494 out << " return;" << endl;
4495 out << " }" << endl;
4496 out << " }" << endl;
4497 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4498 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4499 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4501 out << " // all outputs merged, validate" << endl;
4502 out << " ofstream out;" << endl;
4503 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4504 out << " out.close();" << endl;
4505 out << " // read the analysis manager from file" << endl;
4506 if (IsLocalTest()) {
4507 out << " printf(\"===================================\n\");" << endl;
4508 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4509 out << " printf(\"===================================\n\");" << endl;
4511 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4513 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4514 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4515 out << " mgr->PrintStatus();" << endl;
4517 if (mgr->GetDebugLevel()>3) {
4518 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4520 if (TestBit(AliAnalysisGrid::kTest))
4521 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4523 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4526 out << " TTree *tree = NULL;" << endl;
4527 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4528 out << "}" << endl << endl;
4529 if (hasANALYSISalice) {
4530 out <<"//________________________________________________________________________________" << endl;
4531 out << "Bool_t SetupPar(const char *package) {" << endl;
4532 out << "// Compile the package and set it up." << endl;
4533 out << " TString pkgdir = package;" << endl;
4534 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4535 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4536 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4537 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4538 out << " // Check for BUILD.sh and execute" << endl;
4539 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4540 out << " printf(\"*******************************\\n\");" << endl;
4541 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4542 out << " printf(\"*******************************\\n\");" << endl;
4543 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4544 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4545 out << " gSystem->ChangeDirectory(cdir);" << endl;
4546 out << " return kFALSE;" << endl;
4547 out << " }" << endl;
4548 out << " } else {" << endl;
4549 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4550 out << " gSystem->ChangeDirectory(cdir);" << endl;
4551 out << " return kFALSE;" << endl;
4552 out << " }" << endl;
4553 out << " // Check for SETUP.C and execute" << endl;
4554 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4555 out << " printf(\"*******************************\\n\");" << endl;
4556 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4557 out << " printf(\"*******************************\\n\");" << endl;
4558 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4559 out << " } else {" << endl;
4560 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4561 out << " gSystem->ChangeDirectory(cdir);" << endl;
4562 out << " return kFALSE;" << endl;
4563 out << " }" << endl;
4564 out << " // Restore original workdir" << endl;
4565 out << " gSystem->ChangeDirectory(cdir);" << endl;
4566 out << " return kTRUE;" << endl;
4570 Bool_t copy = kTRUE;
4571 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4574 TString workdir = gGrid->GetHomeDirectory();
4575 workdir += fGridWorkingDir;
4576 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4577 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4578 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4579 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4580 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4584 //______________________________________________________________________________
// SetupPar: unpack the PAR archive <package>, build it and run its setup macro.
// Steps visible below: "tar xzf <pkg>.par", change into the extracted directory, run
// PROOF-INF/BUILD.sh, execute the PROOF-INF/SETUP.C macro, then change back to the
// directory that was current on entry. A missing BUILD.sh / SETUP.C or a non-zero
// BUILD.sh exit status is reported through ::Error after restoring that directory.
// NOTE(review): the return statements are not visible in this excerpt - presumably
// kFALSE on the error paths and kTRUE at the end; confirm.
4585 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4587 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4588 // Note that for loading the compiled library. The current directory should have precedence in
4590 TString pkgdir = package;
// Remove ".par" from the name to obtain the package directory name.
4591 pkgdir.ReplaceAll(".par","");
// NOTE(review): the status returned by Exec() for the tar command is not checked here.
4592 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
// Remember the starting directory; the code below changes back to it on each path.
4593 TString cdir = gSystem->WorkingDirectory();
4594 gSystem->ChangeDirectory(pkgdir);
4595 // Check for BUILD.sh and execute
// TSystem::AccessPathName() returns kFALSE when the path IS accessible, hence the negation.
4596 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4597 printf("**************************************************\n");
4598 printf("*** Building PAR archive %s\n", package);
4599 printf("**************************************************\n");
// A non-zero value from Exec() is treated as a failed build.
4600 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4601 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4602 gSystem->ChangeDirectory(cdir);
// Reported when BUILD.sh is not accessible (the else line itself is not visible in this excerpt).
4606 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4607 gSystem->ChangeDirectory(cdir);
4610 // Check for SETUP.C and execute
4611 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4612 printf("**************************************************\n");
4613 printf("*** Setup PAR archive %s\n", package);
4614 printf("**************************************************\n");
4615 gROOT->Macro("PROOF-INF/SETUP.C");
// Print what gSystem->GetLibraries() reports when queried with the package name.
4616 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
// Reported when SETUP.C is not accessible (the else line itself is not visible in this excerpt).
4618 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4619 gSystem->ChangeDirectory(cdir);
4622 // Restore original workdir
4623 gSystem->ChangeDirectory(cdir);
4627 //______________________________________________________________________________
// WriteExecutable: write the bash wrapper script named by fExecutable and copy it to
// the bin directory under the AliEn home directory.
// The generated script prints environment diagnostics, runs fExecutableCommand on
// fAnalysisMacro with fExecutableArgs, and exits with the command's status when that
// status is non-zero.
// NOTE(review): the statement that consumes the local flag "copy" is not visible in
// this excerpt; presumably the upload is skipped in production, offline and test mode - confirm.
4628 void AliAnalysisAlien::WriteExecutable()
4630 // Generate the alien executable script.
4631 // Patch executable with -x to catch error code
// The " -x" option is appended only when the command contains "root" and "-q" and
// does not already contain "-x"; note that this modifies the member fExecutableCommand.
4632 if (fExecutableCommand.Contains("root") &&
4633 fExecutableCommand.Contains("-q") &&
4634 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
// Script generation below is guarded by the kSubmit bit being unset.
4635 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4637 out.open(fExecutable.Data(), ios::out);
4639 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4642 out << "#!/bin/bash" << endl;
4643 // Make sure we can properly compile par files
4644 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// Diagnostic header of the generated script: PATH, LD_LIBRARY_PATH, ROOTSYS,
// ALICE_ROOT, location of root and aliroot, system limits and free memory.
4645 out << "echo \"=========================================\"" << endl;
4646 out << "echo \"############## PATH : ##############\"" << endl;
4647 out << "echo $PATH" << endl;
4648 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4649 out << "echo $LD_LIBRARY_PATH" << endl;
4650 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4651 out << "echo $ROOTSYS" << endl;
4652 out << "echo \"############## which root : ##############\"" << endl;
4653 out << "which root" << endl;
4654 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4655 out << "echo $ALICE_ROOT" << endl;
4656 out << "echo \"############## which aliroot : ##############\"" << endl;
4657 out << "which aliroot" << endl;
4658 out << "echo \"############## system limits : ##############\"" << endl;
4659 out << "ulimit -a" << endl;
4660 out << "echo \"############## memory : ##############\"" << endl;
4661 out << "free -m" << endl;
4662 out << "echo \"=========================================\"" << endl << endl;
// The actual job command line: <command> <analysis macro> <arguments>.
4663 out << fExecutableCommand << " ";
4664 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
// Save the command's exit status right away so later commands cannot overwrite it.
4665 out << "RET=$?" << endl;
4666 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4667 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
// Exit codes strictly between 128 and 160 are treated as 128 + signal number; the
// signal name is picked by position from the list written into the script below.
4668 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4669 out << " let sig=\"$RET - 128\""<<endl;
4670 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4671 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4672 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4673 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4674 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4675 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
// The script propagates the failing status to its caller.
4677 out << " exit $RET"<< endl;
4678 out << "fi" << endl << endl ;
4679 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4680 out << "echo \"############## memory after: ##############\"" << endl;
4681 out << "free -m" << endl;
// Decide whether the script is uploaded: not in production, offline or test mode.
4683 Bool_t copy = kTRUE;
4684 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4687 TString workdir = gGrid->GetHomeDirectory();
// Create <home>/bin (with -p) when it does not exist yet.
4688 TString bindir = Form("%s/bin", workdir.Data());
4689 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is not read again in the visible lines after this append - confirm it is still needed.
4690 workdir += fGridWorkingDir;
4691 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
// Remove a previous copy before uploading the new one.
4692 if (FileExists(executable)) gGrid->Rm(executable);
4693 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4694 // TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
// A failed upload calls Fatal().
4695 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4696 executable.Data())) Fatal("","Terminating");
4700 //______________________________________________________________________________
// WriteMergeExecutable: write the bash wrapper script for the merging job and copy it
// to the bin directory under the AliEn home directory. Nothing is done when
// fMergeViaJDL is not set. The script name is fExecutable with ".sh" replaced by
// "_merge.sh"; the macro it runs is fExecutable with ".sh" replaced by "_merge.C".
// NOTE(review): the statement that consumes the local flag "copy" is not visible in
// this excerpt; presumably the upload is skipped in production, offline and test mode - confirm.
4701 void AliAnalysisAlien::WriteMergeExecutable()
4703 // Generate the alien executable script for the merging job.
4704 if (!fMergeViaJDL) return;
4705 TString mergeExec = fExecutable;
4706 mergeExec.ReplaceAll(".sh", "_merge.sh");
// Script generation below is guarded by the kSubmit bit being unset.
4707 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4709 out.open(mergeExec.Data(), ios::out);
// NOTE(review): the location tag "WriteMergingExecutable" differs from this function's name "WriteMergeExecutable".
4711 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4714 out << "#!/bin/bash" << endl;
4715 // Make sure we can properly compile par files
4716 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// Diagnostic header of the generated script: PATH, LD_LIBRARY_PATH, ROOTSYS,
// ALICE_ROOT, location of root and aliroot, system limits and free memory.
4717 out << "echo \"=========================================\"" << endl;
4718 out << "echo \"############## PATH : ##############\"" << endl;
4719 out << "echo $PATH" << endl;
4720 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4721 out << "echo $LD_LIBRARY_PATH" << endl;
4722 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4723 out << "echo $ROOTSYS" << endl;
4724 out << "echo \"############## which root : ##############\"" << endl;
4725 out << "which root" << endl;
4726 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4727 out << "echo $ALICE_ROOT" << endl;
4728 out << "echo \"############## which aliroot : ##############\"" << endl;
4729 out << "which aliroot" << endl;
4730 out << "echo \"############## system limits : ##############\"" << endl;
4731 out << "ulimit -a" << endl;
4732 out << "echo \"############## memory : ##############\"" << endl;
4733 out << "free -m" << endl;
4734 out << "echo \"=========================================\"" << endl << endl;
4735 TString mergeMacro = fExecutable;
4736 mergeMacro.ReplaceAll(".sh", "_merge.C");
// ARG is the macro call passed to the command: with one-stage merging only the first
// script argument is forwarded, the other form forwards the first two.
// (The line selecting the second form, presumably an else, is not visible in this excerpt.)
4737 if (IsOneStageMerging())
4738 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4740 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4741 out << fExecutableCommand << " " << "$ARG" << endl;
// Save the command's exit status right away so later commands cannot overwrite it.
4742 out << "RET=$?" << endl;
4743 out << "if [ \"$RET\" != \"0\" ];then" << endl;
// NOTE(review): this message names fAnalysisMacro although the command above runs mergeMacro - confirm which is intended.
4744 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
// Exit codes strictly between 128 and 160 are treated as 128 + signal number; the
// signal name is picked by position from the list written into the script below.
4745 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4746 out << " let sig=\"$RET - 128\""<<endl;
4747 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4748 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4749 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4750 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4751 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4752 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4754 out << " exit $RET"<< endl;
4755 out << "fi" << endl << endl ;
// NOTE(review): this prints $? whereas WriteExecutable prints the saved $RET at the
// same place; $? here follows the if/fi above rather than the macro run - consider $RET for consistency.
4756 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4757 out << "echo \"############## memory after: ##############\"" << endl;
4758 out << "free -m" << endl;
// Decide whether the script is uploaded: not in production, offline or test mode.
4760 Bool_t copy = kTRUE;
4761 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4764 TString workdir = gGrid->GetHomeDirectory();
// Create <home>/bin (with -p) when it does not exist yet.
4765 TString bindir = Form("%s/bin", workdir.Data());
4766 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is not read again in the visible lines after this append - confirm it is still needed.
4767 workdir += fGridWorkingDir;
4768 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
// Remove a previous copy before uploading the new one.
4769 if (FileExists(executable)) gGrid->Rm(executable);
4770 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4771 // TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
// A failed upload calls Fatal().
4772 if (!copyLocal2Alien("WriteMergeExecutable",
4773 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4777 //______________________________________________________________________________
// WriteProductionFile: write the local text file <filename> describing the production
// (one JDL line followed by one line per entry of fInputFiles) and copy it to
// <workdir>/<filename> on AliEn; a failed copy calls Fatal().
4778 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4780 // Write the production file to be submitted by LPM manager. The format is:
4781 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4782 // Next lines: full_path_to_dataset XXX (XXX is a string)
4783 // To submit, one has to: submit jdl XXX for all lines
4785 out.open(filename, ios::out);
4787 Error("WriteProductionFile", "Bad file name: %s", filename);
// The AliEn home directory is used as prefix only outside production mode and only
// when fGridWorkingDir does not already start with "/alice".
4791 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4792 workdir = gGrid->GetHomeDirectory();
4793 workdir += fGridWorkingDir;
// NOTE(review): integer arithmetic; no check that fSplitMaxInputFileNumber is non-zero
// is visible here - confirm it is validated before this call.
4794 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4795 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
// First line of the file: JDL path and the estimated number of subjobs per master.
4796 out << locjdl << " " << njobspermaster << endl;
4797 Int_t nmasterjobs = fInputFiles->GetEntries();
4798 for (Int_t i=0; i<nmasterjobs; i++) {
// Candidate output directory name: base name of the input file with ".xml" removed.
4799 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4800 runOutDir.ReplaceAll(".xml", "");
// Two alternative line formats follow: the second column is either runOutDir or the
// zero-padded 3-digit index (the condition choosing between them is not visible in this excerpt).
4802 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4804 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4807 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
// NOTE(review): the existence check and removal use the bare file name while the upload
// target below is <workdir>/<filename>; presumably this relies on the current AliEn directory - confirm.
4808 if (FileExists(filename)) gGrid->Rm(filename);
4809 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4810 if (!copyLocal2Alien("WriteProductionFile", filename,
4811 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4815 //______________________________________________________________________________
// WriteValidationScript: write the bash script that validates a finished job and copy
// it to <AliEn home><fGridWorkingDir>/<script name>.
// merge: when kTRUE the script name gets "_merge.sh" in place of ".sh" and the
//        per-output-file existence checks are not generated.
// The generated script starts with error=0, sets error=1 when stderr is missing, when
// stderr contains one of the searched failure messages or when a checked file is
// missing, and ends with "exit $error".
// NOTE(review): the statement that consumes the local flag "copy" and the end of the
// function are not visible in this excerpt.
4816 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4818 // Generate the alien validation script.
4819 // Generate the validation script
// Default script name: fExecutable with ".sh" replaced by "_validation.sh" (stored in the member).
4821 if (fValidationScript.IsNull()) {
4822 fValidationScript = fExecutable;
4823 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4825 TString validationScript = fValidationScript;
4826 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4828 Error("WriteValidationScript", "Alien connection required");
4831 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() returns the stripped substring and leaves the string
// itself unchanged; the result is discarded here, so this call has no effect - confirm intent.
4832 fTerminateFiles.Strip();
// Blanks become commas; this modifies the member fTerminateFiles in place.
4833 fTerminateFiles.ReplaceAll(" ",",");
// Outside test mode the script's messages are appended to the file "stdout".
4835 TString outStream = "";
4836 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
// Script generation below is guarded by the kSubmit bit being unset.
4837 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4839 out.open(validationScript, ios::out);
4840 out << "#!/bin/bash" << endl;
4841 out << "##################################################" << endl;
// The script works from the directory that contains it ("." when dirname gives nothing).
4842 out << "validateout=`dirname $0`" << endl;
4843 out << "validatetime=`date`" << endl;
4844 out << "validated=\"0\";" << endl;
4845 out << "error=0" << endl;
// NOTE(review): $validateout is unquoted in this test; a path containing blanks would break it.
4846 out << "if [ -z $validateout ]" << endl;
4847 out << "then" << endl;
4848 out << " validateout=\".\"" << endl;
4849 out << "fi" << endl << endl;
4850 out << "cd $validateout;" << endl;
4851 out << "validateworkdir=`pwd`;" << endl << endl;
4852 out << "echo \"*******************************************************\"" << outStream << endl;
4853 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4855 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4856 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4857 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4858 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4859 out << "ls -la ./" << outStream << endl;
4860 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4861 out << "##################################################" << endl;
// A job without a stderr file is not validated.
4864 out << "if [ ! -f stderr ] ; then" << endl;
4865 out << " error=1" << endl;
4866 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4867 out << " echo \"Error = $error\" " << outStream << endl;
4868 out << "fi" << endl;
// Case-insensitive searches of stderr for known failure messages.
4870 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4871 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4872 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
// NOTE(review): this pattern starts with '*', whose meaning at the start of an extended
// regular expression is not defined by POSIX; presumably a fixed-string match (grep -F)
// was intended - confirm.
4873 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
// Each non-empty match marks the job as failed and echoes the matching text.
4876 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4877 out << " error=1" << endl;
4878 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4879 out << " echo \"$parArch\" " << outStream << endl;
4880 out << " echo \"Error = $error\" " << outStream << endl;
4881 out << "fi" << endl;
4883 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4884 out << " error=1" << endl;
4885 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4886 out << " echo \"$segViol\" " << outStream << endl;
4887 out << " echo \"Error = $error\" " << outStream << endl;
4888 out << "fi" << endl;
4890 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4891 out << " error=1" << endl;
4892 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4893 out << " echo \"$segFault\" " << outStream << endl;
4894 out << " echo \"Error = $error\" " << outStream << endl;
4895 out << "fi" << endl;
4897 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4898 out << " error=1" << endl;
4899 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4900 out << " echo \"$glibcErr\" " << outStream << endl;
4901 out << " echo \"Error = $error\" " << outStream << endl;
4902 out << "fi" << endl;
4904 // Part dedicated to the specific analyses running into the train
// List of files to look at: fOutputFiles, extended by fTerminateFiles when merging.
4906 TString outputFiles = fOutputFiles;
4907 if (merge && !fTerminateFiles.IsNull()) {
4909 outputFiles += fTerminateFiles;
4911 TObjArray *arr = outputFiles.Tokenize(",");
// The loop body runs only when merge is kFALSE.
4914 while (!merge && (os=(TObjString*)next1())) {
4915 // No need to validate outputs produced by merging since the merging macro does this
4916 outputFile = os->GetString();
// Keep only the part of the entry before '@'.
4917 Int_t index = outputFile.Index("@");
4918 if (index > 0) outputFile.Remove(index);
// Files listed in fTerminateFiles and wildcard entries are not checked.
4919 if (fTerminateFiles.Contains(outputFile)) continue;
4920 if (outputFile.Contains("*")) continue;
4921 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4922 out << " error=1" << endl;
4923 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4924 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4925 out << "fi" << endl;
// The marker file outputs_valid must exist, otherwise the job fails validation.
4928 out << "if ! [ -f outputs_valid ] ; then" << endl;
4929 out << " error=1" << endl;
4930 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4931 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4932 out << "fi" << endl;
4934 out << "if [ $error = 0 ] ; then" << endl;
4935 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
// Presumably both lines below belong to the !IsKeepLogs() branch (its closing line is
// not visible in this excerpt): a validated job then removes its std* files.
4936 if (!IsKeepLogs()) {
4937 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4939 out << " rm -f std*" << endl;
4941 out << "fi" << endl;
4943 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4944 out << "echo \"*******************************************************\"" << outStream << endl;
4945 out << "cd -" << endl;
// The script's exit status is the accumulated error flag.
4946 out << "exit $error" << endl;
// Decide whether the script is uploaded: not in production, offline or test mode.
4948 Bool_t copy = kTRUE;
4949 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4952 TString workdir = gGrid->GetHomeDirectory();
4953 workdir += fGridWorkingDir;
4954 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
// NOTE(review): the existence check and removal use the bare script name while the upload
// target below is <workdir>/<script>; presumably this relies on the current AliEn directory - confirm.
4955 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4956 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
// A failed upload calls Fatal().
4957 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
4958 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");