1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
32 #include "TInterpreter.h"
34 #include "TFileCollection.h"
36 #include "TObjString.h"
37 #include "TObjArray.h"
40 #include "TGridResult.h"
41 #include "TGridCollection.h"
43 #include "TGridJobStatusList.h"
44 #include "TGridJobStatus.h"
45 #include "TFileMerger.h"
46 #include "AliAnalysisManager.h"
47 #include "AliAnalysisTaskCfg.h"
48 #include "AliVEventHandler.h"
49 #include "AliAnalysisDataContainer.h"
50 #include "AliMultiInputEventHandler.h"
56 ClassImp(AliAnalysisAlien)
// File-scope helper: copies the local file <loc> to the AliEn location <rem> via TFile::Cp.
//   where - name used as location tag of the warning message
//   loc   - local path; "file:" is prepended to build the source URL
//   rem   - remote path; "alien://" is prepended to build the destination URL
// NOTE(review): the embedded listing numbers skip 63, 67 and everything after 68, so the
// braces, the condition guarding the warning and the return statement are not visible here.
62 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
64 TString sl(Form("file:%s", loc));
65 TString sr(Form("alien://%s", rem));
// Result of the copy; presumably what the function returns - confirm against the full file.
66 Bool_t ret = TFile::Cp(sl, sr);
68 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
74 //______________________________________________________________________________
// Default constructor.
// NOTE(review): only a handful of member initializers are visible in this listing (the
// embedded numbering jumps from 75 to 81 and stops at 108). The visible ones set three
// integer settings to 0 and value-initialize fExecutableCommand and fAdditionalRootLibs.
75 AliAnalysisAlien::AliAnalysisAlien()
81 fSplitMaxInputFileNumber(0),
83 fMasterResubmitThreshold(0),
96 fNproofWorkersPerSlave(0),
102 fExecutableCommand(),
108 fAdditionalRootLibs(),
156 //______________________________________________________________________________
// Named constructor: forwards <name> to the AliAnalysisGrid base class.
// NOTE(review): as for the default constructor, most of the initializer list and the whole
// body are missing from this listing; the visible initializers mirror the default constructor.
157 AliAnalysisAlien::AliAnalysisAlien(const char *name)
158 :AliAnalysisGrid(name),
163 fSplitMaxInputFileNumber(0),
165 fMasterResubmitThreshold(0),
178 fNproofWorkersPerSlave(0),
184 fExecutableCommand(),
190 fAdditionalRootLibs(),
238 //______________________________________________________________________________
// Copy constructor.
// The initializer list copies the settings member by member from <other>. The body then:
//   - creates two fresh TAlienJDL objects through the interpreter for fGridJDL and
//     fMergingJDL (they are not copied from <other>),
//   - copies both entries of fRunRange,
//   - rebuilds fInputFiles and fPackages as owning arrays of TObjString holding the names
//     of the entries found in <other>,
//   - rebuilds fModules as an owning array, copy-constructing each AliAnalysisTaskCfg.
// NOTE(review): the embedded numbering has gaps (e.g. 241-242, 244, 280, 305-308, 324, 331,
// 342-346), so some initializers, the local TObject pointer declarations and the end of the
// module loop are not visible in this listing.
239 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
240 :AliAnalysisGrid(other),
243 fPrice(other.fPrice),
245 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
246 fMaxInitFailed(other.fMaxInitFailed),
247 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
248 fNtestFiles(other.fNtestFiles),
249 fNrunsPerMaster(other.fNrunsPerMaster),
250 fMaxMergeFiles(other.fMaxMergeFiles),
251 fMaxMergeStages(other.fMaxMergeStages),
252 fNsubmitted(other.fNsubmitted),
253 fProductionMode(other.fProductionMode),
254 fOutputToRunNo(other.fOutputToRunNo),
255 fMergeViaJDL(other.fMergeViaJDL),
256 fFastReadOption(other.fFastReadOption),
257 fOverwriteMode(other.fOverwriteMode),
258 fNreplicas(other.fNreplicas),
259 fNproofWorkers(other.fNproofWorkers),
260 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
261 fProofReset(other.fProofReset),
262 fNMCevents(other.fNMCevents),
263 fNMCjobs(other.fNMCjobs),
264 fRunNumbers(other.fRunNumbers),
265 fExecutable(other.fExecutable),
266 fExecutableCommand(other.fExecutableCommand),
267 fArguments(other.fArguments),
268 fExecutableArgs(other.fExecutableArgs),
269 fAnalysisMacro(other.fAnalysisMacro),
270 fAnalysisSource(other.fAnalysisSource),
271 fValidationScript(other.fValidationScript),
272 fAdditionalRootLibs(other.fAdditionalRootLibs),
273 fAdditionalLibs(other.fAdditionalLibs),
274 fGeneratorLibs(other.fGeneratorLibs),
275 fSplitMode(other.fSplitMode),
276 fAPIVersion(other.fAPIVersion),
277 fROOTVersion(other.fROOTVersion),
278 fAliROOTVersion(other.fAliROOTVersion),
279 fExternalPackages(other.fExternalPackages),
281 fGridWorkingDir(other.fGridWorkingDir),
282 fGridDataDir(other.fGridDataDir),
283 fDataPattern(other.fDataPattern),
284 fGridOutputDir(other.fGridOutputDir),
285 fOutputArchive(other.fOutputArchive),
286 fOutputFiles(other.fOutputFiles),
287 fInputFormat(other.fInputFormat),
288 fDatasetName(other.fDatasetName),
289 fJDLName(other.fJDLName),
290 fTerminateFiles(other.fTerminateFiles),
291 fMergeExcludes(other.fMergeExcludes),
292 fRegisterExcludes(other.fRegisterExcludes),
293 fIncludePath(other.fIncludePath),
294 fCloseSE(other.fCloseSE),
295 fFriendChainName(other.fFriendChainName),
296 fJobTag(other.fJobTag),
297 fOutputSingle(other.fOutputSingle),
298 fRunPrefix(other.fRunPrefix),
299 fProofCluster(other.fProofCluster),
300 fProofDataSet(other.fProofDataSet),
301 fFileForTestMode(other.fFileForTestMode),
302 fAliRootMode(other.fAliRootMode),
303 fProofProcessOpt(other.fProofProcessOpt),
304 fMergeDirName(other.fMergeDirName),
309 fDropToShell(other.fDropToShell),
310 fMCLoop(other.fMCLoop),
311 fGridJobIDs(other.fGridJobIDs),
312 fGridStages(other.fGridStages),
313 fFriendLibs(other.fFriendLibs),
314 fTreeName(other.fTreeName)
// The JDL objects are instantiated via the interpreter, so TAlienJDL is only named in a string here.
317 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
318 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
319 fRunRange[0] = other.fRunRange[0];
320 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: one new TObjString per entry name.
321 if (other.fInputFiles) {
322 fInputFiles = new TObjArray();
323 TIter next(other.fInputFiles);
325 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
326 fInputFiles->SetOwner();
// Same deep copy for the package list.
328 if (other.fPackages) {
329 fPackages = new TObjArray();
330 TIter next(other.fPackages);
332 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
333 fPackages->SetOwner();
// Modules are cloned with the AliAnalysisTaskCfg copy constructor.
335 if (other.fModules) {
336 fModules = new TObjArray();
337 fModules->SetOwner();
338 TIter next(other.fModules);
339 AliAnalysisTaskCfg *mod, *crt;
340 while ((crt=(AliAnalysisTaskCfg*)next())) {
// Presumably the clone is then added to fModules on a line not shown here - confirm.
341 mod = new AliAnalysisTaskCfg(*crt);
347 //______________________________________________________________________________
// Destructor.
// NOTE(review): only the call on fProofParam is visible in this listing (embedded numbers
// 349-355 are missing); the release of the other owned members cannot be checked from here.
348 AliAnalysisAlien::~AliAnalysisAlien()
// Deletes everything stored in fProofParam.
356 fProofParam.DeleteAll();
359 //______________________________________________________________________________
// Assignment operator.
// Guards against self-assignment, assigns the AliAnalysisGrid base part, then copies the
// settings one by one and rebuilds the JDL objects and the fInputFiles / fPackages /
// fModules arrays the same way the copy constructor does.
// NOTE(review): in the visible lines the previous fGridJDL, fMergingJDL, fInputFiles,
// fPackages and fModules are overwritten without being released first, and fRunRange is
// not copied (the copy constructor does copy it). Lines 456-462 of the listing are missing,
// so confirm against the full file before treating either as a defect.
360 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
363 if (this != &other) {
364 AliAnalysisGrid::operator=(other);
// Fresh TAlienJDL objects are created through the interpreter rather than copied.
365 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
366 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
367 fPrice = other.fPrice;
369 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
370 fMaxInitFailed = other.fMaxInitFailed;
371 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
372 fNtestFiles = other.fNtestFiles;
373 fNrunsPerMaster = other.fNrunsPerMaster;
374 fMaxMergeFiles = other.fMaxMergeFiles;
375 fMaxMergeStages = other.fMaxMergeStages;
376 fNsubmitted = other.fNsubmitted;
377 fProductionMode = other.fProductionMode;
378 fOutputToRunNo = other.fOutputToRunNo;
379 fMergeViaJDL = other.fMergeViaJDL;
380 fFastReadOption = other.fFastReadOption;
381 fOverwriteMode = other.fOverwriteMode;
382 fNreplicas = other.fNreplicas;
383 fNproofWorkers = other.fNproofWorkers;
384 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
385 fProofReset = other.fProofReset;
386 fNMCevents = other.fNMCevents;
387 fNMCjobs = other.fNMCjobs;
388 fRunNumbers = other.fRunNumbers;
389 fExecutable = other.fExecutable;
390 fExecutableCommand = other.fExecutableCommand;
391 fArguments = other.fArguments;
392 fExecutableArgs = other.fExecutableArgs;
393 fAnalysisMacro = other.fAnalysisMacro;
394 fAnalysisSource = other.fAnalysisSource;
395 fValidationScript = other.fValidationScript;
396 fAdditionalRootLibs = other.fAdditionalRootLibs;
397 fAdditionalLibs = other.fAdditionalLibs;
398 fGeneratorLibs = other.fGeneratorLibs;
399 fSplitMode = other.fSplitMode;
400 fAPIVersion = other.fAPIVersion;
401 fROOTVersion = other.fROOTVersion;
402 fAliROOTVersion = other.fAliROOTVersion;
403 fExternalPackages = other.fExternalPackages;
405 fGridWorkingDir = other.fGridWorkingDir;
406 fGridDataDir = other.fGridDataDir;
407 fDataPattern = other.fDataPattern;
408 fGridOutputDir = other.fGridOutputDir;
409 fOutputArchive = other.fOutputArchive;
410 fOutputFiles = other.fOutputFiles;
411 fInputFormat = other.fInputFormat;
412 fDatasetName = other.fDatasetName;
413 fJDLName = other.fJDLName;
414 fTerminateFiles = other.fTerminateFiles;
415 fMergeExcludes = other.fMergeExcludes;
416 fRegisterExcludes = other.fRegisterExcludes;
417 fIncludePath = other.fIncludePath;
418 fCloseSE = other.fCloseSE;
419 fFriendChainName = other.fFriendChainName;
420 fJobTag = other.fJobTag;
421 fOutputSingle = other.fOutputSingle;
422 fRunPrefix = other.fRunPrefix;
423 fProofCluster = other.fProofCluster;
424 fProofDataSet = other.fProofDataSet;
425 fFileForTestMode = other.fFileForTestMode;
426 fAliRootMode = other.fAliRootMode;
427 fProofProcessOpt = other.fProofProcessOpt;
428 fMergeDirName = other.fMergeDirName;
429 fDropToShell = other.fDropToShell;
430 fMCLoop = other.fMCLoop;
431 fGridJobIDs = other.fGridJobIDs;
432 fGridStages = other.fGridStages;
433 fFriendLibs = other.fFriendLibs;
434 fTreeName = other.fTreeName;
// Deep copy of the input file list: one new TObjString per entry name.
435 if (other.fInputFiles) {
436 fInputFiles = new TObjArray();
437 TIter next(other.fInputFiles);
439 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
440 fInputFiles->SetOwner();
// Same deep copy for the package list.
442 if (other.fPackages) {
443 fPackages = new TObjArray();
444 TIter next(other.fPackages);
446 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
447 fPackages->SetOwner();
// Modules are cloned with the AliAnalysisTaskCfg copy constructor.
449 if (other.fModules) {
450 fModules = new TObjArray();
451 fModules->SetOwner();
452 TIter next(other.fModules);
453 AliAnalysisTaskCfg *mod, *crt;
454 while ((crt=(AliAnalysisTaskCfg*)next())) {
// Presumably the clone is then added to fModules on a line not shown here - confirm.
455 mod = new AliAnalysisTaskCfg(*crt);
463 //______________________________________________________________________________
// Appends one library name to the blank-separated list kept in fAdditionalLibs.
//   name - library file name; it must contain a '.' (an extension), otherwise an error is
//          reported.
// NOTE(review): the declaration of `lib` (listing line 467) and the statements following the
// Error/Warning calls are not visible here; presumably `lib` is a TString built from <name>
// and both diagnostics are followed by a return - confirm.
464 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
466 // Add a single additional library to be loaded. Extension must be present.
468 if (!lib.Contains(".")) {
469 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
// NOTE(review): Contains() is a substring test, so a name that is part of an already listed
// library name (e.g. a shared prefix) is also reported as already added.
472 if (fAdditionalLibs.Contains(name)) {
473 Warning("AddAdditionalLibrary", "Library %s already added.", name);
// Entries are separated by a single blank.
476 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
477 fAdditionalLibs += lib;
480 //______________________________________________________________________________
// Registers a module configuration in fModules.
//   module - configuration to add; it is dereferenced unconditionally in the visible code,
//            so it must not be null.
// A module whose name is already registered triggers an error message.
// NOTE(review): listing lines 487-489 and 492 are missing, so the statement after the Error
// call and the condition under which the array is created are not visible; presumably the
// array is only allocated when fModules is still null - confirm.
481 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
483 // Adding a module. Checks if already existing. Becomes owned by this.
485 if (GetModule(module->GetName())) {
486 Error("AddModule", "A module having the same name %s already added", module->GetName());
// The array is made owner of its content, which is what transfers ownership of <module>.
490 fModules = new TObjArray();
491 fModules->SetOwner();
493 fModules->Add(module);
496 //______________________________________________________________________________
// Registers every module found in <list> by calling AddModule() on each entry.
//   list - collection whose entries are cast to AliAnalysisTaskCfg*.
// NOTE(review): the declaration of the iterator `next` (listing line 500) is not visible;
// presumably a TIter over <list> - confirm.
497 void AliAnalysisAlien::AddModules(TObjArray *list)
499 // Adding a list of modules. Checks if already existing. Becomes owned by this.
501 AliAnalysisTaskCfg *module;
502 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
505 //______________________________________________________________________________
// Verifies the dependencies declared by each registered module and reorders fModules so
// that a dependency is placed before the module that needs it.
// Errors are reported when a dependency is not registered or when two modules require each
// other.
// NOTE(review): this listing lacks several lines of the function (embedded numbers 510,
// 512-513, 516-517, 521, 525, 528-529, 533-534, 537, 543, 546-550): the declarations of
// i, j, k, istart and depname, the guards around the diagnostics, the condition that decides
// whether a move is needed, and the return statements. Comments below describe only the
// visible statements.
506 Bool_t AliAnalysisAlien::CheckDependencies()
508 // Check if all dependencies are satisfied. Reorder modules if needed.
509 Int_t nmodules = GetNmodules();
511 Warning("CheckDependencies", "No modules added yet to check their dependencies");
514 AliAnalysisTaskCfg *mod = 0;
515 AliAnalysisTaskCfg *dep = 0;
// Outer loop: one pass per module position; i may be rewound below.
518 for (i=0; i<nmodules; i++) {
519 mod = (AliAnalysisTaskCfg*) fModules->At(i);
520 Int_t ndeps = mod->GetNdeps();
// Inner loop: resolve each dependency name of the current module.
522 for (j=0; j<ndeps; j++) {
523 depname = mod->GetDependency(j);
524 dep = GetModule(depname);
526 Error("CheckDependencies","Dependency %s not added for module %s",
527 depname.Data(), mod->GetName());
// Only a direct two-module cycle is tested here.
530 if (dep->NeedsDependency(mod->GetName())) {
531 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
532 mod->GetName(), dep->GetName());
535 Int_t idep = fModules->IndexOf(dep);
536 // The dependency task must come first
538 // Remove at idep and move all objects below up one slot
539 // down to index i included.
540 fModules->RemoveAt(idep);
// Shift entries [i, idep-1] one slot towards the end, freeing slot i.
541 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
// Put the dependency in the freed slot; i now points at the original module again.
542 fModules->AddAt(dep, i++);
544 //Redo from istart if dependencies were inserted
// The loop increment brings i back to istart.
545 if (i>istart) i=istart-1;
551 //______________________________________________________________________________
// Provides the analysis manager, creating one named <name> when needed, and optionally
// runs a macro through the interpreter.
//   name     - name given to a newly created AliAnalysisManager
//   filename - macro path; only used when it is a non-empty string (it is passed to strlen,
//              so it must not be null)
// NOTE(review): listing lines 556, 558-559, 565 and 567-570 are missing, so the handling of
// an already existing manager, the final form of the command line and the return value are
// not visible here.
552 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
554 // Create the analysis manager and optionally execute the macro in filename.
555 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
557 mgr->SetMCLoop(fMCLoop);
// New manager: this plugin is registered as its grid handler and the MC-loop flag is forwarded.
560 mgr = new AliAnalysisManager(name);
561 mgr->SetGridHandler((AliAnalysisGrid*)this);
562 mgr->SetMCLoop(fMCLoop);
563 if (strlen(filename)) {
// NOTE(review): TSystem::ExpandPathName(const char*) returns a heap string in ROOT; it is
// copied into `line` here and the returned pointer is not kept - confirm whether it leaks.
564 TString line = gSystem->ExpandPathName(filename);
566 gROOT->ProcessLine(line.Data());
571 //______________________________________________________________________________
572 Int_t AliAnalysisAlien::GetNmodules() const
574 // Get number of modules.
575 if (!fModules) return 0;
576 return fModules->GetEntries();
579 //______________________________________________________________________________
580 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
582 // Get a module by name.
583 if (!fModules) return 0;
584 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
587 //______________________________________________________________________________
// Loads one module: first its dependencies (recursively), then its libraries, then its
// macro and optional config macro, and finally records its libraries in fAdditionalLibs.
//   mod - module to load; returns kTRUE immediately when it reports itself as loaded.
// NOTE(review): the guards in front of several Error calls, the declarations of `depname`
// and `lib`, and the return statements (embedded numbers 593, 595-596, 598, 602, 605-606,
// 610-612, 616-617, 620, 625-627, 631-632, 637-638, 641, 648-651) are missing from this
// listing; presumably each error path returns kFALSE - confirm.
588 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
590 // Load a given module.
591 if (mod->IsLoaded()) return kTRUE;
592 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
594 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
597 Int_t ndeps = mod->GetNdeps();
// Dependencies are resolved by name and loaded through a recursive call.
599 for (Int_t j=0; j<ndeps; j++) {
600 depname = mod->GetDependency(j);
601 AliAnalysisTaskCfg *dep = GetModule(depname);
603 Error("LoadModule","Dependency %s not existing for module %s",
604 depname.Data(), mod->GetName());
607 if (!LoadModule(dep)) {
608 Error("LoadModule","Dependency %s for module %s could not be loaded",
609 depname.Data(), mod->GetName());
613 // Load libraries for the module
614 if (!mod->CheckLoadLibraries()) {
615 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
618 // Check if a custom file name was requested
619 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
621 // Check if a custom terminate file name was requested
// Terminate file names are accumulated as a comma-separated list.
622 if (strlen(mod->GetTerminateFileName())) {
623 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
624 fTerminateFiles += mod->GetTerminateFileName();
// A negative return value from the module macro is treated as a failure.
628 if (mod->ExecuteMacro()<0) {
629 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
630 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
633 // Configure dependencies
634 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
635 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
636 mod->GetConfigMacro()->GetTitle(), mod->GetName());
639 // Adjust extra libraries
640 Int_t nlibs = mod->GetNlibs();
// Each library name is turned into "lib<name>.so" and appended once, blank-separated.
642 for (Int_t i=0; i<nlibs; i++) {
643 lib = mod->GetLibrary(i);
644 lib = Form("lib%s.so", lib.Data());
// NOTE(review): Contains() is a substring test on the whole list.
645 if (fAdditionalLibs.Contains(lib)) continue;
646 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
647 fAdditionalLibs += lib;
652 //______________________________________________________________________________
// Generates the files for the full train using <name> as base name.
//   name - base name: the macro becomes "<name>.C", the executable "<name>.sh" and the
//          validation script "<name>_validation.sh" while the files are generated.
// The list of additional libraries is reset, all modules are loaded and the analysis manager
// is initialised; kFALSE is returned if either step fails. The production mode, macro,
// executable, executable command and validation script settings are saved beforehand and
// restored at the end.
// NOTE(review): listing lines 660-661, 663, 672 and 678-680 are missing, so the generation
// step itself and the final return are not visible here. The manager pointer is used without
// a null test in the visible code; LoadModules() reports an error when no manager exists.
653 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
655 // Generate the full train.
656 fAdditionalLibs = "";
657 if (!LoadModules()) return kFALSE;
658 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
659 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so they can be restored after generation.
662 Int_t productionMode = fProductionMode;
664 TString macro = fAnalysisMacro;
665 TString executable = fExecutable;
666 TString validation = fValidationScript;
667 TString execCommand = fExecutableCommand;
668 SetAnalysisMacro(Form("%s.C", name));
669 SetExecutable(Form("%s.sh", name));
670 // SetExecutableCommand("aliroot -b -q ");
671 SetValidationScript(Form("%s_validation.sh", name));
// Restore the saved settings.
673 SetProductionMode(productionMode);
674 fAnalysisMacro = macro;
675 fExecutable = executable;
676 fExecutableCommand = execCommand;
677 fValidationScript = validation;
681 //______________________________________________________________________________
// Generates the test macros and scripts, either for one module or for the whole train.
//   name    - base name of the generated macro ("<name>.C"), executable ("<name>.sh") and
//             validation script ("<name>_validation.sh")
//   modname - module to test; an empty string selects all modules (it is passed to strlen,
//             so it must not be null)
// The macro, executable, executable command, validation script and production mode settings
// are saved before generation and restored afterwards.
// NOTE(review): listing lines 689, 691-692, 698-700, 702, 716, 719, 721, 723 and 732-734 are
// missing: the null test on the module, some of the writer calls and the return statement
// are not visible here. fOutputFiles is overwritten and not restored in the visible code.
682 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
684 // Generate test macros for a single module or for the full train.
685 fAdditionalLibs = "";
// Single-module case: dependencies are checked, then the module and the friend libraries are loaded.
686 if (strlen(modname)) {
687 if (!CheckDependencies()) return kFALSE;
688 AliAnalysisTaskCfg *mod = GetModule(modname);
690 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
693 if (!LoadModule(mod)) return kFALSE;
694 if (!LoadFriendLibs()) return kFALSE;
695 } else if (!LoadModules()) return kFALSE;
696 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
697 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so they can be restored after generation.
701 Int_t productionMode = fProductionMode;
703 TString macro = fAnalysisMacro;
704 TString executable = fExecutable;
705 TString validation = fValidationScript;
706 TString execCommand = fExecutableCommand;
707 SetAnalysisMacro(Form("%s.C", name));
708 SetExecutable(Form("%s.sh", name));
709 fOutputFiles = GetListOfFiles("outaod");
710 // Add extra files registered to the analysis manager
711 TString extra = GetListOfFiles("ext");
712 if (!extra.IsNull()) {
// Extra files are turned into wildcard patterns and appended comma-separated.
713 extra.ReplaceAll(".root", "*.root");
714 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
715 fOutputFiles += extra;
717 // SetExecutableCommand("aliroot -b -q ");
718 SetValidationScript(Form("%s_validation.sh", name));
720 WriteAnalysisMacro();
722 WriteValidationScript();
724 WriteMergeExecutable();
725 WriteValidationScript(kTRUE);
// Leave local-test mode and restore the saved settings.
726 SetLocalTest(kFALSE);
727 SetProductionMode(productionMode);
728 fAnalysisMacro = macro;
729 fExecutable = executable;
730 fExecutableCommand = execCommand;
731 fValidationScript = validation;
735 //______________________________________________________________________________
// Loads every registered module in the order stored in fModules, after the dependency
// check, and finishes by loading the friend libraries.
// Returns kFALSE as soon as the dependency check or one module fails; otherwise the result
// of LoadFriendLibs().
// NOTE(review): the conditions in front of the Warning and Error calls and the statements
// following them (listing lines 741, 743-744, 746, 748-749) are not visible here.
736 Bool_t AliAnalysisAlien::LoadModules()
738 // Load all modules by executing the AddTask macros. Checks first the dependencies.
739 fAdditionalLibs = "";
740 Int_t nmodules = GetNmodules();
742 Warning("LoadModules", "No module to be loaded");
745 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
747 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
750 if (!CheckDependencies()) return kFALSE;
// The count is read again after the dependency check.
751 nmodules = GetNmodules();
752 AliAnalysisTaskCfg *mod;
753 for (Int_t imod=0; imod<nmodules; imod++) {
754 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
755 if (!LoadModule(mod)) return kFALSE;
757 // Load additional friend libraries
758 return LoadFriendLibs();
761 //______________________________________________________________________________
// Loads the libraries listed in fFriendLibs. The list is split on commas when it contains
// one, otherwise on blanks. Each entry is stripped of ".so", of blanks and of a leading
// "lib" before being looked up among the libraries known to gSystem and, if not found
// there, loaded with gSystem->Load().
// NOTE(review): listing lines 766-767, 775, 778 and 780-787 are missing: the declarations of
// `lib` and `list`, the test on the load result, the clean-up of the token array and the
// return statements are not visible here. The Error call below is tagged "LoadModules"
// although it is issued from LoadFriendLibs.
762 Bool_t AliAnalysisAlien::LoadFriendLibs() const
764 // Load libraries required for reading friends.
765 if (fFriendLibs.Length()) {
768 if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
769 else list = fFriendLibs.Tokenize(" ");
770 for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
771 lib = list->At(ilib)->GetName();
772 lib.ReplaceAll(".so","");
773 lib.ReplaceAll(" ","");
774 if (lib.BeginsWith("lib")) lib.Remove(0, 3);
// A non-empty answer from GetLibraries means the library is already known, so Load is skipped.
776 Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
777 if (!loaded) loaded = gSystem->Load(lib);
779 Error("LoadModules", "Cannot load library for friends %s", lib.Data());
788 //______________________________________________________________________________
// Sets the format used to print run numbers.
//   prefix - either a plain prefix or a printf-style format such as "%09d"
// NOTE(review): listing line 792 is missing; presumably it assigns <prefix> to fRunPrefix
// before the check below - confirm.
789 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
791 // Set the run number format. Can be a prefix or a format like "%09d"
// When no '%' is present, "%d" is appended so that fRunPrefix is usable as a format.
793 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
796 //______________________________________________________________________________
// Appends an include path to fIncludePath, followed by a blank.
//   path - include directory; "-I" is prepended unless the text already contains "-I".
// NOTE(review): the declaration of `p` (listing line 800) is not visible; presumably a
// TString built from <path> - confirm. The "-I" test matches anywhere in the text, not only
// at its start.
797 void AliAnalysisAlien::AddIncludePath(const char *path)
799 // Add include path in the remote analysis macro.
801 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
802 else fIncludePath += Form("-I%s ", path);
805 //______________________________________________________________________________
806 void AliAnalysisAlien::AddRunNumber(Int_t run)
808 // Add a run number to the list of runs to be processed.
809 if (fRunNumbers.Length()) fRunNumbers += " ";
810 fRunNumbers += Form(fRunPrefix.Data(), run);
813 //______________________________________________________________________________
// Adds every run found in a blank-separated list by calling AddRunNumber(Int_t) on each
// token.
//   runList - text with the run numbers separated by blanks; each token is converted with
//             TString::Atoi().
// NOTE(review): listing lines 823-826 are missing, so whether the array returned by
// Tokenize() is deleted after the loop cannot be checked from here.
814 void AliAnalysisAlien::AddRunList(const char* runList)
816 // Add several runs into the list of runs; they are expected to be separated by a blank character.
817 TString sList = runList;
818 TObjArray *list = sList.Tokenize(" ");
819 Int_t n = list->GetEntries();
820 for (Int_t i = 0; i < n; i++) {
821 TObjString *os = (TObjString*)list->At(i);
822 AddRunNumber(os->GetString().Atoi());
827 //______________________________________________________________________________
// Adds one or several runs given as text. The text is split on blanks and each token is
// appended to fRunNumbers, preceded by the part of fRunPrefix located before "%d".
//   run - blank-separated run numbers, used verbatim (no numeric conversion).
// NOTE(review): listing lines 831-832, 834-835 and 840-843 are missing: the declarations of
// `runs`, `os`, `prefix` and the iterator `next`, and the clean-up of the token array, are
// not visible here. If fRunPrefix holds a format without a literal "%d" (e.g. "%09d"),
// Index("%d") does not find a match - confirm how the Append call behaves in that case.
828 void AliAnalysisAlien::AddRunNumber(const char* run)
830 // Add a run number to the list of runs to be processed.
833 TObjArray *arr = runs.Tokenize(" ");
// Keep only the leading characters of fRunPrefix, up to the position of "%d".
836 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
837 while ((os=(TObjString*)next())){
838 if (fRunNumbers.Length()) fRunNumbers += " ";
839 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
844 //______________________________________________________________________________
845 void AliAnalysisAlien::AddDataFile(const char *lfn)
847 // Adds a data file to the input to be analysed. The file should be a valid LFN
848 // or point to an existing file in the alien workdir.
849 if (!fInputFiles) fInputFiles = new TObjArray();
850 fInputFiles->Add(new TObjString(lfn));
853 //______________________________________________________________________________
854 void AliAnalysisAlien::AddExternalPackage(const char *package)
856 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
857 if (fExternalPackages) fExternalPackages += " ";
858 fExternalPackages += package;
861 //______________________________________________________________________________
// Makes sure a connection to AliEn is available.
// Returns kTRUE at once when gGrid is already connected or when production mode is set (no
// connection is attempted in that case). Otherwise TGrid::Connect("alien://") is called and
// on success the grid user name is stored in fUser.
// NOTE(review): listing lines 873-874 and 877-879 are missing, so the return statements
// after the Error call and at the end are not visible here.
862 Bool_t AliAnalysisAlien::Connect()
864 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
865 if (gGrid && gGrid->IsConnected()) return kTRUE;
866 if (fProductionMode) return kTRUE;
868 Info("Connect", "Trying to connect to AliEn ...");
// The return value is not used; success is judged from gGrid just below.
869 TGrid::Connect("alien://");
871 if (!gGrid || !gGrid->IsConnected()) {
872 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
875 fUser = gGrid->GetUser();
876 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
880 //______________________________________________________________________________
// Moves to the AliEn working directory (grid home directory + fGridWorkingDir), creating it
// when it does not exist. If it cannot be created, a warning is issued and fGridWorkingDir
// is cleared so that the home directory is used instead.
// NOTE(review): listing lines 884, 886-887, 891-893, 895, 899 and 903-905 are missing: the
// connection test, the handling of an existing directory and the branch structure around
// the Mkdir call are not visible here.
881 void AliAnalysisAlien::CdWork()
883 // Check validity of alien workspace. Create directory if possible.
885 Error("CdWork", "Alien connection required");
888 TString homedir = gGrid->GetHomeDirectory();
889 TString workdir = homedir + fGridWorkingDir;
890 if (DirectoryExists(workdir)) {
894 // Work directory not existing - create it
// "-p" is passed as option to Mkdir; the Cd below is given the relative name fGridWorkingDir.
896 if (gGrid->Mkdir(workdir, "-p")) {
897 gGrid->Cd(fGridWorkingDir);
898 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
900 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
901 workdir.Data(), homedir.Data());
902 fGridWorkingDir = "";
906 //______________________________________________________________________________
// Tests whether files can be copied to an AliEn directory by creating a small local file
// named "plugin_test_copy", copying it to <alienpath> and removing both copies afterwards.
//   alienpath - AliEn directory to test; it must not contain blanks and must exist.
// Returns kTRUE without testing anything in production mode.
// NOTE(review): listing lines 914-916, 918-919, 927, 929, 933-934, 938, 941-943, 953-954 and
// 958-960 are missing: the connection test, the test on the local file, the return
// statements and (presumably) the closing of the local file before the copy are not visible.
907 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
909 // Check if file copying is possible.
910 if (fProductionMode) return kTRUE;
911 TString salienpath(alienpath);
912 if (salienpath.Contains(" ")) {
913 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
917 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
920 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
921 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
922 // Check if alien_CLOSE_SE is defined
923 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
924 if (!closeSE.IsNull()) {
925 Info("CheckFileCopy", "Your current close storage is pointing to: \
926 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
928 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
930 // Check if grid directory exists.
931 if (!DirectoryExists(alienpath)) {
932 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// The test file is created in the current local working directory.
935 TString stest = "plugin_test_copy";
936 TFile f(stest, "RECREATE");
937 // User may not have write permissions to current directory
939 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
940 gSystem->WorkingDirectory());
// A left-over remote copy from a previous test is removed first.
944 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
945 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
946 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
947 \n# 1. Make sure you have write permissions there. If this is the case: \
948 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
949 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
950 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
951 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
// The local test file is removed on the failure path as well.
952 gSystem->Unlink(stest.Data());
// Success path: remove the local file and the remote copy.
// NOTE(review): this Rm uses a path without the "alien://" prefix, unlike the Rm above.
955 gSystem->Unlink(stest.Data());
956 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
957 Info("CheckFileCopy", "### ...SUCCESS ###");
961 //______________________________________________________________________________
// Validates the input data configuration and registers the xml collection names to be used.
// Visible stages:
//   1. Production mode: nothing is checked, kTRUE is returned.
//   2. No input files, run numbers or run range: a base data directory is required.
//   3. Declared input files: each one is checked for existence and its type (collection,
//      xml, tag usage) must agree with the other declared files.
//   4. Run numbers, or else the run range: one xml name per run is registered through
//      AddDataFile(), or one per group of fNrunsPerMaster runs when that setting is 2 or
//      more.
// NOTE(review): a large number of lines of this function are missing from the listing (the
// embedded numbering has gaps throughout, e.g. 969-971, 973-974, 978-979, 989-990, 993-994,
// 1003-1005, 1043-1046, 1085-1088, 1115, 1117-1126), including declarations, several guards
// and all return statements that follow the Error calls. The comments below describe only
// what the visible statements do.
// NOTE(review): two Error calls use the misspelt tag "CkeckInputData" and several Info calls
// are tagged "CheckDataType" although they are issued from this function.
962 Bool_t AliAnalysisAlien::CheckInputData()
964 // Check validity of input data. If necessary, create xml files.
965 if (fProductionMode) return kTRUE;
966 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
967 if (!fGridDataDir.Length()) {
968 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
972 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
975 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
976 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
977 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
980 // Process declared files
981 Bool_t isCollection = kFALSE;
982 Bool_t isXml = kFALSE;
983 Bool_t useTags = kFALSE;
984 Bool_t checked = kFALSE;
// Outside test mode the AliEn work directory is entered (and created if needed) first.
985 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
987 TString workdir = gGrid->GetHomeDirectory();
988 workdir += fGridWorkingDir;
991 TIter next(fInputFiles);
992 while ((objstr=(TObjString*)next())) {
995 file += objstr->GetString();
996 // Store full lfn path
997 if (FileExists(file)) objstr->SetString(file);
999 file = objstr->GetName();
1000 if (!FileExists(objstr->GetName())) {
1001 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
1002 objstr->GetName(), workdir.Data());
// Type of the current file; it is compared below with the type recorded for the others.
1006 Bool_t iscoll, isxml, usetags;
1007 CheckDataType(file, iscoll, isxml, usetags);
1010 isCollection = iscoll;
1013 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1015 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
1016 Error("CheckInputData", "Some conflict was found in the types of inputs");
1022 // Process requested run numbers
1023 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
1024 // Check validity of alien data directory
1025 if (!fGridDataDir.Length()) {
1026 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
1029 if (!DirectoryExists(fGridDataDir)) {
1030 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
1034 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
1038 if (checked && !isXml) {
1039 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
1042 // Check validity of run number(s)
1047 TString schunk, schunk2;
// Tag usage is derived from the data pattern containing "tag".
1051 useTags = fDataPattern.Contains("tag");
1052 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1054 if (useTags != fDataPattern.Contains("tag")) {
1055 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over the run range.
1058 if (fRunNumbers.Length()) {
1059 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1060 arr = fRunNumbers.Tokenize(" ");
1062 while ((os=(TObjString*)next())) {
// NOTE(review): the format ends with a blank, so the tested path carries a trailing blank.
1063 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1064 if (!DirectoryExists(path)) {
1065 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1068 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1069 TString msg = "\n##### file: ";
1071 msg += " type: xml_collection;";
1072 if (useTags) msg += " using_tags: Yes";
1073 else msg += " using_tags: No";
1074 Info("CheckDataType", "%s", msg.Data());
// Fewer than two runs per master job: one xml file per run.
1075 if (fNrunsPerMaster<2) {
1076 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Otherwise runs are grouped: the name starts with the first run of the group ...
1079 if (((nruns-1)%fNrunsPerMaster) == 0) {
1080 schunk = os->GetString();
// ... and is completed with "_<last run>.xml" when the group is full or the list ends.
1082 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1083 schunk += Form("_%s.xml", os->GetString().Data());
1084 AddDataFile(schunk);
1089 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
// Run range: same procedure, with run numbers printed through fRunPrefix.
1090 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// NOTE(review): as above, this format ends with a blank.
1091 format = Form("%%s/%s ", fRunPrefix.Data());
1092 path = Form(format.Data(), fGridDataDir.Data(), irun);
1093 if (!DirectoryExists(path)) {
1096 format = Form("%%s/%s.xml", fRunPrefix.Data());
1097 path = Form(format.Data(), workdir.Data(),irun);
1098 TString msg = "\n##### file: ";
1100 msg += " type: xml_collection;";
1101 if (useTags) msg += " using_tags: Yes";
1102 else msg += " using_tags: No";
1103 Info("CheckDataType", "%s", msg.Data());
1104 if (fNrunsPerMaster<2) {
1105 format = Form("%s.xml", fRunPrefix.Data());
1106 AddDataFile(Form(format.Data(),irun));
1109 if (((nruns-1)%fNrunsPerMaster) == 0) {
1110 schunk = Form(fRunPrefix.Data(),irun);
1112 format = Form("_%s.xml", fRunPrefix.Data());
1113 schunk2 = Form(format.Data(), irun);
1114 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1116 AddDataFile(schunk);
1121 AddDataFile(schunk);
1127 //______________________________________________________________________________
1128 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1130 // Copy data from the given grid directory according to a pattern and make a local
1132 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
// Parameters as they are used below:
//   griddir     - grid directory; it must exist and is searched with the grid 'find' command
//   pattern     - file pattern handed to 'find'
//   nfiles      - limit passed to 'find' through its -l option
//   output      - local file that is opened for writing and receives the local paths
//   archivefile - optional archive name, or a ';'-separated list of archive names
//   outputdir   - local directory that is created and used as the copy destination
1134 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1137 if (!DirectoryExists(griddir)) {
1138 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
1141 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1142 printf("Running command: %s\n", command.Data());
// NOTE(review): the result of Command() is dereferenced right away; neither a null
// check nor a delete of 'res' is visible in this excerpt - confirm.
1143 TGridResult *res = gGrid->Command(command);
1144 Int_t nfound = res->GetEntries();
1146 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1149 printf("... found %d files. Copying locally ...\n", nfound);
// A ';' in archivefile means several archives were requested: the first token
// becomes the main archive and the remaining tokens are copied in addition.
// NOTE(review): strlen() requires archivefile to be non-null; presumably callers
// pass "" when no archive is wanted - confirm.
1152 TObjArray* additionalArchives = 0;
1153 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1154 additionalArchives = TString(archivefile).Tokenize(";");
// NOTE(review): archivefile now points into the first token; presumably RemoveAt()
// only detaches that token without deleting it, which keeps the pointer valid but
// means 'delete additionalArchives' does not free it - confirm.
1155 archivefile = additionalArchives->At(0)->GetName();
1156 additionalArchives->RemoveAt(0);
1157 additionalArchives->Compress();
1160 // Copy files locally
1162 out.open(output, ios::out);
1164 TString turl, dirname, filename, temp;
// Remember the current directory, then work inside outputdir; the original
// directory is restored after the copy loop.
1165 TString cdir = gSystem->WorkingDirectory();
1166 gSystem->MakeDirectory(outputdir);
1167 gSystem->ChangeDirectory(outputdir);
1169 for (Int_t i=0; i<nfound; i++) {
1170 map = (TMap*)res->At(i);
1171 turl = map->GetValue("turl")->GetName();
// The local sub-directory is named after the last component of the directory
// part of the turl.
1172 filename = gSystem->BaseName(turl.Data());
1173 dirname = gSystem->DirName(turl.Data());
1174 dirname = gSystem->BaseName(dirname.Data());
1175 gSystem->MakeDirectory(dirname);
1177 TString source(turl);
1178 TString targetFileName(filename);
1180 if (strlen(archivefile) > 0) {
1181 // TODO here the archive in which the file resides should be determined
1182 // however whereis returns only a guid, and guid2lfn does not work
1183 // Therefore we use the one provided as argument for now
1184 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1185 targetFileName = archivefile;
// Copy the file (or the archive replacing it) into ./<dirname>/
1187 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1188 Bool_t success = kTRUE;
1189 if (additionalArchives) {
// Additional archives are taken from the same grid directory as 'source'.
1190 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1192 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1193 gSystem->MakeDirectory(gSystem->DirName(target));
1194 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
// When an archive was copied, the list entry is written as <archive>#<file>.
1199 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1200 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
// Go back to the directory the call started from.
1205 gSystem->ChangeDirectory(cdir);
1207 delete additionalArchives;
1211 //______________________________________________________________________________
1212 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1214 // Create dataset for the grid data directory + run number.
// Three input cases are handled below, selected by fRunNumbers and fRunRange:
//   1) neither is set   - one collection made from fGridDataDir
//   2) fRunNumbers set  - one collection per blank-separated run number
//   3) otherwise        - one collection per run in [fRunRange[0], fRunRange[1]]
// In cases 2 and 3 the per-run collections are merged into chunks when
// fNrunsPerMaster is larger than 1.
1215 const Int_t gMaxEntries = 15000;
// Nothing is created in production mode or in offline mode.
1216 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1218 Error("CreateDataset", "Cannot create dataset with no grid connection");
1223 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
1224 TString workdir = gGrid->GetHomeDirectory();
1225 workdir += fGridWorkingDir;
1227 // Compose the 'find' command arguments
// delimiter ends up empty when it contains a blank and a single blank otherwise;
// it is appended to the find command further down.
1230 TString delimiter = pattern;
1232 if (delimiter.Contains(" ")) delimiter = "";
1233 else delimiter = " ";
1234 TString options = "-x collection ";
1235 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1236 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1237 TString conditions = "";
1244 TString schunk, schunk2;
1245 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither run numbers nor a run range were given.
1246 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Input files that were already added make dataset creation unnecessary.
1247 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1248 // Make a single data collection from data directory.
1249 path = fGridDataDir;
1250 if (!DirectoryExists(path)) {
1251 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
1255 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1256 else file = Form("%s.xml", gSystem->BaseName(path));
// The 'find' is (re)run when the local collection is missing, in test mode,
// or when fOverwriteMode is set.
1260 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1262 command += Form("%s -o %d ",options.Data(), nstart);
1264 command += delimiter;
1266 command += conditions;
1267 printf("command: %s\n", command.Data());
1268 TGridResult *res = gGrid->Command(command);
1269 if (res) delete res;
1270 // Write standard output to file
1271 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1272 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1273 Bool_t nullFile = kFALSE;
1275 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// 'grep -c /event' writes the number of matching lines to __tmp__; a non-zero
// exit status of the shell command marks the result as empty.
1277 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1279 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1280 gSystem->Exec("rm -f __tmp*");
1288 gSystem->Exec("rm -f __tmp__");
1289 ncount = line.Atoi();
// Reaching exactly gMaxEntries is treated as "the -l limit was hit": the partial
// result is kept as a collection so that further results can be merged into it.
1292 if (ncount == gMaxEntries) {
1293 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1294 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1295 if (!cbase) cbase = cadd;
1303 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1304 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1307 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1308 delete cbase; cbase = 0;
1310 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1312 gSystem->Exec("rm -f __tmp*");
1313 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// Outside test mode the collection is uploaded when it is not yet on the grid
// or when overwriting was requested.
1317 Bool_t fileExists = FileExists(file);
1318 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1319 // Copy xml file to alien space
1320 if (fileExists) gGrid->Rm(file);
1321 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1322 if (!FileExists(file)) {
1323 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1326 // Update list of files to be processed.
1328 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
// nullResult stays kTRUE until at least one run produced a usable result.
1332 Bool_t nullResult = kTRUE;
// Case 2: explicit, blank-separated list of run numbers.
1333 if (fRunNumbers.Length()) {
1334 TObjArray *arr = fRunNumbers.Tokenize(" ");
1337 while ((os=(TObjString*)next())) {
1340 path = Form("%s/%s", fGridDataDir.Data(), os->GetString().Data());
// Runs without a data directory are silently skipped.
1341 if (!DirectoryExists(path)) continue;
1343 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1344 else file = Form("%s.xml", os->GetString().Data());
1345 // If local collection file does not exist, create it via 'find' command.
1349 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1351 command += Form("%s -o %d ",options.Data(), nstart);
1353 command += delimiter;
1355 command += conditions;
1356 TGridResult *res = gGrid->Command(command);
1357 if (res) delete res;
1358 // Write standard output to file
1359 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1360 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1361 Bool_t nullFile = kFALSE;
1363 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1365 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1367 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1368 gSystem->Exec("rm -f __tmp*");
// The run that produced an empty dataset is dropped from fRunNumbers.
// NOTE(review): ReplaceAll() works on substrings, so a run number that is part of
// a longer run number in the list would presumably be altered as well - confirm.
1369 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1377 gSystem->Exec("rm -f __tmp__");
1378 ncount = line.Atoi();
1380 nullResult = kFALSE;
1382 if (ncount == gMaxEntries) {
1383 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
// Merging truncated results is only supported with one run per master job.
1384 if (fNrunsPerMaster > 1) {
1385 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1386 file.Data(),gMaxEntries);
1389 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1390 if (!cbase) cbase = cadd;
1397 if (cbase && fNrunsPerMaster<2) {
1398 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1399 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1402 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1403 delete cbase; cbase = 0;
1405 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1407 gSystem->Exec("rm -f __tmp*");
1408 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the loop stops after the first run that reaches this point.
1412 if (TestBit(AliAnalysisGrid::kTest)) break;
1413 // Check if there is one run per master job.
1414 if (fNrunsPerMaster<2) {
1415 if (FileExists(file)) {
1416 if (fOverwriteMode) gGrid->Rm(file);
1418 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1422 // Copy xml file to alien space
1423 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1424 if (!FileExists(file)) {
1425 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of each group of fNrunsPerMaster
// runs starts a new base collection, the following ones are merged into it.
1431 if (((nruns-1)%fNrunsPerMaster) == 0) {
1432 schunk = os->GetString();
1433 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1435 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1436 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1440 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// The chunk name has the form <first run>_<last run>.xml
1443 schunk += Form("_%s.xml", os->GetString().Data());
1444 if (FileExists(schunk)) {
// NOTE(review): the existence check is on 'schunk' but 'file' is removed; the
// run-range branch further down removes 'schunk' in the same situation, so this
// looks like a slip - confirm before changing.
1445 if (fOverwriteMode) gGrid->Rm(file);
1447 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1451 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1452 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1453 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1454 if (!FileExists(schunk)) {
1455 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1463 Error("CreateDataset", "No valid dataset corresponding to the query!");
// Case 3: run range [fRunRange[0], fRunRange[1]], both ends included.
1467 // Process a full run range.
1468 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is itself used as a format string that takes the run number.
1469 format = Form("%%s/%s", fRunPrefix.Data());
1472 path = Form(format.Data(), fGridDataDir.Data(), irun);
1473 if (!DirectoryExists(path)) continue;
1475 format = Form("%s.xml", fRunPrefix.Data());
1476 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1477 else file = Form(format.Data(), irun);
1478 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1479 if (fOverwriteMode) gGrid->Rm(file);
1481 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1485 // If local collection file does not exist, create it via 'find' command.
1489 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1491 command += Form("%s -o %d ",options.Data(), nstart);
1493 command += delimiter;
1495 command += conditions;
1496 TGridResult *res = gGrid->Command(command);
1497 if (res) delete res;
1498 // Write standard output to file
1499 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1500 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1501 Bool_t nullFile = kFALSE;
1503 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1505 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1507 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1508 gSystem->Exec("rm -f __tmp*");
1516 gSystem->Exec("rm -f __tmp__");
1517 ncount = line.Atoi();
1519 nullResult = kFALSE;
1521 if (ncount == gMaxEntries) {
1522 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1523 if (fNrunsPerMaster > 1) {
1524 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1525 file.Data(),gMaxEntries);
1528 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1529 if (!cbase) cbase = cadd;
1536 if (cbase && fNrunsPerMaster<2) {
1537 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1538 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1541 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1542 delete cbase; cbase = 0;
1544 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1546 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the loop stops after the first run that reaches this point.
1550 if (TestBit(AliAnalysisGrid::kTest)) break;
1551 // Check if there is one run per master job.
1552 if (fNrunsPerMaster<2) {
1553 if (FileExists(file)) {
1554 if (fOverwriteMode) gGrid->Rm(file);
1556 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1560 // Copy xml file to alien space
1561 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1562 if (!FileExists(file)) {
1563 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1568 // Check if the collection for the chunk exist locally.
// nchunk is the index of the chunk that run number 'nruns' (1-based) belongs to.
1569 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1570 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1571 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1574 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1575 if (((nruns-1)%fNrunsPerMaster) == 0) {
1576 schunk = Form(fRunPrefix.Data(), irun);
1577 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1579 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1583 format = Form("%%s_%s.xml", fRunPrefix.Data());
1584 schunk2 = Form(format.Data(), schunk.Data(), irun);
1585 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1589 if (FileExists(schunk)) {
1590 if (fOverwriteMode) gGrid->Rm(schunk);
1592 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1596 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1597 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1598 if (FileExists(schunk)) {
1599 if (fOverwriteMode) gGrid->Rm(schunk);
1601 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1605 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1606 if (!FileExists(schunk)) {
1607 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1613 Error("CreateDataset", "No valid dataset corresponding to the query!");
1620 //______________________________________________________________________________
1621 Bool_t AliAnalysisAlien::CreateJDL()
1623 // Generate a JDL file according to current settings. The name of the file is
1624 // specified by fJDLName.
// Two JDL objects are filled side by side: fGridJDL for the analysis jobs and
// fMergingJDL for the merging jobs. When the kSubmit bit is set, the JDL files
// and the additional dependencies are then copied to AliEn.
1625 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1626 Bool_t error = kFALSE;
// copy is switched off in production, offline and test mode;
// generate is switched off in test and submit mode.
1628 Bool_t copy = kTRUE;
1629 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1630 Bool_t generate = kTRUE;
1631 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1633 Error("CreateJDL", "Alien connection required");
1636 // Check validity of alien workspace
// The grid home directory is used as prefix unless in production mode or unless
// fGridWorkingDir already begins with "/alice".
1638 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1639 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1640 workdir += fGridWorkingDir;
1644 Error("CreateJDL()", "Define some input files for your analysis.");
1647 // Compose list of input files
1648 // Check if output files were defined
1649 if (!fOutputFiles.Length()) {
1650 Error("CreateJDL", "You must define at least one output file");
1653 // Check if an output directory was defined and valid
1654 if (!fGridOutputDir.Length()) {
1655 Error("CreateJDL", "You must define AliEn output directory");
1658 if (!fProductionMode) {
// An output directory without any '/' is placed below the working directory.
1659 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1660 if (!DirectoryExists(fGridOutputDir)) {
1661 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1662 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1664 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1668 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1673 // Exit if any error up to now
1674 if (error) return kFALSE;
1676 if (!fUser.IsNull()) {
1677 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1678 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1680 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
// The merging script and macro names are derived from fExecutable:
// ".sh" -> "_merge.sh" for the script, then ".sh" -> ".C" for the macro.
1681 TString mergeExec = fExecutable;
1682 mergeExec.ReplaceAll(".sh", "_merge.sh");
1683 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1684 mergeExec.ReplaceAll(".sh", ".C");
1685 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1686 if (!fArguments.IsNull())
1687 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1688 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
// The JDL parameter holding the merging stage is $4 in production mode and $2
// otherwise.
1690 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1691 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// The TTL value is written as is; the description shows it divided by 60 as minutes.
1694 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1695 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1696 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1697 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1699 if (fMaxInitFailed > 0) {
1700 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1701 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1703 if (fSplitMaxInputFileNumber > 0) {
1704 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1705 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1707 if (!IsOneStageMerging()) {
1708 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1709 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1711 if (fSplitMode.Length()) {
1712 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1713 fGridJDL->SetDescription("Split", "We split per SE or file");
1715 fMergingJDL->SetValue("Split", "\"se\"");
1716 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested software packages, added to both JDLs.
1717 if (!fAliROOTVersion.IsNull()) {
1718 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1719 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1721 if (!fROOTVersion.IsNull()) {
1722 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1723 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1725 if (!fAPIVersion.IsNull()) {
1726 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1727 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages are blank-separated tokens of the form <name>::<version>.
// NOTE(review): no check for a missing "::" is visible here; Index() would then
// presumably return a negative value that is used unchecked below - confirm.
1729 if (!fExternalPackages.IsNull()) {
1730 arr = fExternalPackages.Tokenize(" ");
1732 while ((os=(TObjString*)next())) {
1733 TString pkgname = os->GetString();
1734 Int_t index = pkgname.Index("::");
1735 TString pkgversion = pkgname(index+2, pkgname.Length());
1736 pkgname.Remove(index);
1737 fGridJDL->AddToPackages(pkgname, pkgversion);
1738 fMergingJDL->AddToPackages(pkgname, pkgversion);
1742 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1743 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1744 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1745 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1746 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
// The analysis file name is fExecutable with ".sh" replaced by ".root".
1747 TString analysisFile = fExecutable;
1748 analysisFile.ReplaceAll(".sh", ".root");
1749 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1750 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// Entries of fAdditionalLibs containing ".so" are not put into the input sandbox.
1751 if (fAdditionalLibs.Length()) {
1752 arr = fAdditionalLibs.Tokenize(" ");
1754 while ((os=(TObjString*)next())) {
1755 if (os->GetString().Contains(".so")) continue;
1756 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1757 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1762 TIter next(fPackages);
1764 while ((obj=next())) {
1765 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1766 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1769 const char *comment = "List of output files and archives";
1770 if (fOutputArchive.Length()) {
1771 TString outputArchive = fOutputArchive;
// Names listed in fRegisterExcludes are removed from the archive specification,
// first together with a trailing comma and then on their own.
1772 if (!fRegisterExcludes.IsNull()) {
1773 arr = fRegisterExcludes.Tokenize(" ");
1775 while ((os=(TObjString*)next1())) {
1776 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1777 outputArchive.ReplaceAll(os->GetString(),"");
1781 arr = outputArchive.Tokenize(" ");
1783 Bool_t first = kTRUE;
1784 while ((os=(TObjString*)next())) {
// fCloseSE is appended after '@' only to entries that carry no '@' part yet.
1785 if (!os->GetString().Contains("@") && fCloseSE.Length())
1786 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1788 fGridJDL->AddToSet("Output", os->GetString());
1789 if (first) fGridJDL->AddToSetDescription("Output", comment);
1793 // Output archive for the merging jdl
1794 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1795 outputArchive = "log_archive.zip:std*@disk=1 ";
1796 // Add normal output files, extra files + terminate files
1797 TString files = GetListOfFiles("outextter");
1798 // Do not register files in fRegisterExcludes
1799 if (!fRegisterExcludes.IsNull()) {
1800 arr = fRegisterExcludes.Tokenize(" ");
1802 while ((os=(TObjString*)next1())) {
1803 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1804 files.ReplaceAll(os->GetString(),"");
// For the merging JDL every ".root" name is turned into a "*.root" wildcard.
1808 files.ReplaceAll(".root", "*.root");
1810 if (mgr->IsCollectThroughput())
1811 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1813 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1815 TString files = fOutputArchive;
1816 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already attached by user
1817 outputArchive = files;
1819 arr = outputArchive.Tokenize(" ");
1822 while ((os=(TObjString*)next2())) {
1823 TString currentfile = os->GetString();
1824 if (!currentfile.Contains("@") && fCloseSE.Length())
1825 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1827 fMergingJDL->AddToSet("Output", currentfile);
1828 if (first) fMergingJDL->AddToSetDescription("Output", comment);
// Individual output files, given as a comma-separated list in fOutputFiles.
1833 arr = fOutputFiles.Tokenize(",");
1835 Bool_t first = kTRUE;
1836 while ((os=(TObjString*)next())) {
1837 // Ignore outputs in jdl that are also in outputarchive
// sout is the entry stripped of '*', of ".root" and of anything from '@' on; it
// is only used for the Contains() checks below.
1838 TString sout = os->GetString();
1839 sout.ReplaceAll("*", "");
1840 sout.ReplaceAll(".root", "");
1841 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1842 if (fOutputArchive.Contains(sout)) continue;
1843 // Ignore fRegisterExcludes
1844 if (fRegisterExcludes.Contains(sout)) continue;
1845 if (!first) comment = NULL;
1846 if (!os->GetString().Contains("@") && fCloseSE.Length())
1847 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1849 fGridJDL->AddToSet("Output", os->GetString());
1850 if (first) fGridJDL->AddToSetDescription("Output", comment);
// Files listed in fMergeExcludes are left out of the merging JDL only.
1851 if (fMergeExcludes.Contains(sout)) continue;
1852 if (!os->GetString().Contains("@") && fCloseSE.Length())
1853 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1855 fMergingJDL->AddToSet("Output", os->GetString());
1856 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1860 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1861 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// The merging validation script name is fValidationScript with ".sh" replaced
// by "_merge.sh".
1862 TString validationScript = fValidationScript;
1863 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1864 validationScript.ReplaceAll(".sh", "_merge.sh");
1865 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1866 if (fMasterResubmitThreshold) {
1867 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1868 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1870 // Write a jdl with 2 input parameters: collection name and output dir name.
1873 // Copy jdl to grid workspace
// NOTE(review): the checks below repeat the output-directory validation done
// near the top of this function - confirm whether both are needed.
1875 // Check if an output directory was defined and valid
1876 if (!fGridOutputDir.Length()) {
1877 Error("CreateJDL", "You must define AliEn output directory");
1880 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1881 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1882 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1883 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1885 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1891 if (TestBit(AliAnalysisGrid::kSubmit)) {
// The JDL files go to the output directory, or to the working directory in
// production mode. Existing copies are removed before uploading.
1892 TString mergeJDLName = fExecutable;
1893 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1894 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1895 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1896 if (fProductionMode) {
1897 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1898 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1900 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1901 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1902 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
// A failed upload is treated as fatal.
1903 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1904 Fatal("","Terminating");
1905 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1907 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1908 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1909 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1910 Fatal("","Terminating");
// Upload the additional dependencies (entries containing ".so" are skipped) to
// the working directory.
1913 if (fAdditionalLibs.Length()) {
1914 arr = fAdditionalLibs.Tokenize(" ");
1917 while ((os=(TObjString*)next())) {
1918 if (os->GetString().Contains(".so")) continue;
1919 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1920 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1921 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1922 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1923 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1924 Fatal("","Terminating");
// Upload the entries of fPackages in the same way.
1929 TIter next(fPackages);
1931 while ((obj=next())) {
1932 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1933 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1934 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1935 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1936 Form("%s/%s", workdir.Data(), obj->GetName())))
1937 Fatal("","Terminating");
1944 //______________________________________________________________________________
1945 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1947 // Writes one or more JDL's corresponding to findex. If findex is negative,
1948 // all run numbers are considered in one go (jdl). For non-negative indices
1949 // they correspond to the indices in the array fInputFiles.
1950 if (!fInputFiles) return kFALSE;
1953 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1954 workdir += fGridWorkingDir;
1955 TString stageName = "$2";
1956 if (fProductionMode) stageName = "$4";
1957 if (!fMergeDirName.IsNull()) {
1958 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1959 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1961 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1962 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1964 if (fProductionMode) {
1965 TIter next(fInputFiles);
1966 while ((os=next())) {
1967 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1969 if (!fOutputToRunNo)
1970 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1972 fGridJDL->SetOutputDirectory(fGridOutputDir);
1974 if (!fRunNumbers.Length() && !fRunRange[0]) {
1975 // One jdl with no parameters in case input data is specified by name.
1976 TIter next(fInputFiles);
1978 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1979 if (!fOutputSingle.IsNull())
1980 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1982 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1983 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1986 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1987 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1988 if (!fOutputSingle.IsNull()) {
1989 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1990 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1992 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1997 // Generate the JDL as a string
1998 TString sjdl = fGridJDL->Generate();
1999 TString sjdl1 = fMergingJDL->Generate();
2001 if (!fMergeDirName.IsNull()) {
2002 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
2003 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
2005 fMergingJDL->SetOutputDirectory("$1", "Output directory");
2006 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
2008 TString sjdl2 = fMergingJDL->Generate();
2009 Int_t index, index1;
2010 sjdl.ReplaceAll("\",\"", "\",\n \"");
2011 sjdl.ReplaceAll("(member", "\n (member");
2012 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2013 sjdl.ReplaceAll("{", "{\n ");
2014 sjdl.ReplaceAll("};", "\n};");
2015 sjdl.ReplaceAll("{\n \n", "{\n");
2016 sjdl.ReplaceAll("\n\n", "\n");
2017 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
2018 sjdl1.ReplaceAll("\",\"", "\",\n \"");
2019 sjdl1.ReplaceAll("(member", "\n (member");
2020 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2021 sjdl1.ReplaceAll("{", "{\n ");
2022 sjdl1.ReplaceAll("};", "\n};");
2023 sjdl1.ReplaceAll("{\n \n", "{\n");
2024 sjdl1.ReplaceAll("\n\n", "\n");
2025 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
2026 sjdl2.ReplaceAll("\",\"", "\",\n \"");
2027 sjdl2.ReplaceAll("(member", "\n (member");
2028 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2029 sjdl2.ReplaceAll("{", "{\n ");
2030 sjdl2.ReplaceAll("};", "\n};");
2031 sjdl2.ReplaceAll("{\n \n", "{\n");
2032 sjdl2.ReplaceAll("\n\n", "\n");
2033 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
2034 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2035 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
2036 index = sjdl.Index("JDLVariables");
2037 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
2038 sjdl += "Workdirectorysize = {\"5000MB\"};";
2039 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2040 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2041 index = fJobTag.Index(":");
2042 if (index < 0) index = fJobTag.Length();
2043 TString jobTag = fJobTag;
2044 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
2045 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
2046 if (fProductionMode) {
2047 sjdl1.Prepend("# Generated merging jdl (production mode) \
2048 \n# $1 = full alien path to output directory to be merged \
2049 \n# $2 = train number \
2050 \n# $3 = production (like LHC10b) \
2051 \n# $4 = merging stage \
2052 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2053 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2054 sjdl2.Prepend("# Generated merging jdl \
2055 \n# $1 = full alien path to output directory to be merged \
2056 \n# $2 = train number \
2057 \n# $3 = production (like LHC10b) \
2058 \n# $4 = merging stage \
2059 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2061 sjdl1.Prepend("# Generated merging jdl \
2062 \n# $1 = full alien path to output directory to be merged \
2063 \n# $2 = merging stage \
2064 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2065 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2066 sjdl2.Prepend("# Generated merging jdl \
2067 \n# $1 = full alien path to output directory to be merged \
2068 \n# $2 = merging stage \
2069 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2071 index = sjdl1.Index("JDLVariables");
2072 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2073 index = sjdl2.Index("JDLVariables");
2074 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2075 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2076 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2077 index = sjdl2.Index("Split =");
2079 index1 = sjdl2.Index("\n", index);
2080 sjdl2.Remove(index, index1-index+1);
2082 index = sjdl2.Index("SplitMaxInputFileNumber");
2084 index1 = sjdl2.Index("\n", index);
2085 sjdl2.Remove(index, index1-index+1);
2087 index = sjdl2.Index("InputDataCollection");
2089 index1 = sjdl2.Index(";", index);
2090 sjdl2.Remove(index, index1-index+1);
2092 index = sjdl2.Index("InputDataListFormat");
2094 index1 = sjdl2.Index("\n", index);
2095 sjdl2.Remove(index, index1-index+1);
2097 index = sjdl2.Index("InputDataList");
2099 index1 = sjdl2.Index("\n", index);
2100 sjdl2.Remove(index, index1-index+1);
2102 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2103 // Write jdl to file
2105 out.open(fJDLName.Data(), ios::out);
2107 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2110 out << sjdl << endl;
2112 TString mergeJDLName = fExecutable;
2113 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2116 out1.open(mergeJDLName.Data(), ios::out);
2118 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2121 out1 << sjdl1 << endl;
2124 TString finalJDL = mergeJDLName;
2125 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2126 out2.open(finalJDL.Data(), ios::out);
2128 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2131 out2 << sjdl2 << endl;
2135 // Copy jdl to grid workspace
2137 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2139 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2140 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2141 TString finalJDL = mergeJDLName;
2142 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2143 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2144 if (fProductionMode) {
2145 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2146 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2147 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2149 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2150 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2151 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2152 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2153 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2154 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2155 Fatal("","Terminating");
2157 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2158 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2159 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2160 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2161 Fatal("","Terminating");
2162 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2163 Fatal("","Terminating");
2169 //______________________________________________________________________________
// Look up a file in the grid catalogue through gGrid->Ls().
//   lfn - logical file name of the file to look for
// Returns kFALSE right away when there is no grid connection (gGrid is null)
// or when Ls() does not give back a result object.
// NOTE(review): the declaration of slfn is not shown here; presumably it is a
// TString copy of lfn - confirm. The release of 'res' and the final return
// values are not visible either and should be checked.
2170 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2172 // Returns true if file exists.
2173 if (!gGrid) return kFALSE;
// Every "alien://" occurrence is removed before the name is passed to Ls()
2175 slfn.ReplaceAll("alien://","");
2176 TGridResult *res = gGrid->Ls(slfn);
2177 if (!res) return kFALSE;
// Only the first entry of the listing is looked at
2178 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry is tested through its "name" value: a missing or empty string is
// handled separately below
2183 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2184 if (!objs || !objs->GetString().Length()) {
2192 //______________________________________________________________________________
// Look for a directory in the grid catalogue by listing its parent.
//   dirname - directory name or full path; trailing blanks and trailing '/'
//             characters are removed before it is split
// Returns kFALSE right away when there is no grid connection (gGrid is null)
// or when the listing of the parent directory gives no result object.
2193 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2195 // Returns true if directory exists. Can be also a path.
2196 if (!gGrid) return kFALSE;
2197 // Check if dirname is a path
2198 TString dirstripped = dirname;
// Strip() returns the stripped sub-string, hence the re-assignment
2199 dirstripped = dirstripped.Strip();
2200 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Split into last path component (dir) and parent directory (path)
2201 TString dir = gSystem->BaseName(dirstripped);
2203 TString path = gSystem->DirName(dirstripped);
// List the parent with the "-F" option and scan the entries for 'dir'
2204 TGridResult *res = gGrid->Ls(path, "-F");
2205 if (!res) return kFALSE;
// The scan stops at the first entry that is not a TMap
2209 while ((map=dynamic_cast<TMap*>(next()))) {
2210 obj = map->GetValue("name");
2212 if (dir == obj->GetName()) {
2221 //______________________________________________________________________________
// Classify an input data location and report the result via Info().
//   lfn          - logical file name of the input
//   isCollection - [out] result of IsCollection(lfn)
//   isXml        - [out] set from a ".xml" substring test on the file name
//   useTags      - [out] set when the (first) file name contains ".tag"
// Three cases are reported in the message: "raw_collection", "xml_collection"
// and plain files ("root file" / "unknown file").
// NOTE(review): slfn is declared outside the lines shown here; presumably it
// is a TString copy of lfn - confirm.
2222 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2224 // Check input data type.
2225 isCollection = kFALSE;
2229 Error("CheckDataType", "No connection to grid");
2232 isCollection = IsCollection(lfn);
2233 TString msg = "\n##### file: ";
2236 msg += " type: raw_collection;";
2237 // special treatment for collections
2239 // check for tag files in the collection
2240 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2242 msg += " using_tags: No (unknown)";
2243 Info("CheckDataType", "%s", msg.Data());
// The decision is taken from the "origLFN" key of the first listed file only
2246 const char* typeStr = res->GetKey(0, "origLFN");
2247 if (!typeStr || !strlen(typeStr)) {
2248 msg += " using_tags: No (unknown)";
2249 Info("CheckDataType", "%s", msg.Data());
2252 TString file = typeStr;
2253 useTags = file.Contains(".tag");
2254 if (useTags) msg += " using_tags: Yes";
2255 else msg += " using_tags: No";
2256 Info("CheckDataType", "%s", msg.Data());
2261 isXml = slfn.Contains(".xml");
2263 // Open xml collection and check if there are tag files inside
2264 msg += " type: xml_collection;";
// The collection is opened through the interpreter rather than by a direct
// call (presumably to avoid a direct dependency on the TAlien classes)
2265 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2267 msg += " using_tags: No (unknown)";
2268 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the xml collection is inspected
2271 TMap *map = coll->Next();
2273 msg += " using_tags: No (unknown)";
2274 Info("CheckDataType", "%s", msg.Data());
2277 map = (TMap*)map->GetValue("");
2279 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2280 useTags = file.Contains(".tag");
2282 if (useTags) msg += " using_tags: Yes";
2283 else msg += " using_tags: No";
2284 Info("CheckDataType", "%s", msg.Data());
// Plain file: everything is decided from substrings of the file name itself
2287 useTags = slfn.Contains(".tag");
2288 if (slfn.Contains(".root")) msg += " type: root file;";
2289 else msg += " type: unknown file;";
2290 if (useTags) msg += " using_tags: Yes";
2291 else msg += " using_tags: No";
2292 Info("CheckDataType", "%s", msg.Data());
2295 //______________________________________________________________________________
// Register a par package with the plugin.
//   package - package name; every ".par" occurrence is removed from it first
// The kUsePars bit is set on the object (an Info message is printed the first
// time) and the package name is appended to fPackages as a TObjString.
// NOTE(review): statements between the lines shown here are not visible (for
// instance the condition guarding the creation of fPackages) - confirm
// against the full source.
2296 void AliAnalysisAlien::EnablePackage(const char *package)
2298 // Enables a par file supposed to exist in the current directory.
2299 TString pkg(package);
2300 pkg.ReplaceAll(".par", "");
// TSystem::AccessPathName() returns true when the path can NOT be accessed
2302 if (gSystem->AccessPathName(pkg)) {
2303 Fatal("EnablePackage", "Package %s not found", pkg.Data());
2306 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2307 Info("EnablePackage", "AliEn plugin will use .par packages");
2308 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// The array owns the TObjString entries added to it
2310 fPackages = new TObjArray();
2311 fPackages->SetOwner();
2313 fPackages->Add(new TObjString(pkg));
2316 //______________________________________________________________________________
// Build a TChain for test mode from the list of data files found in the text
// file fFileForTestMode.
//   treeName - name of the tree to chain; replaced by "TE" when IsUseMCchain()
// Empty lines and lines starting with '#' are skipped and reading stops once
// fNtestFiles entries were considered. Every file is opened once to make sure
// it is usable before it is added to the chain. When fFriendChainName is set,
// one friend chain is created per blank-separated token and filled with the
// corresponding file for each accepted data file.
// Errors are reported when fFileForTestMode is not set, cannot be found, or
// when no listed file could be opened.
// NOTE(review): 'friends' and the array returned by Tokenize() are heap
// allocated here; their release is not visible in the lines shown - confirm.
2317 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2319 // Make a tree from files having the location specified in fFileForTestMode.
2320 // Inspired from JF's CreateESDChain.
2321 if (fFileForTestMode.IsNull()) {
2322 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
// TSystem::AccessPathName() returns true when the path can NOT be accessed
2325 if (gSystem->AccessPathName(fFileForTestMode)) {
2326 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2331 in.open(fFileForTestMode);
2333 // Read the input list of files and add them to the chain
2335 TString streeName(treeName);
2336 if (IsUseMCchain()) streeName = "TE";
2337 TChain *chain = new TChain(streeName);
2338 TList *friends = new TList();
2339 TChain *cfriend = 0;
2340 if (!fFriendChainName.IsNull()) {
2341 TObjArray *list = fFriendChainName.Tokenize(" ");
// One friend chain per token; the token (friend file name) is kept as the
// chain title and read back with GetTitle() further down
2344 while((str=(TObjString*)next())) {
2345 cfriend = new TChain(streeName, str->GetName());
2346 friends->Add(cfriend);
2347 chain->AddFriend(cfriend);
2352 TIter nextfriend(friends);
2356 if (line.IsNull() || line.BeginsWith("#")) continue;
// The counter is incremented for every considered line, including files that
// turn out to be un-openable
2357 if (count++ == fNtestFiles) break;
2358 TString esdFile(line);
2359 TFile *file = TFile::Open(esdFile);
2360 if (file && !file->IsZombie()) {
2361 chain->Add(esdFile);
2363 if (!fFriendChainName.IsNull()) {
// Drop a "#..." suffix from the data file name before taking its directory
2364 if (esdFile.Index("#") > -1)
2365 esdFile.Remove(esdFile.Index("#"));
2366 bpath = gSystem->DirName(esdFile);
2370 while ((cfriend=(TChain*)nextfriend())) {
2372 fileFriend += cfriend->GetTitle();
2373 file = TFile::Open(fileFriend);
2374 if (file && !file->IsZombie()) {
2376 cfriend->Add(fileFriend);
2378 Fatal("GetChainForTestMode", "Cannot open friend file: %s", fileFriend.Data());
2384 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2388 if (!chain->GetListOfFiles()->GetEntries()) {
2389 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2398 //______________________________________________________________________________
// Scan the job list returned by gGrid->Ps("") and classify the jobs by status.
//   jobidstart - jobs with a queue id below this value are ignored
//   lastid     - queue id of the job whose status string is copied to the
//                returned buffer
//   nrunning, nwaiting, nerror, ndone - [out] counters handled in the switch
//                on TGridJobStatus::GetStatus() (kABORTED, kFAIL and kUNKNOWN
//                share one case)
// The returned pointer refers to a function-static 20 character buffer, so it
// is overwritten by later calls.
// NOTE(review): the status text is passed to sprintf() as the format string
// and its length is not checked against the 20 byte buffer - confirm this is
// safe for all status strings.
2399 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2401 // Get job status for all jobs with jobid>=jobidstart.
2402 static char mstatus[20];
2408 TGridJobStatusList *list = gGrid->Ps("");
2409 if (!list) return mstatus;
2410 Int_t nentries = list->GetSize();
2411 TGridJobStatus *status;
2413 for (Int_t ijob=0; ijob<nentries; ijob++) {
2414 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter by casting the object address
// to TAlienJobStatus
2415 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2416 if (pid<jobidstart) continue;
2417 if (pid == lastid) {
2418 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2420 switch (status->GetStatus()) {
2421 case TGridJobStatus::kWAITING:
2423 case TGridJobStatus::kRUNNING:
2425 case TGridJobStatus::kABORTED:
2426 case TGridJobStatus::kFAIL:
2427 case TGridJobStatus::kUNKNOWN:
2429 case TGridJobStatus::kDONE:
2438 //______________________________________________________________________________
// Ask the grid for the type of a catalogue entry with the "type -z" command.
//   lfn - logical file name to query
// Returns kTRUE when the "type" key of the first result equals "collection";
// kFALSE when the command gives no result or the key is missing or empty.
// An error "No connection to grid" is reported by the guard at the top.
// NOTE(review): the release of 'res' is not visible in the lines shown here -
// confirm against the full source.
2439 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2441 // Returns true if file is a collection. Functionality duplicated from
2442 // TAlien::Type() because we don't want to directly depend on TAlien.
2444 Error("IsCollection", "No connection to grid");
2447 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2448 if (!res) return kFALSE;
2449 const char* typeStr = res->GetKey(0, "type");
2450 if (!typeStr || !strlen(typeStr)) return kFALSE;
2451 if (!strcmp(typeStr, "collection")) return kTRUE;
2456 //______________________________________________________________________________
2457 Bool_t AliAnalysisAlien::IsSingleOutput() const
2459 // Check if single-ouput option is on.
2460 return (!fOutputSingle.IsNull());
2463 //______________________________________________________________________________
// Execute a macro and find out which libraries it loaded.
//   macro - path of the macro; expanded with TSystem::ExpandPathName()
//   args  - arguments handed to the macro's Exec() call
//   libs  - [out] blank-separated base names of the libraries reported by
//           gSystem->GetLibraries() after the run, with the list recorded
//           before the run removed from it
// Errors are reported when the macro cannot be found or when its
// interpretation fails; the value returned by Exec() is kept in 'retval'.
// NOTE(review): the return statements, the reset of 'libs' before the loop and
// the release of 'libTokens' are not visible in the lines shown - confirm.
2464 Long64_t AliAnalysisAlien::RunMacroAndExtractLibs(const char* macro, const char *args, TString &libs)
2466 // Tries to run the specified macro and return the libraries that it loads.
2468 if (strlen(macro)) expname = gSystem->ExpandPathName(macro);
// TSystem::AccessPathName() returns true when the path can NOT be accessed
2469 if (expname.IsNull() || gSystem->AccessPathName(expname)) {
2470 ::Error("RunMacroAndExtractLibs","Cannot find macro %s in current directory", macro);
// Snapshot of the loaded libraries before the macro runs
2473 TString oldlibs = gSystem->GetLibraries();
2476 Long64_t retval = m.Exec(args, &error);
2477 if (error != TInterpreter::kNoError)
2479 ::Error("RunMacroAndExtractLibs", "Macro interpretation %s failed", macro);
// The old list is removed as one literal sub-string of the new list
2482 libs = gSystem->GetLibraries();
2483 libs.ReplaceAll(oldlibs, "");
// NOTE(review): TString::Strip() returns the stripped sub-string and does not
// modify 'libs'; the result is unused here (harmless for the Tokenize() call
// below, which splits on blanks anyway)
2484 libs.Strip(TString::kLeading);
2485 TObjArray *libTokens = libs.Tokenize(" ");
// Rebuild the list keeping only the base name of each library
2487 for (Int_t i=0; i<libTokens->GetEntries(); i++) {
2488 if (!libs.IsNull()) libs += " ";
2489 libs += gSystem->BaseName(libTokens->At(i)->GetName());
2495 //______________________________________________________________________________
// Print the current plugin settings to standard output with printf().
// When an analysis manager exists and is in PROOF mode, the PROOF related
// settings (cluster, data set, ROOT/AliRoot versions, workers, par files, ...)
// are printed first; the remaining lines describe the grid settings (user,
// directories, data selection, output lists, job parameters, executable,
// analysis macro, JDL name, ...).
// The Option_t argument is unnamed and not used.
// NOTE(review): whether the function returns after the PROOF block is not
// visible in the lines shown here - confirm against the full source.
2496 void AliAnalysisAlien::Print(Option_t *) const
2498 // Print current plugin settings.
2499 printf("### AliEn analysis plugin current settings ###\n");
2500 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2501 if (mgr && mgr->IsProofMode()) {
2502 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2503 if (TestBit(AliAnalysisGrid::kTest))
2504 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2505 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2506 if (!fProofDataSet.IsNull())
2507 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2509 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2511 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2512 if (!fROOTVersion.IsNull())
2513 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2515 printf("= ROOT version requested________________________ default\n");
2516 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2517 if (!fAliRootMode.IsNull())
2518 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2520 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2521 if (fNproofWorkersPerSlave)
2522 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2523 if (TestSpecialBit(kClearPackages))
2524 printf("= ClearPackages requested...\n");
// NOTE(review): TString::Data() is not expected to return a null pointer, so
// this test looks always true; IsNull()/Length() may have been meant - confirm
2525 if (fIncludePath.Data())
2526 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2527 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2528 if (fPackages && fPackages->GetEntries()) {
2529 TIter next(fPackages);
// NOTE(review): the names are appended without any separator
2532 while ((obj=next())) list += obj->GetName();
2533 printf("= Par files to be used: ________________________ %s\n", list.Data());
2535 if (TestSpecialBit(kProofConnectGrid))
2536 printf("= Requested PROOF connection to grid\n");
2539 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2540 if (fOverwriteMode) {
2541 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2542 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2544 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2545 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO:Print");
2546 printf("= Production mode:______________________________ %d\n", fProductionMode);
2547 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2548 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2549 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2551 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2552 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2553 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
// The data pattern may hold two blank-separated parts: the text before the
// first blank is appended to the base directory after a "/%run%/" component
// and the text after it is kept as the search pattern
2554 TString basedatadir = fGridDataDir;
2555 TString pattern = fDataPattern;
2557 Int_t ind = pattern.Index(" ");
2559 basedatadir += "/%run%/";
2560 basedatadir += pattern(0, ind);
2561 pattern = pattern(ind+1, pattern.Length());
2563 printf("= Data base directory path requested: __________ %s\n", basedatadir.Data());
2564 printf("= Data search pattern: _________________________ %s\n", pattern.Data());
2565 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2566 if (fRunNumbers.Length())
2567 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2569 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
// Explicit input files are only listed when neither a run range nor run
// numbers were given
2570 if (!fRunRange[0] && !fRunNumbers.Length()) {
2571 TIter next(fInputFiles);
// NOTE(review): the names are appended without any separator
2574 while ((obj=next())) list += obj->GetName();
2575 printf("= Input files to be processed: _________________ %s\n", list.Data());
2577 if (TestBit(AliAnalysisGrid::kTest))
2578 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2579 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2580 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2581 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2582 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2583 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2584 printf("=====================================================================\n");
2585 printf("= Job price: ___________________________________ %d\n", fPrice);
2586 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2587 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2588 if (fMaxInitFailed>0)
2589 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2590 if (fMasterResubmitThreshold>0)
2591 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2592 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2593 if (fNrunsPerMaster>0)
2594 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2595 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2596 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2597 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2598 if (fArguments.Length())
2599 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2600 if (fExecutableArgs.Length())
2601 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2602 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2603 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2604 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2605 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2607 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2608 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same always-true looking test as in the PROOF block - confirm
2609 if (fIncludePath.Data())
2610 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2611 if (fCloseSE.Length())
2612 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2613 if (fFriendChainName.Length())
2614 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2615 if (fPackages && fPackages->GetEntries()) {
2616 TIter next(fPackages);
2619 while ((obj=next())) list += obj->GetName();
2620 printf("= Par files to be used: ________________________ %s\n", list.Data());
2624 //______________________________________________________________________________
// Reset the plugin configuration to its default values.
// Both JDL objects are created through the interpreter ("new TAlienJDL()")
// instead of a direct constructor call (presumably to avoid a direct
// dependency on the TAlien classes), then the numeric and string settings are
// assigned their defaults and SetCheckCopy(kTRUE) / SetDefaultOutputs(kTRUE)
// are applied.
// NOTE(review): an existing fGridJDL is deleted before being replaced, but no
// matching delete of fMergingJDL is visible in the lines shown here; if there
// is none, calling this method more than once leaks the old object - confirm.
2625 void AliAnalysisAlien::SetDefaults()
2627 // Set default values for everything. What cannot be filled will be left empty.
2628 if (fGridJDL) delete fGridJDL;
2629 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2630 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2633 fSplitMaxInputFileNumber = 100;
2635 fMasterResubmitThreshold = 0;
2641 fNrunsPerMaster = 1;
2642 fMaxMergeFiles = 100;
2644 fExecutable = "analysis.sh";
2645 fExecutableCommand = "root -b -q -x";
2647 fExecutableArgs = "";
2648 fAnalysisMacro = "myAnalysis.C";
2649 fAnalysisSource = "";
2650 fAdditionalLibs = "";
2654 fAliROOTVersion = "";
2655 fUser = ""; // Your alien user name
2656 fGridWorkingDir = "";
2657 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2658 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2659 fFriendChainName = "";
2660 fGridOutputDir = "output";
2661 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2662 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2663 fInputFormat = "xml-single";
2664 fJDLName = "analysis.jdl";
2665 fJobTag = "Automatically generated analysis JDL";
2666 fMergeExcludes = "";
2669 SetCheckCopy(kTRUE);
2670 SetDefaultOutputs(kTRUE);
2674 //______________________________________________________________________________
2675 void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
2677 // Set file name for the chain of friends and optionally additional libs to be loaded.
2678 // Libs should be separated by blancs.
2679 fFriendChainName = name;
2680 fFriendChainName.ReplaceAll(",", " ");
2681 fFriendChainName.Strip();
2682 fFriendChainName.ReplaceAll(" ", " ");
2684 fFriendLibs = libnames;
2685 if (fFriendLibs.Length()) {
2686 if(!fFriendLibs.Contains(".so"))
2687 Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
2688 fFriendLibs.ReplaceAll(",", " ");
2689 fFriendLibs.Strip();
2690 fFriendLibs.ReplaceAll(" ", " ");
2694 //______________________________________________________________________________
2695 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2697 // Obsolete method. Use SetROOTVersion instead
2698 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2699 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2700 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2703 //______________________________________________________________________________
// Drive one step of staged merging on the grid.
//   filename  - name of the final merged file looked for in aliendir
//   aliendir  - alien output directory holding the results and Stage_<n>.xml
//   nperchunk - when fewer than this many files are found, the stage to be
//               submitted is the final one
//   jdl       - merging JDL; the final stage uses the same name with ".jdl"
//               replaced by "_final.jdl"
// Nothing is merged again when the final file already exists. Otherwise the
// last stage already done is located through the existing Stage_<n>.xml
// files, a new collection is produced with the grid "find -x" command, copied
// to aliendir and the next merging job is submitted with
// "submit <jdl> <aliendir> <stage>".
// Returns kFALSE when the job submission fails (job id 0). On success the job
// id and a "final_merge_stage<n>" / "partial_merge_stage<n>" label are
// appended to fGridJobIDs and fGridStages.
2704 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2706 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2707 // First check if the result is already in the output directory.
2708 if (FileExists(Form("%s/%s",aliendir,filename))) {
2709 printf("Final merged results found. Not merging again.\n");
2712 // Now check the last stage done.
2715 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2718 // Next stage of merging
// From the second stage on, only archives of the previous stage are selected
2720 TString pattern = "*root_archive.zip";
2721 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2722 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2723 if (res) delete res;
2724 // Write standard output to file
// The "> file" suffix makes the interpreter redirect the output of the
// processed line to the local file Stage_<n>.xml
2725 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2726 // Count the number of files inside
2728 ifile.open(Form("Stage_%d.xml",stage));
2729 if (!ifile.good()) {
2730 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Every line containing "/event" is counted as one file of the collection
2735 while (!ifile.eof()) {
2737 if (line.Contains("/event")) nfiles++;
2741 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2744 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2746 // Copy the file in the output directory
2747 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2748 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2749 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2750 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2751 // Check if this is the last stage to be done.
// Final stage when the files fit in one chunk or the configured maximum
// number of stages (fMaxMergeStages, 0 = no limit) has been reached
2752 Bool_t laststage = (nfiles<nperchunk);
2753 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2756 printf("### Submiting final merging stage %d\n", stage);
2757 TString finalJDL = jdl;
2758 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2759 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2760 jobId = SubmitSingleJob(query);
2762 printf("### Submiting merging stage %d\n", stage);
2763 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2764 jobId = SubmitSingleJob(query);
2766 if (!jobId) return kFALSE;
// Keep track of the submitted job and of the kind of stage it belongs to
2768 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2769 fGridJobIDs.Append(Form("%d", jobId));
2770 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2771 fGridStages.Append(Form("%s_merge_stage%d",
2772 laststage ? "final" : "partial", stage));
2777 //______________________________________________________________________________
// Read an analysis manager back from a ROOT file.
//   fname - name of the file, opened with TFile::Open()
// All keys of the file are scanned; every key whose class name is exactly
// "AliAnalysisManager" is read with TFile::Get(), so if several such keys
// exist the object from the last one is kept in 'mgr'.
// Errors are reported when the file cannot be opened and when no analysis
// manager is found in it.
// NOTE(review): closing of the file is not visible in the lines shown here -
// confirm against the full source.
2778 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2780 // Load the analysis manager from a file.
2781 TFile *file = TFile::Open(fname);
2783 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2786 TIter nextkey(file->GetListOfKeys());
2787 AliAnalysisManager *mgr = 0;
2789 while ((key=(TKey*)nextkey())) {
2790 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2791 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2794 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2798 //______________________________________________________________________________
// Send one submit query to the grid and extract the resulting job id.
//   query - complete grid command (e.g. "submit <jdl> <args>"); it is echoed
//           to standard output before being executed
// Returns 0 when there is no grid connection (gGrid is null). The job id is
// read from the "jobId" key of the first result entry; an empty id is treated
// as a failed submission and reported, otherwise the id is converted with
// Atoi() and printed.
// NOTE(review): the handling of a null 'res' and its release are not visible
// in the lines shown here - confirm against the full source.
2799 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2801 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2802 if (!gGrid) return 0;
2803 printf("=> %s ------> ",query);
2804 TGridResult *res = gGrid->Command(query);
2806 TString jobId = res->GetKey(0,"jobId");
2808 if (jobId.IsNull()) {
2809 printf("submission failed. Reason:\n");
2812 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2815 Int_t ijobId = jobId.Atoi();
2816 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2820 //______________________________________________________________________________
// Merge text-like output files by plain concatenation.
//   output     - name of the file to produce; also used to name the local
//                copies "<index>_<output>"
//   collection - xml collection (the name must contain ".xml", otherwise
//                kFALSE is returned) opened as a TAlienCollection
// For each collection entry a file name is built starting from the directory
// of the entry's TURL and copied locally with TFile::Cp(); the copies are
// concatenated through the shell ("cp"/"cat") using the temporary files
// "lastmerged" and "tempmerged". At the end the result is copied to 'output'
// and the temporaries as well as all "*_<output>" copies are removed.
// NOTE(review): 'output' is inserted unquoted into shell commands, so names
// with blanks or shell metacharacters would not be handled correctly.
2821 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2823 // Merges a collection of output files using concatenation.
2824 TString scoll(collection);
2825 if (!scoll.Contains(".xml")) return kFALSE;
// The collection is opened through the interpreter rather than by a direct
// call (presumably to avoid a direct dependency on the TAlien classes)
2826 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2828 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2831 // Iterate grid collection
2833 Bool_t merged = kFALSE;
2835 while (coll->Next()) {
2836 TString fname = gSystem->DirName(coll->GetTURL());
2839 outtmp = Form("%d_%s", ifile, output);
2840 if (!TFile::Cp(fname, outtmp)) {
2841 ::Error("MergeInfo", "Could not copy %s", fname.Data());
2846 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
2849 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2850 gSystem->Exec("cp tempmerged lastmerged");
2853 gSystem->Exec(Form("cp lastmerged %s", output));
2854 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2860 //______________________________________________________________________________
2861 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2863 // Merge given output files from basedir. Basedir can be an alien output directory
2864 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2865 // files in a group (ignored for xml input). Merging can be done in stages:
2866 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2867 // stage=1 : works with an xml of all root_archive.zip in the output directory
2868 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
// Arguments, as used by the statements below:
//   output    - name of the merged file; a suffix starting at the first @ is removed
//               (only when the @ is not the first character)
//   basedir   - a name containing .xml (grid collection), a name containing .txt
//               (list of folders), or a path handed to the grid find command
//   nmaxmerge - number of input files per chunk; forced to 9999999 for xml input
//   stage     - merging stage, see the list above
2869 TString outputFile = output;
2871 TString outputChunk;
2872 TString previousChunk = "";
2873 TObjArray *listoffiles = new TObjArray();
2874 // listoffiles->SetOwner();
2875 Int_t countChunk = 0;
// countZero starts equal to nmaxmerge; the merge loop below treats that value as
// the marker for the first file of a chunk.
2876 Int_t countZero = nmaxmerge;
2877 Bool_t merged = kTRUE;
// isGrid is handed to the TFileMerger constructor further down.
2878 Bool_t isGrid = kTRUE;
// Drop anything from the first @ on (only when it is not at position 0).
2879 Int_t index = outputFile.Index("@");
2880 if (index > 0) outputFile.Remove(index);
2881 TString inputFile = outputFile;
2882 TString sbasedir = basedir;
2883 if (sbasedir.Contains(".xml")) {
2884 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2885 nmaxmerge = 9999999;
// The collection is opened through the interpreter and the result is cast to TGridCollection.
2886 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2888 ::Error("MergeOutput", "Input XML collection empty.");
2891 // Iterate grid collection
2892 while (coll->Next()) {
// Start from the directory part of the TURL of the current collection entry.
2893 TString fname = gSystem->DirName(coll->GetTURL());
// Each input is stored as a TNamed whose name is the file location.
2896 listoffiles->Add(new TNamed(fname.Data(),""));
2898 } else if (sbasedir.Contains(".txt")) {
2899 // The file having the .txt extension is expected to contain a list of
2900 // folders where the output files will be looked. For alien folders,
2901 // the full folder LFN is expected (starting with alien://)
2902 // Assume lfn's on each line
2907 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
// Empty lines and lines starting with # are ignored.
2913 if (line.IsNull() || line.BeginsWith("#")) continue;
// A single line without the alien: marker turns grid mode off for the merger.
2915 if (!line.Contains("alien:")) isGrid = kFALSE;
2919 listoffiles->Add(new TNamed(line.Data(),""));
2923 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
// Remaining case: ask the grid for files matching *<inputFile> below basedir.
2928 command = Form("find %s/ *%s", basedir, inputFile.Data());
2929 printf("command: %s\n", command.Data());
2930 TGridResult *res = gGrid->Command(command);
2932 ::Error("MergeOutput","No result for the find command\n");
// Each result entry is a TMap; its turl value gives the file location.
2938 while ((map=(TMap*)nextmap())) {
2939 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2940 if (!objs || !objs->GetString().Length()) {
2941 // Nothing found - skip this output
2946 listoffiles->Add(new TNamed(objs->GetName(),""));
2950 if (!listoffiles->GetEntries()) {
2951 ::Error("MergeOutput","No result for the find command\n");
2956 TFileMerger *fm = 0;
2957 TIter next0(listoffiles);
// Filtered list; unlike the first list this one owns its entries.
2958 TObjArray *listoffilestmp = new TObjArray();
2959 listoffilestmp->SetOwner();
2962 // Keep only the files at upper level
// First pass: find the smallest number of / characters among all names.
2963 Int_t countChar = 0;
2964 while ((nextfile=next0())) {
2965 snextfile = nextfile->GetName();
2966 Int_t crtCount = snextfile.CountChar('/');
2967 if (nextfile == listoffiles->First()) countChar = crtCount;
2968 if (crtCount < countChar) countChar = crtCount;
// Second pass: compare the depth of each name with that minimum.
// NOTE(review): entries deeper than the minimum are presumably dropped here - confirm.
2971 while ((nextfile=next0())) {
2972 snextfile = nextfile->GetName();
2973 Int_t crtCount = snextfile.CountChar('/');
2974 if (crtCount > countChar) {
2978 listoffilestmp->Add(nextfile);
2981 listoffiles = listoffilestmp; // Now contains 'good' files
2982 listoffiles->Print();
2983 TIter next(listoffiles);
2984 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
// outputChunk becomes the shell pattern <name>_*.root used to detect partial results.
2985 outputChunk = outputFile;
2986 outputChunk.ReplaceAll(".root", "_*.root");
2987 // Check for existent temporary merge files
2988 // Check overwrite mode and remove previous partial results if needed
2989 // Preserve old merging functionality for stage 0.
// Exec returns 0 when ls finds at least one file matching the pattern.
2991 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2993 // Skip as many input files as in a chunk
2994 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2997 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
3001 snextfile = nextfile->GetName();
// Name of the partial file for the current chunk index: <name>_NNNN.root
3003 outputChunk = outputFile;
3004 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns kFALSE when the path is accessible, so a true
// result means this chunk file is absent and the search goes on.
3006 if (gSystem->AccessPathName(outputChunk)) continue;
3007 // Merged file with chunks up to <countChunk> found
3008 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
3009 previousChunk = outputChunk;
3013 countZero = nmaxmerge;
// Main chunked merge: each chunk merges the previous partial result (if any)
// together with the next group of input files.
3015 while ((nextfile=next())) {
3016 snextfile = nextfile->GetName();
3017 // Loop 'find' results and get next LFN
3018 if (countZero == nmaxmerge) {
3019 // First file in chunk - create file merger and add previous chunk if any.
3020 fm = new TFileMerger(isGrid);
3021 fm->SetFastMethod(kTRUE);
3022 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
3023 outputChunk = outputFile;
3024 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
3026 // If last file found, put merged results in the output file
3027 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
3028 // Add file to be merged and decrement chunk counter.
3029 fm->AddFile(snextfile);
// A chunk is closed when its counter reaches zero or the last input was added.
3031 if (countZero==0 || nextfile == listoffiles->Last()) {
3032 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3033 // Nothing found - skip this output
3034 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3038 fm->OutputFile(outputChunk);
3039 // Merge the outputs, then go to next chunk
3041 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): the location tag below is MergeOutputs while the rest of this
// function reports as MergeOutput.
3045 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial file has been folded into the new chunk, remove it.
3046 gSystem->Unlink(previousChunk);
3048 if (nextfile == listoffiles->Last()) break;
// Prepare the next chunk: reset the counter and remember the file just written.
3050 countZero = nmaxmerge;
3051 previousChunk = outputChunk;
3058 // Merging stage different than 0.
3059 // Move to the beginning of the requested chunk.
// All listed files are added to a single merger and written to outputFile.
3060 fm = new TFileMerger(isGrid);
3061 fm->SetFastMethod(kTRUE);
3062 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
3064 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3065 // Nothing found - skip this output
3066 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3070 fm->OutputFile(outputFile);
3071 // Merge the outputs
3073 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3077 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
3083 //______________________________________________________________________________
3084 Bool_t AliAnalysisAlien::MergeOutputs()
3086 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE immediately in test mode and kFALSE in offline mode. Depending on
// the plugin state checked below, the method may only print instructions or submit
// merging jobs through SubmitMerging(). The loop at the end calls MergeOutput()
// for each entry of fOutputFiles, reading from fGridOutputDir.
3087 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
3088 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3090 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
3094 if (!TestBit(AliAnalysisGrid::kMerge)) {
3095 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
3098 if (fProductionMode) {
3099 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
3102 Info("MergeOutputs", "Submitting merging JDL");
3103 if (!SubmitMerging()) return kFALSE;
3104 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
3105 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
3108 // Get the output path
// A value without any / is treated as relative: it is expanded to
// <grid home>/<fGridWorkingDir>/<fGridOutputDir>.
3109 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3110 if (!DirectoryExists(fGridOutputDir)) {
3111 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
3114 if (!fOutputFiles.Length()) {
3115 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3118 // Check if fast read option was requested
3119 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
3120 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
3121 if (fFastReadOption) {
3122 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
3123 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// XNet settings applied through gEnv for the fast read option.
3124 gEnv->SetValue("XNet.ConnectTimeout",50);
3125 gEnv->SetValue("XNet.RequestTimeout",50);
3126 gEnv->SetValue("XNet.MaxRedirectCount",2);
3127 gEnv->SetValue("XNet.ReconnectTimeout",50);
3128 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
3130 // Make sure we change the temporary directory
3131 gSystem->Setenv("TMPDIR", gSystem->pwd());
3132 // Set temporary compilation directory to current one
3133 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// fOutputFiles is a comma-separated list; each token is one output to merge.
3134 TObjArray *list = fOutputFiles.Tokenize(",");
3138 Bool_t merged = kTRUE;
3139 while((str=(TObjString*)next())) {
3140 outputFile = str->GetString();
// Drop anything from the first @ on (only when it is not at position 0).
3141 Int_t index = outputFile.Index("@");
3142 if (index > 0) outputFile.Remove(index);
// Shell pattern <name>_*.root matching partial merge results of this output.
3143 TString outputChunk = outputFile;
3144 outputChunk.ReplaceAll(".root", "_*.root");
3145 // Skip already merged outputs
// AccessPathName() returns kFALSE when the file exists, hence the negation.
3146 if (!gSystem->AccessPathName(outputFile)) {
3147 if (fOverwriteMode) {
3148 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3149 gSystem->Unlink(outputFile);
// Exec returns 0 when ls finds files matching the partial-result pattern.
3150 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
// NOTE(review): this message and the one a few lines below use the location tag
// MergeOutput although they are emitted from MergeOutputs.
3151 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3152 outputChunk.Data());
3153 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3156 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
3160 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3161 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3162 outputChunk.Data());
3163 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Outputs whose name occurs (substring match) in fMergeExcludes or
// fRegisterExcludes are not merged.
3166 if (fMergeExcludes.Contains(outputFile.Data()) ||
3167 fRegisterExcludes.Contains(outputFile.Data())) continue;
3168 // Perform a 'find' command in the output directory, looking for registered outputs
// The stage argument is not passed here, so its default applies
// (the default is declared outside this section - confirm its value in the header).
3169 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3171 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still present in the list of files known to gROOT.
3175 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3176 if (fileOpened) fileOpened->Close();
3182 //______________________________________________________________________________
3183 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3185 // Use the output files connected to output containers from the analysis manager
3186 // rather than the files defined by SetOutputFiles
// flag - new state of the kDefaultOutputs bit. The Info message is printed only
//        when the bit goes from unset to set.
3187 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3188 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3189 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3192 //______________________________________________________________________________
3193 void AliAnalysisAlien::SetOutputFiles(const char *list)
3195 // Manually set the output files list.
3196 // Removes duplicates. Not allowed if default outputs are not disabled.
// list - space-separated file names. A suffix starting at @ is removed from each
//        name and the names are appended to fOutputFiles separated by commas.
3197 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3198 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3201 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3203 TString slist = list;
3204 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3205 TObjArray *arr = slist.Tokenize(" ");
3209 while ((os=(TObjString*)next())) {
3210 sout = os->GetString();
// Strip the @... part when the @ is not the first character.
3211 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): Contains() is a substring test, so a name that is part of an
// already stored longer name is also treated as a duplicate - confirm this is intended.
3212 if (fOutputFiles.Contains(sout)) continue;
3213 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3214 fOutputFiles += sout;
3219 //______________________________________________________________________________
3220 void AliAnalysisAlien::SetOutputArchive(const char *list)
3222 // Manually set the output archive list. Free text - you are on your own...
3223 // Not allowed if default outputs are not disabled.
// list - archive specification; it is stored in fOutputArchive as given, without parsing.
// Fatal() is raised when the kDefaultOutputs bit is still set.
3224 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3225 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3228 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3229 fOutputArchive = list;
3232 //______________________________________________________________________________
3233 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3235 // Setting a preferred output SE is not allowed anymore.
// The argument is unnamed and ignored; the visible body only emits the warning below.
3236 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3239 //______________________________________________________________________________
3240 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3242 // Set some PROOF special parameter.
// pname - parameter name, used as the key in fProofParam
// value - parameter value, stored as a TObjString
// Look for an existing entry with the same name.
3243 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// Fetch the old key and value and take the old key out of the map.
3245 TObject *old = pair->Key();
3246 TObject *val = pair->Value();
3247 fProofParam.Remove(old);
// Store the parameter as a freshly allocated key/value pair of TObjString objects.
3251 fProofParam.Add(new TObjString(pname), new TObjString(value));
3254 //______________________________________________________________________________
3255 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3257 // Returns a special PROOF parameter.
// pname - parameter name to look up in fProofParam
// Returns 0 when no entry with that name exists, otherwise the name of the stored
// value object. The pointer refers to data held by that stored object.
3258 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3259 if (!pair) return 0;
3260 return pair->Value()->GetName();
3263 //______________________________________________________________________________
3264 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3266 // Start remote grid analysis.
// Both arguments are unnamed and unused in this implementation.
// Outline of the statements below:
//  - An initialized AliAnalysisManager is required.
//  - PROOF mode: check cluster and dataset settings, optionally reset the cluster,
//    open the PROOF session through the interpreter, forward the parameters kept in
//    fProofParam, enable the AliRoot package together with extra libraries and
//    include paths, upload and enable par files, load the analysis sources and
//    register the files listed in fFileForTestMode as dataset test_collection.
//  - Grid mode: take output files and archive from the analysis manager when the
//    kDefaultOutputs bit is set, print a banner for the selected run mode, check the
//    input data, create the dataset, call the Write* helpers and CreateJDL(), then
//    run the executable and validation script in a local shell or submit to AliEn
//    (directly with the submit command, or through Submit() for runs).
//  - After submission the job id is reported and an aliensh shell may be started.
3267 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3268 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3269 if (!mgr || !mgr->IsInitialized()) {
3270 Error("StartAnalysis", "You need an initialized analysis manager for this");
3273 // Are we in PROOF mode ?
3274 if (mgr->IsProofMode()) {
3275 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3276 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3277 if (fProofCluster.IsNull()) {
3278 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
// A dataset is mandatory except in test mode.
3281 if (fProofDataSet.IsNull() && !testMode) {
3282 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3285 // Set the needed environment
3286 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3287 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
// fProofReset equal to 1 requests a soft reset; the hard reset call follows below.
3288 if (fProofReset && !testMode) {
3289 if (fProofReset==1) {
3290 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3291 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3293 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3294 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3296 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3301 // Check if there is an old active session
// The number of sessions is obtained as the return value of the interpreted expression.
3302 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3304 Error("StartAnalysis","You have to reset your old session first\n");
3308 // Do we need to change the ROOT version ? The success of this cannot be checked.
3309 if (!fROOTVersion.IsNull() && !testMode) {
3310 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3311 fProofCluster.Data(), fROOTVersion.Data()));
3313 // Connect to PROOF and check the status
// Worker option string: workers=<n>x from fNproofWorkersPerSlave, otherwise
// workers=<n> from fNproofWorkers; it stays empty when both are zero.
3316 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3317 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3319 if (!sworkers.IsNull())
3320 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3322 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
3324 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3326 Error("StartAnalysis", "Could not start PROOF in test mode");
3331 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// When the product of both worker settings is positive, SetParallel is also
// called with fNproofWorkers.
3334 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3335 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3336 // Set proof special parameters if any
3337 TIter nextpp(&fProofParam);
3338 TObject *proofparam;
3339 while ((proofparam=nextpp())) {
3340 TString svalue = GetProofParameter(proofparam->GetName());
// The value is inserted verbatim (not quoted) into the interpreted call, so the
// interpreter parses it as an expression.
// NOTE(review): string-valued parameters would need their own quotes - confirm callers.
3341 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3343 // Is dataset existing ?
// The dataset name is cut at the first # character.
3345 TString dataset = fProofDataSet;
3346 Int_t index = dataset.Index("#");
3347 if (index>=0) dataset.Remove(index);
3348 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3349 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3352 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3354 // Is ClearPackages() needed ?
3355 if (TestSpecialBit(kClearPackages)) {
3356 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3357 gROOT->ProcessLine("gProof->ClearPackages();");
3359 // Is a given aliroot mode requested ?
3362 if (!fAliRootMode.IsNull()) {
// The mode name default is mapped to an empty ALIROOT_MODE value.
3363 TString alirootMode = fAliRootMode;
3364 if (alirootMode == "default") alirootMode = "";
3365 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
// optionsList collects TNamed name/title pairs that are later passed to EnablePackage.
3366 optionsList.SetOwner();
3367 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3368 // Check the additional libs to be loaded
3370 Bool_t parMode = kFALSE;
3371 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3372 // Parse the extra libs for .so
3373 if (fAdditionalLibs.Length()) {
3374 TString additionalLibs = fAdditionalLibs;
// NOTE(review): TString::Strip() returns the stripped substring and does not modify
// the string itself, so this call appears to have no effect - confirm.
3375 additionalLibs.Strip();
// Friend libraries are appended to the space-separated list.
3376 if (additionalLibs.Length() && fFriendLibs.Length())
3377 additionalLibs += " ";
3378 additionalLibs += fFriendLibs;
3379 TObjArray *list = additionalLibs.Tokenize(" ");
3382 while((str=(TObjString*)next())) {
3383 if (str->GetString().Contains(".so")) {
3385 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Library names are reduced to their core: a leading lib and the .so part are
// removed; entries of extraLibs are separated by colons.
3388 TString stmp = str->GetName();
3389 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3390 stmp.ReplaceAll(".so","");
3391 if (!extraLibs.IsNull()) extraLibs += ":";
3395 if (str->GetString().Contains(".par")) {
3396 // The first par file found in the list will not allow any further .so
// Par files are collected in parLibs, separated by colons.
3398 if (!parLibs.IsNull()) parLibs += ":";
3399 parLibs += str->GetName();
3403 if (list) delete list;
3405 if (!extraLibs.IsNull()) {
3406 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3407 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3409 // Check extra includes
3410 if (!fIncludePath.IsNull()) {
// Convert the include flags into a colon-separated list: spaces become colons,
// the ALICE_ROOT prefixes and the -I markers are removed, trailing colons are cut.
3411 TString includePath = fIncludePath;
3412 includePath.ReplaceAll(" ",":");
3413 includePath.ReplaceAll("$ALICE_ROOT/","");
3414 includePath.ReplaceAll("${ALICE_ROOT}/","");
3415 includePath.ReplaceAll("-I","");
3416 includePath.Remove(TString::kTrailing, ':');
3417 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3418 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3420 // Check if connection to grid is requested
3421 if (TestSpecialBit(kProofConnectGrid))
3422 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3423 // Enable AliRoot par
3425 // Enable proof lite package
3426 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Print every option name and title that will be handed to the package.
3427 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3428 TNamed *obj = (TNamed*)optionsList.At(i);
3429 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// Both interpreted calls count as successful when ProcessLine returns 0. The address
// of the local optionsList is printed into the command so that the interpreted call
// receives the same TList.
3431 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3432 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3433 Info("StartAnalysis", "AliRootProofLite enabled");
3435 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3439 if ( ! fAliROOTVersion.IsNull() ) {
// A non-zero return of the interpreted EnablePackage call is reported as an error.
3440 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3441 fAliROOTVersion.Data(), &optionsList))) {
3442 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3447 // Enable first par files from fAdditionalLibs
3448 if (!parLibs.IsNull()) {
3449 TObjArray *list = parLibs.Tokenize(":");
3451 TObjString *package;
3452 while((package=(TObjString*)next())) {
// Remove any local directory named after the package (name without .par) before
// uploading the package.
3453 TString spkg = package->GetName();
3454 spkg.ReplaceAll(".par", "");
3455 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3456 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
// The second EnablePackage argument is kFALSE in test mode and kTRUE otherwise.
3457 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3458 if (gROOT->ProcessLine(enablePackage)) {
3459 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3463 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3467 if (list) delete list;
3470 if (fAdditionalLibs.Contains(".so") && !testMode) {
3471 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3472 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3476 // Enable par files if requested
3477 if (fPackages && fPackages->GetEntries()) {
3478 TIter next(fPackages);
3480 while ((package=next())) {
3481 // Skip packages already enabled
3482 if (parLibs.Contains(package->GetName())) continue;
3483 TString spkg = package->GetName();
3484 spkg.ReplaceAll(".par", "");
3485 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3486 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3487 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3488 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3492 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3497 // Do we need to load analysis source files ?
3498 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3499 if (fAnalysisSource.Length()) {
// fAnalysisSource is a space-separated list; each entry is loaded on PROOF with
// the +g suffix appended to its name.
3500 TObjArray *list = fAnalysisSource.Tokenize(" ");
3503 while((str=(TObjString*)next())) {
3504 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3506 if (list) delete list;
3509 // Register dataset to proof lite.
3510 if (fFileForTestMode.IsNull()) {
3511 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
// AccessPathName() returns kTRUE when the file is NOT accessible.
3514 if (gSystem->AccessPathName(fFileForTestMode)) {
3515 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// Build a collection from the listed files and register it as test_collection.
3518 TFileCollection *coll = new TFileCollection();
3519 coll->AddFromFile(fFileForTestMode);
3520 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3521 gROOT->ProcessLine("gProof->ShowDataSets()");
3526 // Check if output files have to be taken from the analysis manager
3527 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3528 // Add output files and AOD files
3529 fOutputFiles = GetListOfFiles("outaod");
3530 // Add extra files registered to the analysis manager
3531 TString extra = GetListOfFiles("ext");
3532 if (!extra.IsNull()) {
// Extra files are turned into wildcard patterns (.root becomes *.root).
3533 extra.ReplaceAll(".root", "*.root");
3534 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3535 fOutputFiles += extra;
3537 // Compose the output archive.
// The archive string always starts with the log archive; the root archive part
// lists fOutputFiles and *.stat with fNreplicas replicas, plus the file info log
// when throughput collection is enabled in the manager.
3538 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3539 if (mgr->IsCollectThroughput())
3540 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3542 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3544 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Print a banner describing the selected run mode.
3545 if (TestBit(AliAnalysisGrid::kOffline)) {
3546 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3547 \n there nor any job run. You can revise the JDL and analysis \
3548 \n macro then run the same in \"submit\" mode.");
3549 } else if (TestBit(AliAnalysisGrid::kTest)) {
3550 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3552 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3553 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3554 \n space and job submitted.");
3555 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3556 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3557 if (fMergeViaJDL) CheckInputData();
3560 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3565 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3568 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3569 if (!CheckInputData()) {
3570 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3573 if (!CreateDataset(fDataPattern)) {
3575 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3576 if (fRunNumbers.Length()) serror = "run numbers";
3577 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3578 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3579 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
3582 WriteAnalysisFile();
3583 WriteAnalysisMacro();
3585 WriteValidationScript();
3587 WriteMergingMacro();
3588 WriteMergeExecutable();
3589 WriteValidationScript(kTRUE);
3591 if (!CreateJDL()) return kFALSE;
3592 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3594 // Locally testing the analysis
3595 Info("StartAnalysis", "\n_______________________________________________________________________ \
3596 \n Running analysis script in a daughter shell as on a worker node \
3597 \n_______________________________________________________________________");
3598 TObjArray *list = fOutputFiles.Tokenize(",");
3602 while((str=(TObjString*)next())) {
3603 outputFile = str->GetString();
3604 Int_t index = outputFile.Index("@");
3605 if (index > 0) outputFile.Remove(index);
3606 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
3609 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3610 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3611 // gSystem->Exec("cat stdout");
3614 // Check if submitting is managed by LPM manager
3615 if (fProductionMode) {
3616 //TString prodfile = fJDLName;
3617 //prodfile.ReplaceAll(".jdl", ".prod");
3618 //WriteProductionFile(prodfile);
3619 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3622 // Submit AliEn job(s)
3623 gGrid->Cd(fGridOutputDir);
3628 if (!fRunNumbers.Length() && !fRunRange[0]) {
3629 // Submit a given xml or a set of runs
3630 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3631 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
3633 const char *cjobId = res->GetKey(0,"jobId");
3637 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3640 Info("StartAnalysis", "\n_______________________________________________________________________ \
3641 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3642 \n_______________________________________________________________________",
3643 fJDLName.Data(), cjobId);
3646 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3647 fGridJobIDs.Append(jobID);
3648 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3649 fGridStages.Append("full");
3654 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3658 // Submit for a range of enumeration of runs.
3659 if (!Submit()) return kFALSE;
3660 jobID = fGridJobIDs;
3664 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3665 \n You may exit at any time and terminate the job later using the option <terminate> \
3666 \n ##################################################################################", jobID.Data());
3667 gSystem->Exec("aliensh");
3669 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3670 \n Remember to terminate the job later using the option <terminate> \
3671 \n ##################################################################################", jobID.Data());
3676 //______________________________________________________________________________
3677 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3679 // Get a comma-separated list of output files of the requested type.
3680 // Type can be (case insensitive):
3681 // aod - list of aod files (std, extensions and filters)
3682 // out - list of output files connected to containers (but not aod's or extras)
3683 // ext - list of extra files registered to the manager
3684 // ter - list of files produced in terminate
// The type string is tested with Contains(), so several keywords can be combined
// in one argument (for example outaod).
// The result is kept in a function-local static TString: the returned pointer refers
// to that shared buffer and may be changed by a later call.
3685 static TString files;
3687 TString stype = type;
3689 TString aodfiles, extra;
3690 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3692 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3693 return files.Data();
// AOD files: the output file name of the output event handler followed by its
// extra outputs.
3695 if (mgr->GetOutputEventHandler()) {
3696 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3697 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3698 if (!extraaod.IsNull()) {
3700 aodfiles += extraaod;
3703 if (stype.Contains("aod")) {
3705 if (stype == "aod") return files.Data();
3707 // Add output files that are not in the list of AOD files
3708 TString outputfiles = "";
3709 TIter next(mgr->GetOutputs());
3710 AliAnalysisDataContainer *output;
3711 const char *filename = 0;
3712 while ((output=(AliAnalysisDataContainer*)next())) {
3713 filename = output->GetFileName();
// Skip containers whose file name is exactly default, and names already present
// (substring match) in the output or AOD lists.
3714 if (!(strcmp(filename, "default"))) continue;
3715 if (outputfiles.Contains(filename)) continue;
3716 if (aodfiles.Contains(filename)) continue;
3717 if (!outputfiles.IsNull()) outputfiles += ",";
3718 outputfiles += filename;
3720 if (stype.Contains("out")) {
3721 if (!files.IsNull()) files += ",";
3722 files += outputfiles;
3723 if (stype == "out") return files.Data();
3725 // Add extra files registered to the analysis manager
3727 extra = mgr->GetExtraFiles();
3728 if (!extra.IsNull()) {
// Spaces are converted to commas before splitting, so both separators are accepted.
3730 extra.ReplaceAll(" ", ",");
3731 TObjArray *fextra = extra.Tokenize(",");
3732 TIter nextx(fextra);
3734 while ((obj=nextx())) {
// Keep only names not yet present (substring match) in any of the lists.
3735 if (aodfiles.Contains(obj->GetName())) continue;
3736 if (outputfiles.Contains(obj->GetName())) continue;
3737 if (sextra.Contains(obj->GetName())) continue;
3738 if (!sextra.IsNull()) sextra += ",";
3739 sextra += obj->GetName();
3742 if (stype.Contains("ext")) {
3743 if (!files.IsNull()) files += ",";
3747 if (stype == "ext") return files.Data();
3749 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() returns the stripped substring and does not modify
// the string itself, so this call appears to have no effect - confirm.
3750 fTerminateFiles.Strip();
// Note that this rewrites the member in place: spaces become commas.
3751 fTerminateFiles.ReplaceAll(" ",",");
3752 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3753 TIter nextx(fextra);
3755 while ((obj=nextx())) {
3756 if (aodfiles.Contains(obj->GetName())) continue;
3757 if (outputfiles.Contains(obj->GetName())) continue;
3758 if (termfiles.Contains(obj->GetName())) continue;
3759 if (sextra.Contains(obj->GetName())) continue;
3760 if (!termfiles.IsNull()) termfiles += ",";
3761 termfiles += obj->GetName();
3765 if (stype.Contains("ter")) {
3766 if (!files.IsNull() && !termfiles.IsNull()) {
3771 return files.Data();
3774 //______________________________________________________________________________
3775 Bool_t AliAnalysisAlien::Submit()
3777 // Submit all master jobs.
// One master job is expected per entry of fInputFiles. Returns kFALSE as soon as
// a call to SubmitNext() fails.
3778 Int_t nmasterjobs = fInputFiles->GetEntries();
// Reference time for pacing the submissions.
3779 Long_t tshoot = gSystem->Now();
// Nothing submitted yet: submit the first master job right away.
3780 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3781 while (fNsubmitted < nmasterjobs) {
3782 Long_t now = gSystem->Now();
// The next submission is attempted only once more than 30000 time units
// (presumably milliseconds, the unit of TSystem::Now - confirm) have elapsed.
3783 if ((now-tshoot)>30000) {
3785 if (!SubmitNext()) return kFALSE;
3791 //______________________________________________________________________________
3792 Bool_t AliAnalysisAlien::SubmitMerging()
3794 // Submit all merging jobs.
// For every entry of fInputFiles the matching output directory is worked out,
// the first output file that is excluded neither from merging nor from
// registration is selected, and both are handed to CheckMergedFiles() together
// with fMaxMergeFiles and the name of the merge JDL.
// Returns kFALSE when CheckMergedFiles() fails for the last entry and kTRUE
// when there is nothing to submit.
// NOTE(review): this copy of the function lacks its brace-only and other short
// lines, so some guards (e.g. the condition in front of the Error() call
// below) cannot be seen here.
// A bare directory name (no '/') is taken relative to <alien home>/<work dir>.
3795 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3796 gGrid->Cd(fGridOutputDir);
// The merge JDL is named after the executable: <name>.sh -> <name>_merge.jdl
3797 TString mergeJDLName = fExecutable;
3798 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
// NOTE(review): the condition guarding this error is not visible in this copy;
// presumably it fires when no input file list exists - confirm.
3800 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3803 Int_t ntosubmit = fInputFiles->GetEntries();
3804 for (Int_t i=0; i<ntosubmit; i++) {
// Start from the base name of the input collection without its ".xml" extension.
3805 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3806 runOutDir.ReplaceAll(".xml", "");
3807 if (fOutputToRunNo) {
3808 // The output directory is the run number
3809 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3810 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3812 if (!fRunNumbers.Length() && !fRunRange[0]) {
3813 // The output directory is the grid outdir
3814 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3815 runOutDir = fGridOutputDir;
3817 // The output directory is the master number in 3 digits format
3818 printf("### Submitting merging job for master <%03d>\n", i);
3819 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3822 // Check now the number of merging stages.
// Pick the first output file that is in neither exclusion list; anything from
// an '@' on (when the '@' is not the first character) is stripped beforehand.
// NOTE(review): if every output file is excluded, outputFile is left holding
// the last (excluded) name when the loop ends - confirm this is intended.
// NOTE(review): no visible line releases the array returned by Tokenize() -
// confirm it is deleted on one of the lines missing from this copy.
3823 TObjArray *list = fOutputFiles.Tokenize(",");
3827 while((str=(TObjString*)next())) {
3828 outputFile = str->GetString();
3829 Int_t index = outputFile.Index("@");
3830 if (index > 0) outputFile.Remove(index);
3831 if (!fMergeExcludes.Contains(outputFile) &&
3832 !fRegisterExcludes.Contains(outputFile)) break;
3835 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// Only a failure on the last entry is reported to the caller.
3836 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without explicit run numbers or a run range the first pass already covers
// the whole output directory, so the loop stops here.
3837 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3839 if (!ntosubmit) return kTRUE;
// NOTE(review): both messages below are reported under the "StartAnalysis"
// location although they are issued from SubmitMerging(). The condition that
// selects between them is not visible in this copy.
3841 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3842 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3843 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3844 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3845 \n ################################################################################################################");
// Start the "aliensh" command for the user.
3846 gSystem->Exec("aliensh");
3848 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3849 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3850 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3851 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3852 \n ################################################################################################################");
3857 //______________________________________________________________________________
3858 Bool_t AliAnalysisAlien::SubmitNext()
3860 // Submit next bunch of master jobs if the queue is free. The first master job is
3861 // submitted right away, while the next will not be unless the previous was split.
3862 // The plugin will not submit new master jobs if there are more than 500 jobs in
// waiting state.
// Returns kFALSE when a submission yields no job id or no grid result at all;
// the visible early exits that merely postpone submission return kTRUE.
// NOTE(review): this copy of the function lacks its brace-only and other short
// lines; the conditions selecting between some of the branches below are
// therefore not visible here.
// State kept across calls: a busy flag, the job ids of the first and of the
// most recent master job submitted (both taken from the returned "jobId"
// below) and the average number of subjobs per master.
3864 static Bool_t iscalled = kFALSE;
3865 static Int_t firstmaster = 0;
3866 static Int_t lastmaster = 0;
3867 static Int_t npermaster = 0;
// A call arriving while the busy flag is set does nothing.
// NOTE(review): the line that raises the flag is not visible in this copy;
// the exits below reset it to kFALSE.
3868 if (iscalled) return kTRUE;
3870 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3871 Int_t ntosubmit = 0;
3874 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without a submit policy every master job is submitted in one go.
3877 if (!IsUseSubmitPolicy()) {
3879 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3880 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3881 ntosubmit = nmasterjobs;
// Throttle on the state of what was submitted so far; GetJobStatus()
// presumably fills the four counters passed to it - confirm.
3884 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3885 printf("=== master %d: %s\n", lastmaster, status.Data());
3886 // If last master not split, just return
3887 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3888 // No more than 500 waiting jobs
3889 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master, used to estimate how many
// more masters fit below the limit of 500 waiting subjobs (at least one is
// always submitted).
// NOTE(review): integer division by fNsubmitted; the guard that keeps it
// non-zero on this path is not visible in this copy - confirm.
3890 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3891 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3892 if (!ntosubmit) ntosubmit = 1;
3893 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3894 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3896 for (Int_t i=0; i<ntosubmit; i++) {
3897 // Submit for a range of enumeration of runs.
3898 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// The last argument of the submit command is either the collection base name
// without ".xml" or the zero-padded index of the master job; the condition
// choosing between the two forms is not visible in this copy.
3900 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3901 runOutDir.ReplaceAll(".xml", "");
3903 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3905 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3906 printf("********* %s\n",query.Data());
3907 res = gGrid->Command(query);
// An empty "jobId" in the result is treated as a rejected submission.
3909 TString cjobId1 = res->GetKey(0,"jobId");
3910 if (!cjobId1.Length()) {
// NOTE(review): this and the following messages are reported under the
// "StartAnalysis" location although they are issued from SubmitNext().
3914 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3917 Info("StartAnalysis", "\n_______________________________________________________________________ \
3918 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3919 \n_______________________________________________________________________",
3920 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Keep space-separated records of the job id and of the stage name ("full")
// of every master job submitted.
3921 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3922 fGridJobIDs.Append(cjobId1);
3923 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3924 fGridStages.Append("full");
// Remember the id range of the master jobs for the next status query.
3927 lastmaster = cjobId1.Atoi();
3928 if (!firstmaster) firstmaster = lastmaster;
3933 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3941 //______________________________________________________________________________
3942 void AliAnalysisAlien::WriteAnalysisFile()
3944 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root". The
// local part is skipped when the kSubmit bit is set. Afterwards the file is
// copied to the AliEn working directory unless production mode, the kOffline
// bit or the kTest bit is set.
// NOTE(review): this copy of the function lacks its brace-only and other short
// lines; in particular the statements that actually write the manager and
// close the file are not visible here.
3945 TString analysisFile = fExecutable;
3946 analysisFile.ReplaceAll(".sh", ".root");
3947 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3948 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3949 if (!mgr || !mgr->IsInitialized()) {
3950 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3953 // Check analysis type
// The kUseMC / kUseESD / kUseAOD bits of this object are derived from the
// event handlers attached to the manager. For a multi-input handler the
// first input handler decides between ESD and AOD.
3955 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3956 handler = (TObject*)mgr->GetInputEventHandler();
3958 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3959 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
// NOTE(review): the result of GetFirstInputEventHandler() is dereferenced
// without a null check - confirm it cannot be null here.
3960 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3961 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3963 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3964 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory so that it can be restored after the output
// file has been handled (see the cd() call below).
3967 TDirectory *cdir = gDirectory;
3968 TFile *file = TFile::Open(analysisFile, "RECREATE");
3970 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3971 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3972 // Unless merging makes no sense
3973 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3976 // Enable termination for local jobs
3977 mgr->SetSkipTerminate(kFALSE);
3979 if (cdir) cdir->cd();
// NOTE(review): no visible line ties this message to a successful
// TFile::Open() - confirm it is not printed after a failed open.
3980 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// The upload to AliEn is skipped in production, offline and test mode.
3982 Bool_t copy = kTRUE;
3983 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3986 TString workdir = gGrid->GetHomeDirectory();
3987 workdir += fGridWorkingDir;
3988 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a copy already present in AliEn before uploading the new one.
3989 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
// copyLocal2Alien() adds the "file:" and "alien://" prefixes itself, so plain
// paths are passed; a failed copy is fatal.
3990 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3991 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3995 //______________________________________________________________________________
3996 void AliAnalysisAlien::WriteAnalysisMacro()
3998 // Write the analysis macro that will steer the analysis in grid mode.
// Unless the kSubmit bit is set, the C++ source of the macro is streamed into
// the file fAnalysisMacro. The generated entry function is named after the
// file (without ".C"); it loads libraries / par packages, sets include paths,
// reads the analysis manager from <executable>.root and starts the analysis.
// Outside local-test and MC-loop mode a CreateChain() helper is appended, and
// a local SetupPar() helper is appended when ANALYSISalice is one of the
// packages. Finally the macro is copied to the AliEn working directory unless
// production mode, the kOffline bit or the kTest bit is set.
// NOTE(review): this copy of the function lacks its brace-only and other short
// lines, so several declarations and guard conditions are not visible here.
3999 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4001 out.open(fAnalysisMacro.Data(), ios::out);
4003 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework libraries are supplied as packages in
// fPackages (set in the package loop further down).
4006 Bool_t hasSTEERBase = kFALSE;
4007 Bool_t hasESD = kFALSE;
4008 Bool_t hasAOD = kFALSE;
4009 Bool_t hasANALYSIS = kFALSE;
4010 Bool_t hasOADB = kFALSE;
4011 Bool_t hasANALYSISalice = kFALSE;
4012 Bool_t hasCORRFW = kFALSE;
// func becomes the name of the generated function, type the value written to
// the generated "anatype" constant, comment a header line of the macro.
4013 TString func = fAnalysisMacro;
4014 TString type = "ESD";
4015 TString comment = "// Analysis using ";
4020 if (IsUseMCchain()) {
4024 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
4025 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
4031 if (type!="AOD" && fFriendChainName!="") {
4032 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
4035 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
4036 else comment += " data";
4037 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
4038 func.ReplaceAll(".C", "");
4039 out << "void " << func.Data() << "()" << endl;
4041 out << comment.Data() << endl;
4042 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
4043 out << " TStopwatch timer;" << endl;
4044 out << " timer.Start();" << endl << endl;
4045 // Change temp directory to current one
4046 if (!IsLocalTest()) {
4047 out << "// connect to AliEn and make the chain" << endl;
4048 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4050 out << "// Set temporary merging directory to current one" << endl;
4051 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4052 out << "// Set temporary compilation directory to current one" << endl;
4053 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4054 // Reset existing include path
4055 out << "// Reset existing include path and add current directory first in the search" << endl;
4056 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// The base ROOT libraries are loaded explicitly only when the executable
// command does not contain "aliroot".
4057 if (!fExecutableCommand.Contains("aliroot")) {
4058 out << "// load base root libraries" << endl;
4059 out << " gSystem->Load(\"libTree\");" << endl;
4060 out << " gSystem->Load(\"libGeom\");" << endl;
4061 out << " gSystem->Load(\"libVMC\");" << endl;
4062 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4063 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4065 if (fAdditionalRootLibs.Length()) {
4066 // in principle libtree /lib geom libvmc etc. can go into this list, too
4067 out << "// Add aditional libraries" << endl;
// Only the space-separated entries containing ".so" are loaded.
4068 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4071 while((str=(TObjString*)next())) {
4072 if (str->GetString().Contains(".so"))
4073 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4075 if (list) delete list;
4077 out << "// Load analysis framework libraries" << endl;
// Name of the function the generated macro calls to set up a par package;
// replaced by a locally generated SetupPar() further down when ANALYSISalice
// is itself a package.
4078 TString setupPar = "AliAnalysisAlien::SetupPar";
4080 if (!fExecutableCommand.Contains("aliroot")) {
4081 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4082 out << " gSystem->Load(\"libESD\");" << endl;
4083 out << " gSystem->Load(\"libAOD\");" << endl;
4085 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4086 out << " gSystem->Load(\"libOADB\");" << endl;
4087 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4088 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over the packages: record which framework libraries are supplied
// as packages (with or without the ".par" suffix).
4090 TIter next(fPackages);
4093 while ((obj=next())) {
4094 pkgname = obj->GetName();
4095 if (pkgname == "STEERBase" ||
4096 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4097 if (pkgname == "ESD" ||
4098 pkgname == "ESD.par") hasESD = kTRUE;
4099 if (pkgname == "AOD" ||
4100 pkgname == "AOD.par") hasAOD = kTRUE;
4101 if (pkgname == "ANALYSIS" ||
4102 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4103 if (pkgname == "OADB" ||
4104 pkgname == "OADB.par") hasOADB = kTRUE;
4105 if (pkgname == "ANALYSISalice" ||
4106 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4107 if (pkgname == "CORRFW" ||
4108 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// presumably AliAnalysisAlien::SetupPar cannot be used while ANALYSISalice
// itself still has to be built from a package - confirm. In that case the
// macro calls the SetupPar() helper generated at the end of this function.
4110 if (hasANALYSISalice) setupPar = "SetupPar";
// Each framework library is either loaded as a shared library or set up from
// its package, in a fixed order.
4111 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4112 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4113 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4114 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4115 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4116 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4117 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4118 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4119 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4120 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4121 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4122 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4123 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4124 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4125 out << "// Compile other par packages" << endl;
// Second pass over the packages: everything that is not one of the framework
// packages handled above is set up here.
// NOTE(review): the iterator must be rewound before this loop; that statement
// is not visible in this copy - confirm.
4127 while ((obj=next())) {
4128 pkgname = obj->GetName();
4129 if (pkgname == "STEERBase" ||
4130 pkgname == "STEERBase.par" ||
4132 pkgname == "ESD.par" ||
4134 pkgname == "AOD.par" ||
4135 pkgname == "ANALYSIS" ||
4136 pkgname == "ANALYSIS.par" ||
4137 pkgname == "OADB" ||
4138 pkgname == "OADB.par" ||
4139 pkgname == "ANALYSISalice" ||
4140 pkgname == "ANALYSISalice.par" ||
4141 pkgname == "CORRFW" ||
4142 pkgname == "CORRFW.par") continue;
4143 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4146 out << "// include path" << endl;
4147 // Get the include path from the interpreter and remove entries pointing to AliRoot
4148 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4149 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4150 out << " TIter nextpath(listpaths);" << endl;
4151 out << " TObjString *pname;" << endl;
4152 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4153 out << " TString current = pname->GetName();" << endl;
4154 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4155 out << " gSystem->AddIncludePath(current);" << endl;
4156 out << " }" << endl;
4157 out << " if (listpaths) delete listpaths;" << endl;
4158 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4159 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4160 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
// Generator libraries are loaded only in MC-loop mode.
4161 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4162 out << "// MC generator libraries" << endl;
4163 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4166 while((str=(TObjString*)next())) {
4167 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4171 if (fAdditionalLibs.Length()) {
4172 out << "// Add aditional AliRoot libraries" << endl;
// The friend libraries are appended to the additional libraries; entries
// containing ".so" are loaded, entries containing ".par" are set up as
// packages.
// NOTE(review): the value returned by Strip() is not used on the line below,
// so additionalLibs itself is presumably left unchanged - confirm the intent.
4173 TString additionalLibs = fAdditionalLibs;
4174 additionalLibs.Strip();
4175 if (additionalLibs.Length() && fFriendLibs.Length())
4176 additionalLibs += " ";
4177 additionalLibs += fFriendLibs;
4178 TObjArray *list = additionalLibs.Tokenize(" ");
4181 while((str=(TObjString*)next())) {
4182 if (str->GetString().Contains(".so"))
4183 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4184 if (str->GetString().Contains(".par"))
4185 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4190 out << "// analysis source to be compiled at runtime (if any)" << endl;
// Every space-separated source file is compiled in the generated macro with
// ".L <file>+g".
4191 if (fAnalysisSource.Length()) {
4192 TObjArray *list = fAnalysisSource.Tokenize(" ");
4195 while((str=(TObjString*)next())) {
4196 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4198 if (list) delete list;
4201 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4202 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
// With the fast-read option the generated macro sets several XNet.* values
// via gEnv; a warning is issued here and printed by the macro as well.
4203 if (fFastReadOption) {
4204 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4205 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4206 out << "// fast xrootd reading enabled" << endl;
4207 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4208 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4209 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4210 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4211 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4212 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4214 out << "// read the analysis manager from file" << endl;
// Same file name as used by WriteAnalysisFile(): <executable>.sh -> .root
4215 TString analysisFile = fExecutable;
4216 analysisFile.ReplaceAll(".sh", ".root");
4217 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4218 << analysisFile << "\");" << endl;
4219 out << " if (!mgr) return;" << endl;
// In local-test mode the generated macro builds a plugin of its own in "test"
// run mode, mirrors the relevant settings of this object on it and registers
// it as the grid handler of the manager.
4220 if (IsLocalTest()) {
4221 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4222 out << " plugin->SetRunMode(\"test\");" << endl;
4223 if (fFileForTestMode.IsNull())
4224 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4226 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4227 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4228 if (!fFriendChainName.IsNull())
4229 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\",\"" << fFriendLibs << "\");" << endl;
4231 out << " plugin->SetUseMCchain();" << endl;
4233 out << " plugin->SetMCLoop(kTRUE);" << endl;
4234 out << " mgr->SetGridHandler(plugin);" << endl;
// Debug level and sysinfo sampling are copied from the current manager when
// there is one; otherwise the literal values 10 and 100 are written.
4235 if (AliAnalysisManager::GetAnalysisManager()) {
4236 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4237 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4239 out << " mgr->SetDebugLevel(10);" << endl;
4240 out << " mgr->SetNSysInfo(100);" << endl;
4243 out << " mgr->PrintStatus();" << endl;
4244 if (AliAnalysisManager::GetAnalysisManager()) {
4245 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4246 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4248 if (TestBit(AliAnalysisGrid::kTest))
4249 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4251 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// Start of the analysis in the generated macro: either an event loop over
// fNMCevents events with the cache size set to 0, or StartAnalysis in
// "localfile" mode (with a chain built from "wn.xml" outside local-test
// mode). The conditions choosing between the forms are not visible in this
// copy.
4254 if (!IsLocalTest()) {
4256 out << " mgr->SetCacheSize(0);" << endl;
4257 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4259 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4260 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4264 out << " mgr->SetCacheSize(0);" << endl;
4265 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4267 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4270 out << " timer.Stop();" << endl;
4271 out << " timer.Print();" << endl;
4272 out << "}" << endl << endl;
// Generated helper CreateChain(): builds a TChain from the files listed in an
// AliEn xml collection.
4273 if (!IsLocalTest() && !fMCLoop) {
4274 out <<"//________________________________________________________________________________" << endl;
4275 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4277 out << "// Create a chain using url's from xml file" << endl;
4278 out << " TString filename;" << endl;
4279 out << " Int_t run = 0;" << endl;
// Tree name in the generated code: "TE" for an MC chain, else fTreeName when
// set, else the lower-cased type followed by "Tree".
4280 if (IsUseMCchain()) {
4281 out << " TString treename = \"TE\";" << endl;
4283 if (!fTreeName.IsNull()) {
4284 out << " TString treename = \"" << fTreeName << "\";" << endl;
4286 out << " TString treename = type;" << endl;
4287 out << " treename.ToLower();" << endl;
4288 out << " treename += \"Tree\";" << endl;
4291 out << " printf(\"***************************************\\n\");" << endl;
4292 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4293 out << " printf(\"***************************************\\n\");" << endl;
4294 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4295 out << " if (!coll) {" << endl;
4296 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4297 out << " return NULL;" << endl;
4298 out << " }" << endl;
4299 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4300 out << " TChain *chain = new TChain(treename);" << endl;
// One friend chain per space-separated name in fFriendChainName; the name is
// stored as the chain title and used later as the friend file name.
4301 if(!fFriendChainName.IsNull()) {
4302 out << " TList *friends = new TList();" << endl;
4303 out << " TIter nextfriend(friends);" << endl;
4304 out << " TChain *cfriend = 0;" << endl;
4305 TObjArray *list = fFriendChainName.Tokenize(" ");
4308 while((str=(TObjString*)next())) {
4309 out << " cfriend = new TChain(treename, \"" << str->GetName() << "\");" << endl;
4310 out << " friends->Add(cfriend);" << endl;
4311 out << " chain->AddFriend(cfriend);" << endl;
4314 // out << " TChain *chainFriend = new TChain(treename);" << endl;
4316 out << " coll->Reset();" << endl;
4317 out << " while (coll->Next()) {" << endl;
// NOTE(review): the two inner quotes on the next line are not escaped, so the
// literal is split in two adjacent pieces that the compiler concatenates; the
// generated macro therefore contains GetTURL() without argument, whereas the
// friend branch below writes GetTURL("") - confirm which form is intended.
4318 out << " filename = coll->GetTURL("");" << endl;
4319 out << " if (mgr) {" << endl;
4320 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4321 out << " if (nrun && nrun != run) {" << endl;
4322 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4323 out << " mgr->SetRunFromPath(nrun);" << endl;
4324 out << " run = nrun;" << endl;
4325 out << " }" << endl;
4326 out << " }" << endl;
4327 out << " chain->Add(filename);" << endl;
// Friend files are looked up in the directory of the current file (anything
// from '#' on is removed from the url first); a friend file that cannot be
// opened is fatal in the generated macro.
4328 if(!fFriendChainName.IsNull()) {
4329 out << " TString bpath=coll->GetTURL(\"\");" << endl;
4330 out << " if (bpath.Index(\"#\") > -1) bpath.Remove(bpath.Index(\"#\"));" << endl;
4331 out << " bpath = gSystem->DirName(bpath);" << endl;
4332 out << " bpath += \"/\";" << endl;
4333 out << " TString fileFriend;" << endl;
4334 out << " nextfriend.Reset();" << endl;
4335 out << " while ((cfriend=(TChain*)nextfriend())) {" << endl;
4336 out << " fileFriend = bpath;" << endl;
4337 out << " fileFriend += cfriend->GetTitle();" << endl;
4338 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4339 out << " if (file) {" << endl;
4340 out << " file->Close();" << endl;
4341 out << " cfriend->Add(fileFriend.Data());" << endl;
4342 out << " } else {" << endl;
4343 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4344 out << " return 0;" << endl;
4345 out << " }" << endl;
4346 out << " }" << endl;
4348 out << " }" << endl;
4349 out << " if (!chain->GetNtrees()) {" << endl;
4350 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4351 out << " return NULL;" << endl;
4352 out << " }" << endl;
4353 out << " return chain;" << endl;
4354 out << "}" << endl << endl;
// Generated helper SetupPar(): unpacks <package>.par, runs PROOF-INF/BUILD.sh
// and PROOF-INF/SETUP.C inside the package directory and restores the working
// directory. Emitted only when ANALYSISalice is supplied as a package.
4356 if (hasANALYSISalice) {
4357 out <<"//________________________________________________________________________________" << endl;
4358 out << "Bool_t SetupPar(const char *package) {" << endl;
4359 out << "// Compile the package and set it up." << endl;
4360 out << " TString pkgdir = package;" << endl;
4361 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4362 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4363 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4364 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4365 out << " // Check for BUILD.sh and execute" << endl;
4366 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4367 out << " printf(\"*******************************\\n\");" << endl;
4368 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4369 out << " printf(\"*******************************\\n\");" << endl;
4370 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4371 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4372 out << " gSystem->ChangeDirectory(cdir);" << endl;
4373 out << " return kFALSE;" << endl;
4374 out << " }" << endl;
4375 out << " } else {" << endl;
4376 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4377 out << " gSystem->ChangeDirectory(cdir);" << endl;
4378 out << " return kFALSE;" << endl;
4379 out << " }" << endl;
4380 out << " // Check for SETUP.C and execute" << endl;
4381 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4382 out << " printf(\"*******************************\\n\");" << endl;
4383 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4384 out << " printf(\"*******************************\\n\");" << endl;
4385 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4386 out << " } else {" << endl;
4387 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4388 out << " gSystem->ChangeDirectory(cdir);" << endl;
4389 out << " return kFALSE;" << endl;
4390 out << " }" << endl;
4391 out << " // Restore original workdir" << endl;
4392 out << " gSystem->ChangeDirectory(cdir);" << endl;
4393 out << " return kTRUE;" << endl;
4396 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// The upload to AliEn is skipped in production, offline and test mode.
4398 Bool_t copy = kTRUE;
4399 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4402 TString workdir = gGrid->GetHomeDirectory();
4403 workdir += fGridWorkingDir;
// Remove a copy already present in AliEn before uploading the new one.
4404 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4405 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4406 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// NOTE(review): copyLocal2Alien() prepends "alien://" to its remote argument
// itself, so the destination built here ends up with the prefix twice;
// WriteAnalysisFile() passes the plain "<workdir>/<file>" path instead -
// confirm and align.
4407 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4408 Form("alien://%s/%s", workdir.Data(),
4409 fAnalysisMacro.Data()))) Fatal("","Terminating");
4413 //______________________________________________________________________________
4414 void AliAnalysisAlien::WriteMergingMacro()
4416 // Write a macro to merge the outputs per master job.
4417 if (!fMergeViaJDL) return;
4418 if (!fOutputFiles.Length()) {
4419 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4422 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4423 TString mergingMacro = fExecutable;
4424 mergingMacro.ReplaceAll(".sh","_merge.C");
4425 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4426 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4428 out.open(mergingMacro.Data(), ios::out);
4430 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4433 Bool_t hasSTEERBase = kFALSE;
4434 Bool_t hasESD = kFALSE;
4435 Bool_t hasAOD = kFALSE;
4436 Bool_t hasANALYSIS = kFALSE;
4437 Bool_t hasOADB = kFALSE;
4438 Bool_t hasANALYSISalice = kFALSE;
4439 Bool_t hasCORRFW = kFALSE;
4440 TString func = mergingMacro;
4442 func.ReplaceAll(".C", "");
4443 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4445 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4446 out << " TStopwatch timer;" << endl;
4447 out << " timer.Start();" << endl << endl;
4448 // Reset existing include path
4449 out << "// Reset existing include path and add current directory first in the search" << endl;
4450 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4451 if (!fExecutableCommand.Contains("aliroot")) {
4452 out << "// load base root libraries" << endl;
4453 out << " gSystem->Load(\"libTree\");" << endl;
4454 out << " gSystem->Load(\"libGeom\");" << endl;
4455 out << " gSystem->Load(\"libVMC\");" << endl;
4456 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4457 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4459 if (fAdditionalRootLibs.Length()) {
4460 // in principle libtree /lib geom libvmc etc. can go into this list, too
4461 out << "// Add aditional libraries" << endl;
4462 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4465 while((str=(TObjString*)next())) {
4466 if (str->GetString().Contains(".so"))
4467 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4469 if (list) delete list;
4471 out << "// Load analysis framework libraries" << endl;
4473 if (!fExecutableCommand.Contains("aliroot")) {
4474 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4475 out << " gSystem->Load(\"libESD\");" << endl;
4476 out << " gSystem->Load(\"libAOD\");" << endl;
4478 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4479 out << " gSystem->Load(\"libOADB\");" << endl;
4480 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4481 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4483 TIter next(fPackages);
4486 TString setupPar = "AliAnalysisAlien::SetupPar";
4487 while ((obj=next())) {
4488 pkgname = obj->GetName();
4489 if (pkgname == "STEERBase" ||
4490 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4491 if (pkgname == "ESD" ||
4492 pkgname == "ESD.par") hasESD = kTRUE;
4493 if (pkgname == "AOD" ||
4494 pkgname == "AOD.par") hasAOD = kTRUE;
4495 if (pkgname == "ANALYSIS" ||
4496 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4497 if (pkgname == "OADB" ||
4498 pkgname == "OADB.par") hasOADB = kTRUE;
4499 if (pkgname == "ANALYSISalice" ||
4500 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4501 if (pkgname == "CORRFW" ||
4502 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4504 if (hasANALYSISalice) setupPar = "SetupPar";
4505 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4506 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4507 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4508 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4509 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4510 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4511 out << " gSystem->Load(\"libOADB\");" << endl;
4512 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4513 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4514 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4515 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4516 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4517 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4518 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4519 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4520 out << "// Compile other par packages" << endl;
4522 while ((obj=next())) {
4523 pkgname = obj->GetName();
4524 if (pkgname == "STEERBase" ||
4525 pkgname == "STEERBase.par" ||
4527 pkgname == "ESD.par" ||
4529 pkgname == "AOD.par" ||
4530 pkgname == "ANALYSIS" ||
4531 pkgname == "ANALYSIS.par" ||
4532 pkgname == "OADB" ||
4533 pkgname == "OADB.par" ||
4534 pkgname == "ANALYSISalice" ||
4535 pkgname == "ANALYSISalice.par" ||
4536 pkgname == "CORRFW" ||
4537 pkgname == "CORRFW.par") continue;
4538 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4541 out << "// include path" << endl;
4542 // Get the include path from the interpreter and remove entries pointing to AliRoot
4543 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4544 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4545 out << " TIter nextpath(listpaths);" << endl;
4546 out << " TObjString *pname;" << endl;
4547 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4548 out << " TString current = pname->GetName();" << endl;
4549 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4550 out << " gSystem->AddIncludePath(current);" << endl;
4551 out << " }" << endl;
4552 out << " if (listpaths) delete listpaths;" << endl;
4553 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4554 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4555 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4556 if (fAdditionalLibs.Length()) {
4557 out << "// Add aditional AliRoot libraries" << endl;
4558 TString additionalLibs = fAdditionalLibs;
4559 additionalLibs.Strip();
4560 if (additionalLibs.Length() && fFriendLibs.Length())
4561 additionalLibs += " ";
4562 additionalLibs += fFriendLibs;
4563 TObjArray *list = additionalLibs.Tokenize(" ");
4566 while((str=(TObjString*)next())) {
4567 if (str->GetString().Contains(".so"))
4568 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4570 if (list) delete list;
4573 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4574 if (fAnalysisSource.Length()) {
4575 TObjArray *list = fAnalysisSource.Tokenize(" ");
4578 while((str=(TObjString*)next())) {
4579 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4581 if (list) delete list;
4585 if (fFastReadOption) {
4586 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4587 out << "// fast xrootd reading enabled" << endl;
4588 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4589 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4590 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4591 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4592 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4593 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4595 // Change temp directory to current one
4596 out << "// Connect to AliEn" << endl;
4597 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4598 out << "// Set temporary merging directory to current one" << endl;
4599 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4600 out << "// Set temporary compilation directory to current one" << endl;
4601 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4602 out << " TString outputDir = dir;" << endl;
4603 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4604 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4605 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4606 out << " TIter *iter = new TIter(list);" << endl;
4607 out << " TObjString *str;" << endl;
4608 out << " TString outputFile;" << endl;
4609 out << " Bool_t merged = kTRUE;" << endl;
4610 TString analysisFile = fExecutable;
4611 analysisFile.ReplaceAll(".sh", ".root");
4612 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4613 << analysisFile << "\");" << endl;
4614 out << " if (!mgr) {" << endl;
4615 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4616 out << " return;" << endl;
4617 out << " }" << endl;
4618 if (IsLocalTest()) {
4619 out << " printf(\"===================================\n\");" << endl;
4620 out << " printf(\"Testing merging...\\n\");" << endl;
4621 out << " printf(\"===================================\n\");" << endl;
4623 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4624 out << " outputFile = str->GetString();" << endl;
4625 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4626 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4627 out << " if (index > 0) outputFile.Remove(index);" << endl;
4628 out << " // Skip already merged outputs" << endl;
4629 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4630 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4631 out << " continue;" << endl;
4632 out << " }" << endl;
4633 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4634 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4635 out << " if (!merged) {" << endl;
4636 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4637 out << " return;" << endl;
4638 out << " }" << endl;
4639 out << " }" << endl;
4640 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4641 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4642 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4644 out << " // all outputs merged, validate" << endl;
4645 out << " ofstream out;" << endl;
4646 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4647 out << " out.close();" << endl;
4648 out << " // read the analysis manager from file" << endl;
4649 if (IsLocalTest()) {
4650 out << " printf(\"===================================\n\");" << endl;
4651 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4652 out << " printf(\"===================================\n\");" << endl;
4654 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4656 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4657 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4658 out << " mgr->PrintStatus();" << endl;
4660 if (mgr->GetDebugLevel()>3) {
4661 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4663 if (TestBit(AliAnalysisGrid::kTest))
4664 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4666 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4669 out << " TTree *tree = NULL;" << endl;
4670 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4671 out << "}" << endl << endl;
4672 if (hasANALYSISalice) {
4673 out <<"//________________________________________________________________________________" << endl;
4674 out << "Bool_t SetupPar(const char *package) {" << endl;
4675 out << "// Compile the package and set it up." << endl;
4676 out << " TString pkgdir = package;" << endl;
4677 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4678 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4679 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4680 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4681 out << " // Check for BUILD.sh and execute" << endl;
4682 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4683 out << " printf(\"*******************************\\n\");" << endl;
4684 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4685 out << " printf(\"*******************************\\n\");" << endl;
4686 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4687 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4688 out << " gSystem->ChangeDirectory(cdir);" << endl;
4689 out << " return kFALSE;" << endl;
4690 out << " }" << endl;
4691 out << " } else {" << endl;
4692 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4693 out << " gSystem->ChangeDirectory(cdir);" << endl;
4694 out << " return kFALSE;" << endl;
4695 out << " }" << endl;
4696 out << " // Check for SETUP.C and execute" << endl;
4697 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4698 out << " printf(\"*******************************\\n\");" << endl;
4699 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4700 out << " printf(\"*******************************\\n\");" << endl;
4701 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4702 out << " } else {" << endl;
4703 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4704 out << " gSystem->ChangeDirectory(cdir);" << endl;
4705 out << " return kFALSE;" << endl;
4706 out << " }" << endl;
4707 out << " // Restore original workdir" << endl;
4708 out << " gSystem->ChangeDirectory(cdir);" << endl;
4709 out << " return kTRUE;" << endl;
4713 Bool_t copy = kTRUE;
4714 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4717 TString workdir = gGrid->GetHomeDirectory();
4718 workdir += fGridWorkingDir;
4719 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4720 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4721 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4722 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4723 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4727 //______________________________________________________________________________
4728 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4730 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4731 // Note that for loading the compiled library. The current directory should have precedence in
4733 TString pkgdir = package;
4734 pkgdir.ReplaceAll(".par","");
4735 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4736 TString cdir = gSystem->WorkingDirectory();
4737 gSystem->ChangeDirectory(pkgdir);
4738 // Check for BUILD.sh and execute
4739 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4740 printf("**************************************************\n");
4741 printf("*** Building PAR archive %s\n", package);
4742 printf("**************************************************\n");
4743 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4744 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4745 gSystem->ChangeDirectory(cdir);
4749 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4750 gSystem->ChangeDirectory(cdir);
4753 // Check for SETUP.C and execute
4754 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4755 printf("**************************************************\n");
4756 printf("*** Setup PAR archive %s\n", package);
4757 printf("**************************************************\n");
4758 gROOT->Macro("PROOF-INF/SETUP.C");
4759 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4761 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4762 gSystem->ChangeDirectory(cdir);
4765 // Restore original workdir
4766 gSystem->ChangeDirectory(cdir);
4770 //______________________________________________________________________________
4771 void AliAnalysisAlien::WriteExecutable()
4773 // Generate the alien executable script.
4774 // Patch executable with -x to catch error code
4775 if (fExecutableCommand.Contains("root") &&
4776 fExecutableCommand.Contains("-q") &&
4777 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4778 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4780 out.open(fExecutable.Data(), ios::out);
4782 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4785 out << "#!/bin/bash" << endl;
4786 // Make sure we can properly compile par files
4787 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4788 out << "echo \"=========================================\"" << endl;
4789 out << "echo \"############## PATH : ##############\"" << endl;
4790 out << "echo $PATH" << endl;
4791 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4792 out << "echo $LD_LIBRARY_PATH" << endl;
4793 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4794 out << "echo $ROOTSYS" << endl;
4795 out << "echo \"############## which root : ##############\"" << endl;
4796 out << "which root" << endl;
4797 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4798 out << "echo $ALICE_ROOT" << endl;
4799 out << "echo \"############## which aliroot : ##############\"" << endl;
4800 out << "which aliroot" << endl;
4801 out << "echo \"############## system limits : ##############\"" << endl;
4802 out << "ulimit -a" << endl;
4803 out << "echo \"############## memory : ##############\"" << endl;
4804 out << "free -m" << endl;
4805 out << "echo \"=========================================\"" << endl << endl;
4806 out << fExecutableCommand << " ";
4807 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4808 out << "RET=$?" << endl;
4809 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4810 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4811 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4812 out << " let sig=\"$RET - 128\""<<endl;
4813 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4814 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4815 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4816 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4817 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4818 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4820 out << " exit $RET"<< endl;
4821 out << "fi" << endl << endl ;
4822 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4823 out << "echo \"############## memory after: ##############\"" << endl;
4824 out << "free -m" << endl;
4826 Bool_t copy = kTRUE;
4827 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4830 TString workdir = gGrid->GetHomeDirectory();
4831 TString bindir = Form("%s/bin", workdir.Data());
4832 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4833 workdir += fGridWorkingDir;
4834 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
4835 if (FileExists(executable)) gGrid->Rm(executable);
4836 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4837 // TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
4838 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4839 executable.Data())) Fatal("","Terminating");
4843 //______________________________________________________________________________
4844 void AliAnalysisAlien::WriteMergeExecutable()
4846 // Generate the alien executable script for the merging job.
4847 if (!fMergeViaJDL) return;
4848 TString mergeExec = fExecutable;
4849 mergeExec.ReplaceAll(".sh", "_merge.sh");
4850 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4852 out.open(mergeExec.Data(), ios::out);
4854 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4857 out << "#!/bin/bash" << endl;
4858 // Make sure we can properly compile par files
4859 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4860 out << "echo \"=========================================\"" << endl;
4861 out << "echo \"############## PATH : ##############\"" << endl;
4862 out << "echo $PATH" << endl;
4863 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4864 out << "echo $LD_LIBRARY_PATH" << endl;
4865 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4866 out << "echo $ROOTSYS" << endl;
4867 out << "echo \"############## which root : ##############\"" << endl;
4868 out << "which root" << endl;
4869 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4870 out << "echo $ALICE_ROOT" << endl;
4871 out << "echo \"############## which aliroot : ##############\"" << endl;
4872 out << "which aliroot" << endl;
4873 out << "echo \"############## system limits : ##############\"" << endl;
4874 out << "ulimit -a" << endl;
4875 out << "echo \"############## memory : ##############\"" << endl;
4876 out << "free -m" << endl;
4877 out << "echo \"=========================================\"" << endl << endl;
4878 TString mergeMacro = fExecutable;
4879 mergeMacro.ReplaceAll(".sh", "_merge.C");
4880 if (IsOneStageMerging())
4881 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4883 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4884 out << fExecutableCommand << " " << "$ARG" << endl;
4885 out << "RET=$?" << endl;
4886 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4887 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4888 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4889 out << " let sig=\"$RET - 128\""<<endl;
4890 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4891 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4892 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4893 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4894 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4895 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4897 out << " exit $RET"<< endl;
4898 out << "fi" << endl << endl ;
4899 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4900 out << "echo \"############## memory after: ##############\"" << endl;
4901 out << "free -m" << endl;
4903 Bool_t copy = kTRUE;
4904 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4907 TString workdir = gGrid->GetHomeDirectory();
4908 TString bindir = Form("%s/bin", workdir.Data());
4909 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4910 workdir += fGridWorkingDir;
4911 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4912 if (FileExists(executable)) gGrid->Rm(executable);
4913 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4914 // TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4915 if (!copyLocal2Alien("WriteMergeExecutable",
4916 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4920 //______________________________________________________________________________
4921 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4923 // Write the production file to be submitted by LPM manager. The format is:
4924 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4925 // Next lines: full_path_to_dataset XXX (XXX is a string)
4926 // To submit, one has to: submit jdl XXX for all lines
4928 out.open(filename, ios::out);
4930 Error("WriteProductionFile", "Bad file name: %s", filename);
4934 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4935 workdir = gGrid->GetHomeDirectory();
4936 workdir += fGridWorkingDir;
4937 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4938 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4939 out << locjdl << " " << njobspermaster << endl;
4940 Int_t nmasterjobs = fInputFiles->GetEntries();
4941 for (Int_t i=0; i<nmasterjobs; i++) {
4942 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4943 runOutDir.ReplaceAll(".xml", "");
4945 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4947 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4950 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4951 if (FileExists(filename)) gGrid->Rm(filename);
4952 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4953 if (!copyLocal2Alien("WriteProductionFile", filename,
4954 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4958 //______________________________________________________________________________
4959 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4961 // Generate the alien validation script.
4962 // Generate the validation script
4964 if (fValidationScript.IsNull()) {
4965 fValidationScript = fExecutable;
4966 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4968 TString validationScript = fValidationScript;
4969 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4971 Error("WriteValidationScript", "Alien connection required");
4974 if (!fTerminateFiles.IsNull()) {
4975 fTerminateFiles.Strip();
4976 fTerminateFiles.ReplaceAll(" ",",");
4978 TString outStream = "";
4979 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4980 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4982 out.open(validationScript, ios::out);
4983 out << "#!/bin/bash" << endl;
4984 out << "##################################################" << endl;
4985 out << "validateout=`dirname $0`" << endl;
4986 out << "validatetime=`date`" << endl;
4987 out << "validated=\"0\";" << endl;
4988 out << "error=0" << endl;
4989 out << "if [ -z $validateout ]" << endl;
4990 out << "then" << endl;
4991 out << " validateout=\".\"" << endl;
4992 out << "fi" << endl << endl;
4993 out << "cd $validateout;" << endl;
4994 out << "validateworkdir=`pwd`;" << endl << endl;
4995 out << "echo \"*******************************************************\"" << outStream << endl;
4996 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4998 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4999 out << "echo \"* Dir: $validateout\"" << outStream << endl;
5000 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
5001 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5002 out << "ls -la ./" << outStream << endl;
5003 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
5004 out << "##################################################" << endl;
5007 out << "if [ ! -f stderr ] ; then" << endl;
5008 out << " error=1" << endl;
5009 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
5010 out << " echo \"Error = $error\" " << outStream << endl;
5011 out << "fi" << endl;
5013 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
5014 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
5015 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
5016 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
5019 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
5020 out << " error=1" << endl;
5021 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
5022 out << " echo \"$parArch\" " << outStream << endl;
5023 out << " echo \"Error = $error\" " << outStream << endl;
5024 out << "fi" << endl;
5026 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
5027 out << " error=1" << endl;
5028 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
5029 out << " echo \"$segViol\" " << outStream << endl;
5030 out << " echo \"Error = $error\" " << outStream << endl;
5031 out << "fi" << endl;
5033 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
5034 out << " error=1" << endl;
5035 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
5036 out << " echo \"$segFault\" " << outStream << endl;
5037 out << " echo \"Error = $error\" " << outStream << endl;
5038 out << "fi" << endl;
5040 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
5041 out << " error=1" << endl;
5042 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
5043 out << " echo \"$glibcErr\" " << outStream << endl;
5044 out << " echo \"Error = $error\" " << outStream << endl;
5045 out << "fi" << endl;
5047 // Part dedicated to the specific analyses running into the train
5049 TString outputFiles = fOutputFiles;
5050 if (merge && !fTerminateFiles.IsNull()) {
5052 outputFiles += fTerminateFiles;
5054 TObjArray *arr = outputFiles.Tokenize(",");
5057 while (!merge && (os=(TObjString*)next1())) {
5058 // No need to validate outputs produced by merging since the merging macro does this
5059 outputFile = os->GetString();
5060 Int_t index = outputFile.Index("@");
5061 if (index > 0) outputFile.Remove(index);
5062 if (fTerminateFiles.Contains(outputFile)) continue;
5063 if (outputFile.Contains("*")) continue;
5064 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
5065 out << " error=1" << endl;
5066 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
5067 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
5068 out << "fi" << endl;
5071 out << "if ! [ -f outputs_valid ] ; then" << endl;
5072 out << " error=1" << endl;
5073 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
5074 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
5075 out << "fi" << endl;
5077 out << "if [ $error = 0 ] ; then" << endl;
5078 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
5079 if (!IsKeepLogs()) {
5080 out << " echo \"* === Logs std* will be deleted === \"" << endl;
5082 out << " rm -f std*" << endl;
5084 out << "fi" << endl;
5086 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5087 out << "echo \"*******************************************************\"" << outStream << endl;
5088 out << "cd -" << endl;
5089 out << "exit $error" << endl;
5091 Bool_t copy = kTRUE;
5092 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
5095 TString workdir = gGrid->GetHomeDirectory();
5096 workdir += fGridWorkingDir;
5097 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
5098 if (FileExists(validationScript)) gGrid->Rm(validationScript);
5099 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
5100 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
5101 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");