1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
32 #include "TInterpreter.h"
34 #include "TFileCollection.h"
36 #include "TObjString.h"
37 #include "TObjArray.h"
40 #include "TGridResult.h"
41 #include "TGridCollection.h"
43 #include "TGridJobStatusList.h"
44 #include "TGridJobStatus.h"
45 #include "TFileMerger.h"
46 #include "AliAnalysisManager.h"
47 #include "AliAnalysisTaskCfg.h"
48 #include "AliVEventHandler.h"
49 #include "AliAnalysisDataContainer.h"
50 #include "AliMultiInputEventHandler.h"
56 ClassImp(AliAnalysisAlien)
62 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
64 TString sl(Form("file:%s", loc));
65 TString sr(Form("alien://%s", rem));
66 Bool_t ret = TFile::Cp(sl, sr);
68 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
74 //______________________________________________________________________________
75 AliAnalysisAlien::AliAnalysisAlien()
81 fSplitMaxInputFileNumber(0),
83 fMasterResubmitThreshold(0),
96 fNproofWorkersPerSlave(0),
102 fExecutableCommand(),
108 fAdditionalRootLibs(),
156 //______________________________________________________________________________
157 AliAnalysisAlien::AliAnalysisAlien(const char *name)
158 :AliAnalysisGrid(name),
163 fSplitMaxInputFileNumber(0),
165 fMasterResubmitThreshold(0),
178 fNproofWorkersPerSlave(0),
184 fExecutableCommand(),
190 fAdditionalRootLibs(),
238 //______________________________________________________________________________
239 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
240 :AliAnalysisGrid(other),
243 fPrice(other.fPrice),
245 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
246 fMaxInitFailed(other.fMaxInitFailed),
247 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
248 fNtestFiles(other.fNtestFiles),
249 fNrunsPerMaster(other.fNrunsPerMaster),
250 fMaxMergeFiles(other.fMaxMergeFiles),
251 fMaxMergeStages(other.fMaxMergeStages),
252 fNsubmitted(other.fNsubmitted),
253 fProductionMode(other.fProductionMode),
254 fOutputToRunNo(other.fOutputToRunNo),
255 fMergeViaJDL(other.fMergeViaJDL),
256 fFastReadOption(other.fFastReadOption),
257 fOverwriteMode(other.fOverwriteMode),
258 fNreplicas(other.fNreplicas),
259 fNproofWorkers(other.fNproofWorkers),
260 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
261 fProofReset(other.fProofReset),
262 fNMCevents(other.fNMCevents),
263 fNMCjobs(other.fNMCjobs),
264 fRunNumbers(other.fRunNumbers),
265 fExecutable(other.fExecutable),
266 fExecutableCommand(other.fExecutableCommand),
267 fArguments(other.fArguments),
268 fExecutableArgs(other.fExecutableArgs),
269 fAnalysisMacro(other.fAnalysisMacro),
270 fAnalysisSource(other.fAnalysisSource),
271 fValidationScript(other.fValidationScript),
272 fAdditionalRootLibs(other.fAdditionalRootLibs),
273 fAdditionalLibs(other.fAdditionalLibs),
274 fGeneratorLibs(other.fGeneratorLibs),
275 fSplitMode(other.fSplitMode),
276 fAPIVersion(other.fAPIVersion),
277 fROOTVersion(other.fROOTVersion),
278 fAliROOTVersion(other.fAliROOTVersion),
279 fExternalPackages(other.fExternalPackages),
281 fGridWorkingDir(other.fGridWorkingDir),
282 fGridDataDir(other.fGridDataDir),
283 fDataPattern(other.fDataPattern),
284 fGridOutputDir(other.fGridOutputDir),
285 fOutputArchive(other.fOutputArchive),
286 fOutputFiles(other.fOutputFiles),
287 fInputFormat(other.fInputFormat),
288 fDatasetName(other.fDatasetName),
289 fJDLName(other.fJDLName),
290 fTerminateFiles(other.fTerminateFiles),
291 fMergeExcludes(other.fMergeExcludes),
292 fRegisterExcludes(other.fRegisterExcludes),
293 fIncludePath(other.fIncludePath),
294 fCloseSE(other.fCloseSE),
295 fFriendChainName(other.fFriendChainName),
296 fJobTag(other.fJobTag),
297 fOutputSingle(other.fOutputSingle),
298 fRunPrefix(other.fRunPrefix),
299 fProofCluster(other.fProofCluster),
300 fProofDataSet(other.fProofDataSet),
301 fFileForTestMode(other.fFileForTestMode),
302 fAliRootMode(other.fAliRootMode),
303 fProofProcessOpt(other.fProofProcessOpt),
304 fMergeDirName(other.fMergeDirName),
309 fDropToShell(other.fDropToShell),
310 fMCLoop(other.fMCLoop),
311 fGridJobIDs(other.fGridJobIDs),
312 fGridStages(other.fGridStages),
313 fFriendLibs(other.fFriendLibs),
314 fTreeName(other.fTreeName)
317 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
318 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
319 fRunRange[0] = other.fRunRange[0];
320 fRunRange[1] = other.fRunRange[1];
321 if (other.fInputFiles) {
322 fInputFiles = new TObjArray();
323 TIter next(other.fInputFiles);
325 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
326 fInputFiles->SetOwner();
328 if (other.fPackages) {
329 fPackages = new TObjArray();
330 TIter next(other.fPackages);
332 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
333 fPackages->SetOwner();
335 if (other.fModules) {
336 fModules = new TObjArray();
337 fModules->SetOwner();
338 TIter next(other.fModules);
339 AliAnalysisTaskCfg *mod, *crt;
340 while ((crt=(AliAnalysisTaskCfg*)next())) {
341 mod = new AliAnalysisTaskCfg(*crt);
347 //______________________________________________________________________________
348 AliAnalysisAlien::~AliAnalysisAlien()
356 fProofParam.DeleteAll();
359 //______________________________________________________________________________
360 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
363 if (this != &other) {
364 AliAnalysisGrid::operator=(other);
365 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
366 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
367 fPrice = other.fPrice;
369 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
370 fMaxInitFailed = other.fMaxInitFailed;
371 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
372 fNtestFiles = other.fNtestFiles;
373 fNrunsPerMaster = other.fNrunsPerMaster;
374 fMaxMergeFiles = other.fMaxMergeFiles;
375 fMaxMergeStages = other.fMaxMergeStages;
376 fNsubmitted = other.fNsubmitted;
377 fProductionMode = other.fProductionMode;
378 fOutputToRunNo = other.fOutputToRunNo;
379 fMergeViaJDL = other.fMergeViaJDL;
380 fFastReadOption = other.fFastReadOption;
381 fOverwriteMode = other.fOverwriteMode;
382 fNreplicas = other.fNreplicas;
383 fNproofWorkers = other.fNproofWorkers;
384 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
385 fProofReset = other.fProofReset;
386 fNMCevents = other.fNMCevents;
387 fNMCjobs = other.fNMCjobs;
388 fRunNumbers = other.fRunNumbers;
389 fExecutable = other.fExecutable;
390 fExecutableCommand = other.fExecutableCommand;
391 fArguments = other.fArguments;
392 fExecutableArgs = other.fExecutableArgs;
393 fAnalysisMacro = other.fAnalysisMacro;
394 fAnalysisSource = other.fAnalysisSource;
395 fValidationScript = other.fValidationScript;
396 fAdditionalRootLibs = other.fAdditionalRootLibs;
397 fAdditionalLibs = other.fAdditionalLibs;
398 fGeneratorLibs = other.fGeneratorLibs;
399 fSplitMode = other.fSplitMode;
400 fAPIVersion = other.fAPIVersion;
401 fROOTVersion = other.fROOTVersion;
402 fAliROOTVersion = other.fAliROOTVersion;
403 fExternalPackages = other.fExternalPackages;
405 fGridWorkingDir = other.fGridWorkingDir;
406 fGridDataDir = other.fGridDataDir;
407 fDataPattern = other.fDataPattern;
408 fGridOutputDir = other.fGridOutputDir;
409 fOutputArchive = other.fOutputArchive;
410 fOutputFiles = other.fOutputFiles;
411 fInputFormat = other.fInputFormat;
412 fDatasetName = other.fDatasetName;
413 fJDLName = other.fJDLName;
414 fTerminateFiles = other.fTerminateFiles;
415 fMergeExcludes = other.fMergeExcludes;
416 fRegisterExcludes = other.fRegisterExcludes;
417 fIncludePath = other.fIncludePath;
418 fCloseSE = other.fCloseSE;
419 fFriendChainName = other.fFriendChainName;
420 fJobTag = other.fJobTag;
421 fOutputSingle = other.fOutputSingle;
422 fRunPrefix = other.fRunPrefix;
423 fProofCluster = other.fProofCluster;
424 fProofDataSet = other.fProofDataSet;
425 fFileForTestMode = other.fFileForTestMode;
426 fAliRootMode = other.fAliRootMode;
427 fProofProcessOpt = other.fProofProcessOpt;
428 fMergeDirName = other.fMergeDirName;
429 fDropToShell = other.fDropToShell;
430 fMCLoop = other.fMCLoop;
431 fGridJobIDs = other.fGridJobIDs;
432 fGridStages = other.fGridStages;
433 fFriendLibs = other.fFriendLibs;
434 fTreeName = other.fTreeName;
435 if (other.fInputFiles) {
436 fInputFiles = new TObjArray();
437 TIter next(other.fInputFiles);
439 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
440 fInputFiles->SetOwner();
442 if (other.fPackages) {
443 fPackages = new TObjArray();
444 TIter next(other.fPackages);
446 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
447 fPackages->SetOwner();
449 if (other.fModules) {
450 fModules = new TObjArray();
451 fModules->SetOwner();
452 TIter next(other.fModules);
453 AliAnalysisTaskCfg *mod, *crt;
454 while ((crt=(AliAnalysisTaskCfg*)next())) {
455 mod = new AliAnalysisTaskCfg(*crt);
463 //______________________________________________________________________________
464 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
466 // Add a single additional library to be loaded. Extension must be present.
468 if (!lib.Contains(".")) {
469 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
472 if (fAdditionalLibs.Contains(name)) {
473 Warning("AddAdditionalLibrary", "Library %s already added.", name);
476 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
477 fAdditionalLibs += lib;
480 //______________________________________________________________________________
481 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
483 // Adding a module. Checks if already existing. Becomes owned by this.
485 if (GetModule(module->GetName())) {
486 Error("AddModule", "A module having the same name %s already added", module->GetName());
490 fModules = new TObjArray();
491 fModules->SetOwner();
493 fModules->Add(module);
496 //______________________________________________________________________________
497 void AliAnalysisAlien::AddModules(TObjArray *list)
499 // Adding a list of modules. Checks if already existing. Becomes owned by this.
501 AliAnalysisTaskCfg *module;
502 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
505 //______________________________________________________________________________
506 Bool_t AliAnalysisAlien::CheckDependencies()
508 // Check if all dependencies are satisfied. Reorder modules if needed.
509 Int_t nmodules = GetNmodules();
511 Warning("CheckDependencies", "No modules added yet to check their dependencies");
514 AliAnalysisTaskCfg *mod = 0;
515 AliAnalysisTaskCfg *dep = 0;
518 for (i=0; i<nmodules; i++) {
519 mod = (AliAnalysisTaskCfg*) fModules->At(i);
520 Int_t ndeps = mod->GetNdeps();
522 for (j=0; j<ndeps; j++) {
523 depname = mod->GetDependency(j);
524 dep = GetModule(depname);
526 Error("CheckDependencies","Dependency %s not added for module %s",
527 depname.Data(), mod->GetName());
530 if (dep->NeedsDependency(mod->GetName())) {
531 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
532 mod->GetName(), dep->GetName());
535 Int_t idep = fModules->IndexOf(dep);
536 // The dependency task must come first
538 // Remove at idep and move all objects below up one slot
539 // down to index i included.
540 fModules->RemoveAt(idep);
541 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
542 fModules->AddAt(dep, i++);
544 //Redo from istart if dependencies were inserted
545 if (i>istart) i=istart-1;
551 //______________________________________________________________________________
552 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
554 // Create the analysis manager and optionally execute the macro in filename.
555 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
557 mgr->SetMCLoop(fMCLoop);
560 mgr = new AliAnalysisManager(name);
561 mgr->SetGridHandler((AliAnalysisGrid*)this);
562 mgr->SetMCLoop(fMCLoop);
563 if (strlen(filename)) {
564 TString line = gSystem->ExpandPathName(filename);
566 gROOT->ProcessLine(line.Data());
571 //______________________________________________________________________________
572 Int_t AliAnalysisAlien::GetNmodules() const
574 // Get number of modules.
575 if (!fModules) return 0;
576 return fModules->GetEntries();
579 //______________________________________________________________________________
580 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
582 // Get a module by name.
583 if (!fModules) return 0;
584 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
587 //______________________________________________________________________________
588 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
590 // Load a given module.
591 if (mod->IsLoaded()) return kTRUE;
592 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
594 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
597 Int_t ndeps = mod->GetNdeps();
599 for (Int_t j=0; j<ndeps; j++) {
600 depname = mod->GetDependency(j);
601 AliAnalysisTaskCfg *dep = GetModule(depname);
603 Error("LoadModule","Dependency %s not existing for module %s",
604 depname.Data(), mod->GetName());
607 if (!LoadModule(dep)) {
608 Error("LoadModule","Dependency %s for module %s could not be loaded",
609 depname.Data(), mod->GetName());
613 // Load libraries for the module
614 if (!mod->CheckLoadLibraries()) {
615 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
618 // Check if a custom file name was requested
619 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
621 // Check if a custom terminate file name was requested
622 if (strlen(mod->GetTerminateFileName())) {
623 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
624 fTerminateFiles += mod->GetTerminateFileName();
628 if (mod->ExecuteMacro()<0) {
629 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
630 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
633 // Configure dependencies
634 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
635 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
636 mod->GetConfigMacro()->GetTitle(), mod->GetName());
639 // Adjust extra libraries
640 Int_t nlibs = mod->GetNlibs();
642 for (Int_t i=0; i<nlibs; i++) {
643 lib = mod->GetLibrary(i);
644 lib = Form("lib%s.so", lib.Data());
645 if (fAdditionalLibs.Contains(lib)) continue;
646 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
647 fAdditionalLibs += lib;
652 //______________________________________________________________________________
653 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
655 // Generate the full train.
656 fAdditionalLibs = "";
657 if (!LoadModules()) return kFALSE;
658 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
659 if (!mgr->InitAnalysis()) return kFALSE;
662 Int_t productionMode = fProductionMode;
664 TString macro = fAnalysisMacro;
665 TString executable = fExecutable;
666 TString validation = fValidationScript;
667 TString execCommand = fExecutableCommand;
668 SetAnalysisMacro(Form("%s.C", name));
669 SetExecutable(Form("%s.sh", name));
670 // SetExecutableCommand("aliroot -b -q ");
671 SetValidationScript(Form("%s_validation.sh", name));
673 SetProductionMode(productionMode);
674 fAnalysisMacro = macro;
675 fExecutable = executable;
676 fExecutableCommand = execCommand;
677 fValidationScript = validation;
681 //______________________________________________________________________________
682 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
684 // Generate test macros for a single module or for the full train.
685 fAdditionalLibs = "";
686 if (strlen(modname)) {
687 if (!CheckDependencies()) return kFALSE;
688 AliAnalysisTaskCfg *mod = GetModule(modname);
690 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
693 if (!LoadModule(mod)) return kFALSE;
694 if (!LoadFriendLibs()) return kFALSE;
695 } else if (!LoadModules()) return kFALSE;
696 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
697 if (!mgr->InitAnalysis()) return kFALSE;
701 Int_t productionMode = fProductionMode;
703 TString macro = fAnalysisMacro;
704 TString executable = fExecutable;
705 TString validation = fValidationScript;
706 TString execCommand = fExecutableCommand;
707 SetAnalysisMacro(Form("%s.C", name));
708 SetExecutable(Form("%s.sh", name));
709 fOutputFiles = GetListOfFiles("outaod");
710 // Add extra files registered to the analysis manager
711 TString extra = GetListOfFiles("ext");
712 if (!extra.IsNull()) {
713 extra.ReplaceAll(".root", "*.root");
714 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
715 fOutputFiles += extra;
717 // SetExecutableCommand("aliroot -b -q ");
718 SetValidationScript(Form("%s_validation.sh", name));
720 WriteAnalysisMacro();
722 WriteValidationScript();
724 WriteMergeExecutable();
725 WriteValidationScript(kTRUE);
726 SetLocalTest(kFALSE);
727 SetProductionMode(productionMode);
728 fAnalysisMacro = macro;
729 fExecutable = executable;
730 fExecutableCommand = execCommand;
731 fValidationScript = validation;
735 //______________________________________________________________________________
736 Bool_t AliAnalysisAlien::LoadModules()
738 // Load all modules by executing the AddTask macros. Checks first the dependencies.
739 fAdditionalLibs = "";
740 Int_t nmodules = GetNmodules();
742 Warning("LoadModules", "No module to be loaded");
745 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
747 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
750 if (!CheckDependencies()) return kFALSE;
751 nmodules = GetNmodules();
752 AliAnalysisTaskCfg *mod;
753 for (Int_t imod=0; imod<nmodules; imod++) {
754 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
755 if (!LoadModule(mod)) return kFALSE;
757 // Load additional friend libraries
758 return LoadFriendLibs();
761 //______________________________________________________________________________
762 Bool_t AliAnalysisAlien::LoadFriendLibs() const
764 // Load libraries required for reading friends.
765 if (fFriendLibs.Length()) {
768 if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
769 else list = fFriendLibs.Tokenize(" ");
770 for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
771 lib = list->At(ilib)->GetName();
772 lib.ReplaceAll(".so","");
773 lib.ReplaceAll(" ","");
774 if (lib.BeginsWith("lib")) lib.Remove(0, 3);
776 Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
777 if (!loaded) loaded = gSystem->Load(lib);
779 Error("LoadModules", "Cannot load library for friends %s", lib.Data());
788 //______________________________________________________________________________
789 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
791 // Set the run number format. Can be a prefix or a format like "%09d"
793 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
796 //______________________________________________________________________________
797 void AliAnalysisAlien::AddIncludePath(const char *path)
799 // Add include path in the remote analysis macro.
801 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
802 else fIncludePath += Form("-I%s ", path);
805 //______________________________________________________________________________
806 void AliAnalysisAlien::AddRunNumber(Int_t run)
808 // Add a run number to the list of runs to be processed.
809 if (fRunNumbers.Length()) fRunNumbers += " ";
810 fRunNumbers += Form(fRunPrefix.Data(), run);
813 //______________________________________________________________________________
814 void AliAnalysisAlien::AddRunList(const char* runList)
816 // Add several runs into the list of runs; they are expected to be separated by a blank character.
817 TString sList = runList;
818 TObjArray *list = sList.Tokenize(" ");
819 Int_t n = list->GetEntries();
820 for (Int_t i = 0; i < n; i++) {
821 TObjString *os = (TObjString*)list->At(i);
822 AddRunNumber(os->GetString().Atoi());
827 //______________________________________________________________________________
828 void AliAnalysisAlien::AddRunNumber(const char* run)
830 // Add a run number to the list of runs to be processed.
833 TObjArray *arr = runs.Tokenize(" ");
836 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
837 while ((os=(TObjString*)next())){
838 if (fRunNumbers.Length()) fRunNumbers += " ";
839 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
844 //______________________________________________________________________________
845 void AliAnalysisAlien::AddDataFile(const char *lfn)
847 // Adds a data file to the input to be analysed. The file should be a valid LFN
848 // or point to an existing file in the alien workdir.
849 if (!fInputFiles) fInputFiles = new TObjArray();
850 fInputFiles->Add(new TObjString(lfn));
853 //______________________________________________________________________________
854 void AliAnalysisAlien::AddExternalPackage(const char *package)
856 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
857 if (fExternalPackages) fExternalPackages += " ";
858 fExternalPackages += package;
861 //______________________________________________________________________________
862 Bool_t AliAnalysisAlien::Connect()
864 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
865 if (gGrid && gGrid->IsConnected()) return kTRUE;
866 if (fProductionMode) return kTRUE;
868 Info("Connect", "Trying to connect to AliEn ...");
869 TGrid::Connect("alien://");
871 if (!gGrid || !gGrid->IsConnected()) {
872 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
875 fUser = gGrid->GetUser();
876 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
880 //______________________________________________________________________________
881 void AliAnalysisAlien::CdWork()
883 // Check validity of alien workspace. Create directory if possible.
885 Error("CdWork", "Alien connection required");
888 TString homedir = gGrid->GetHomeDirectory();
889 TString workdir = homedir + fGridWorkingDir;
890 if (DirectoryExists(workdir)) {
894 // Work directory not existing - create it
896 if (gGrid->Mkdir(workdir, "-p")) {
897 gGrid->Cd(fGridWorkingDir);
898 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
900 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
901 workdir.Data(), homedir.Data());
902 fGridWorkingDir = "";
906 //______________________________________________________________________________
907 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
909 // Check if file copying is possible.
910 if (fProductionMode) return kTRUE;
911 TString salienpath(alienpath);
912 if (salienpath.Contains(" ")) {
913 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
917 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
920 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
921 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
922 // Check if alien_CLOSE_SE is defined
923 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
924 if (!closeSE.IsNull()) {
925 Info("CheckFileCopy", "Your current close storage is pointing to: \
926 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
928 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
930 // Check if grid directory exists.
931 if (!DirectoryExists(alienpath)) {
932 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
935 TString stest = "plugin_test_copy";
936 TFile f(stest, "RECREATE");
937 // User may not have write permissions to current directory
939 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
940 gSystem->WorkingDirectory());
944 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
945 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
946 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
947 \n# 1. Make sure you have write permissions there. If this is the case: \
948 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
949 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
950 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
951 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
952 gSystem->Unlink(stest.Data());
955 gSystem->Unlink(stest.Data());
956 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
957 Info("CheckFileCopy", "### ...SUCCESS ###");
961 //______________________________________________________________________________
962 Bool_t AliAnalysisAlien::CheckInputData()
964 // Check validity of input data. If necessary, create xml files.
965 if (fProductionMode) return kTRUE;
966 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
967 if (!fGridDataDir.Length()) {
968 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
972 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
975 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
976 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
977 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
980 // Process declared files
981 Bool_t isCollection = kFALSE;
982 Bool_t isXml = kFALSE;
983 Bool_t useTags = kFALSE;
984 Bool_t checked = kFALSE;
985 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
987 TString workdir = gGrid->GetHomeDirectory();
988 workdir += fGridWorkingDir;
991 TIter next(fInputFiles);
992 while ((objstr=(TObjString*)next())) {
995 file += objstr->GetString();
996 // Store full lfn path
997 if (FileExists(file)) objstr->SetString(file);
999 file = objstr->GetName();
1000 if (!FileExists(objstr->GetName())) {
1001 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
1002 objstr->GetName(), workdir.Data());
1006 Bool_t iscoll, isxml, usetags;
1007 CheckDataType(file, iscoll, isxml, usetags);
1010 isCollection = iscoll;
1013 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1015 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
1016 Error("CheckInputData", "Some conflict was found in the types of inputs");
1022 // Process requested run numbers
1023 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
1024 // Check validity of alien data directory
1025 if (!fGridDataDir.Length()) {
1026 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
1029 if (!DirectoryExists(fGridDataDir)) {
1030 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
1034 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
1038 if (checked && !isXml) {
1039 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
1042 // Check validity of run number(s)
1047 TString schunk, schunk2;
1051 useTags = fDataPattern.Contains("tag");
1052 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1054 if (useTags != fDataPattern.Contains("tag")) {
1055 Error("CheckInputData", "Cannot mix input files using/not using tags");
1058 if (fRunNumbers.Length()) {
1059 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1060 arr = fRunNumbers.Tokenize(" ");
1062 while ((os=(TObjString*)next())) {
1063 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1064 if (!DirectoryExists(path)) {
1065 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1068 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1069 TString msg = "\n##### file: ";
1071 msg += " type: xml_collection;";
1072 if (useTags) msg += " using_tags: Yes";
1073 else msg += " using_tags: No";
1074 Info("CheckDataType", "%s", msg.Data());
1075 if (fNrunsPerMaster<2) {
1076 AddDataFile(Form("%s.xml", os->GetString().Data()));
1079 if (((nruns-1)%fNrunsPerMaster) == 0) {
1080 schunk = os->GetString();
1082 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1083 schunk += Form("_%s.xml", os->GetString().Data());
1084 AddDataFile(schunk);
1089 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1090 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1091 format = Form("%%s/%s ", fRunPrefix.Data());
1092 path = Form(format.Data(), fGridDataDir.Data(), irun);
1093 if (!DirectoryExists(path)) {
1096 format = Form("%%s/%s.xml", fRunPrefix.Data());
1097 path = Form(format.Data(), workdir.Data(),irun);
1098 TString msg = "\n##### file: ";
1100 msg += " type: xml_collection;";
1101 if (useTags) msg += " using_tags: Yes";
1102 else msg += " using_tags: No";
1103 Info("CheckDataType", "%s", msg.Data());
1104 if (fNrunsPerMaster<2) {
1105 format = Form("%s.xml", fRunPrefix.Data());
1106 AddDataFile(Form(format.Data(),irun));
1109 if (((nruns-1)%fNrunsPerMaster) == 0) {
1110 schunk = Form(fRunPrefix.Data(),irun);
1112 format = Form("_%s.xml", fRunPrefix.Data());
1113 schunk2 = Form(format.Data(), irun);
1114 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1116 AddDataFile(schunk);
1121 AddDataFile(schunk);
1127 //______________________________________________________________________________
1128 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1130 // Copy data from the given grid directory according a pattern and make a local
1132 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
     //
     // Parameters, as used by the code below:
     //   griddir     - grid directory searched with the AliEn 'find' command; must exist
     //   pattern     - file pattern handed to 'find'
     //   nfiles      - value passed to the 'find -l' option
     //   output      - local text file opened for writing; receives the local paths of copied files
     //   archivefile - when non-empty, this archive is copied from the directory of each found
     //                 file instead of the file itself; with a ';'-separated list the first entry
     //                 is the main archive and the remaining ones are copied in addition
     //   outputdir   - local directory that is created and entered while copying
     // Returns 0 when the grid query yields no result object.
1134 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1137 if (!DirectoryExists(griddir)) {
1138 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
     // Query the grid catalogue for the files to copy.
1141 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1142 printf("Running command: %s\n", command.Data());
1143 TGridResult *res = gGrid->Command(command);
     // The grid command can come back without a result object (other call sites in this
     // file guard it with 'if (res)'); bail out instead of dereferencing a null pointer.
     if (!res) {
        Error("CopyLocalDataset", "Grid command <%s> returned no result", command.Data());
        return 0;
     }
1144 Int_t nfound = res->GetEntries();
1146 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
     // Nothing to copy: release the query result before leaving.
     delete res;
1149 printf("... found %d files. Copying locally ...\n", nfound);
     // Split a ';'-separated archive list: the first token becomes the main archive,
     // the rest stays in additionalArchives (compacted after removing slot 0).
1152 TObjArray* additionalArchives = 0;
1153 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1154 additionalArchives = TString(archivefile).Tokenize(";");
1155 archivefile = additionalArchives->At(0)->GetName();
1156 additionalArchives->RemoveAt(0);
1157 additionalArchives->Compress();
1160 // Copy files locally
1162 out.open(output, ios::out);
1164 TString turl, dirname, filename, temp;
     // Remember the current directory so it can be restored after the copy loop.
1165 TString cdir = gSystem->WorkingDirectory();
1166 gSystem->MakeDirectory(outputdir);
1167 gSystem->ChangeDirectory(outputdir);
1169 for (Int_t i=0; i<nfound; i++) {
1170 map = (TMap*)res->At(i);
1171 turl = map->GetValue("turl")->GetName();
     // Each file goes into a local sub-directory named after the last component
     // of its grid directory.
1172 filename = gSystem->BaseName(turl.Data());
1173 dirname = gSystem->DirName(turl.Data());
1174 dirname = gSystem->BaseName(dirname.Data());
1175 gSystem->MakeDirectory(dirname);
1177 TString source(turl);
1178 TString targetFileName(filename);
1180 if (strlen(archivefile) > 0) {
1181 // TODO here the archive in which the file resides should be determined
1182 // however whereis returns only a guid, and guid2lfn does not work
1183 // Therefore we use the one provided as argument for now
1184 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1185 targetFileName = archivefile;
1187 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1188 Bool_t success = kTRUE;
1189 if (additionalArchives) {
     // Copy every additional archive next to the main one; 'success' stays true
     // only if all of these copies succeed.
1190 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1192 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1193 gSystem->MakeDirectory(gSystem->DirName(target));
1194 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
     // For archives the listed entry has the form <archive>#<file inside the archive>.
1199 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1200 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1205 gSystem->ChangeDirectory(cdir);
     // The query result is no longer needed once all entries have been processed.
     delete res;
1207 delete additionalArchives;
1211 //______________________________________________________________________________
1212 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1214 // Create dataset for the grid data directory + run number.
     //
     // Returns kTRUE immediately in production mode or when the kOffline bit is set.
     // Otherwise xml collections are produced with the AliEn 'find' command for one of
     // three input specifications:
     //   - neither run numbers nor a run range: a single collection for fGridDataDir
     //   - fRunNumbers: one collection per space-separated run number
     //   - a run range: one collection per run in [fRunRange[0], fRunRange[1]]
     // When fNrunsPerMaster is 2 or more, per-run collections are merged into chunk
     // collections before being copied to the AliEn working directory.
     // 'pattern' is the search pattern; it also decides the separator used when the
     // 'find' command is composed.
1215 const Int_t gMaxEntries = 15000;
1216 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1218 Error("CreateDataset", "Cannot create dataset with no grid connection");
1223 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
1224 TString workdir = gGrid->GetHomeDirectory();
1225 workdir += fGridWorkingDir;
1227 // Compose the 'find' command arguments
1230 TString delimiter = pattern;
1232 if (delimiter.Contains(" ")) delimiter = "";
1233 else delimiter = " ";
1234 TString options = "-x collection ";
1235 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1236 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1237 TString conditions = "";
1244 TString schunk, schunk2;
1245 TGridCollection *cbase=0, *cadd=0;
     // Case 1: no run numbers and no run range.
1246 if (!fRunNumbers.Length() && !fRunRange[0]) {
1247 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1248 // Make a single data collection from data directory.
1249 path = fGridDataDir;
1250 if (!DirectoryExists(path)) {
1251 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
1255 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1256 else file = Form("%s.xml", gSystem->BaseName(path));
     // (Re)create the local collection when it is not accessible locally, in test mode
     // or in overwrite mode (TSystem::AccessPathName() returns true when access fails).
1260 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1262 command += Form("%s -o %d ",options.Data(), nstart);
1264 command += delimiter;
1266 command += conditions;
1267 printf("command: %s\n", command.Data());
1268 TGridResult *res = gGrid->Command(command);
1269 if (res) delete res;
1270 // Write standard output to file
1271 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1272 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1273 Bool_t nullFile = kFALSE;
1275 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
     // Count the '/event' lines of the stage output; a non-zero grep exit status marks the result as empty.
1277 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1279 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1280 gSystem->Exec("rm -f __tmp*");
1288 gSystem->Exec("rm -f __tmp__");
1289 ncount = line.Atoi();
     // The count reached the 'find -l' limit: load this stage's output as a collection for merging.
1292 if (ncount == gMaxEntries) {
1293 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1294 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1295 if (!cbase) cbase = cadd;
1303 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1304 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1307 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1308 delete cbase; cbase = 0;
1310 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1312 gSystem->Exec("rm -f __tmp*");
1313 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1317 Bool_t fileExists = FileExists(file);
1318 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1319 // Copy xml file to alien space
1320 if (fileExists) gGrid->Rm(file);
1321 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1322 if (!FileExists(file)) {
1323 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1326 // Update list of files to be processed.
1328 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
1332 Bool_t nullResult = kTRUE;
     // Case 2: explicit, space-separated list of run numbers.
1333 if (fRunNumbers.Length()) {
1334 TObjArray *arr = fRunNumbers.Tokenize(" ");
1337 while ((os=(TObjString*)next())) {
1340 path = Form("%s/%s", fGridDataDir.Data(), os->GetString().Data());
1341 if (!DirectoryExists(path)) continue;
1343 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1344 else file = Form("%s.xml", os->GetString().Data());
1345 // If local collection file does not exist, create it via 'find' command.
1349 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1351 command += Form("%s -o %d ",options.Data(), nstart);
1353 command += delimiter;
1355 command += conditions;
1356 TGridResult *res = gGrid->Command(command);
1357 if (res) delete res;
1358 // Write standard output to file
1359 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1360 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1361 Bool_t nullFile = kFALSE;
1363 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1365 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1367 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1368 gSystem->Exec("rm -f __tmp*");
     // Drop the run that produced the empty dataset from the run list.
1369 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1377 gSystem->Exec("rm -f __tmp__");
1378 ncount = line.Atoi();
1380 nullResult = kFALSE;
1382 if (ncount == gMaxEntries) {
1383 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1384 if (fNrunsPerMaster > 1) {
1385 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1386 file.Data(),gMaxEntries);
1389 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1390 if (!cbase) cbase = cadd;
1397 if (cbase && fNrunsPerMaster<2) {
1398 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1399 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1402 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1403 delete cbase; cbase = 0;
1405 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1407 gSystem->Exec("rm -f __tmp*");
1408 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
     // In test mode only the first usable run is processed.
1412 if (TestBit(AliAnalysisGrid::kTest)) break;
1413 // Check if there is one run per master job.
1414 if (fNrunsPerMaster<2) {
1415 if (FileExists(file)) {
1416 if (fOverwriteMode) gGrid->Rm(file);
1418 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1422 // Copy xml file to alien space
1423 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1424 if (!FileExists(file)) {
1425 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
     // Several runs per master job: the first run of a chunk starts a new base
     // collection named after that run, later runs are loaded for merging.
1431 if (((nruns-1)%fNrunsPerMaster) == 0) {
1432 schunk = os->GetString();
1433 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1435 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1436 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1440 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
1443 schunk += Form("_%s.xml", os->GetString().Data());
1444 if (FileExists(schunk)) {
     // In overwrite mode remove the stale merged chunk that was just tested, not the
     // per-run collection; this matches the handling of schunk in the run-range branch.
1445 if (fOverwriteMode) gGrid->Rm(schunk);
1447 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1451 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1452 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1453 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1454 if (!FileExists(schunk)) {
1455 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1463 Error("CreateDataset", "No valid dataset corresponding to the query!");
1467 // Process a full run range.
1468 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
     // fRunPrefix is used as a printf-style format for the run number.
1469 format = Form("%%s/%s", fRunPrefix.Data());
1472 path = Form(format.Data(), fGridDataDir.Data(), irun);
1473 if (!DirectoryExists(path)) continue;
1475 format = Form("%s.xml", fRunPrefix.Data());
1476 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1477 else file = Form(format.Data(), irun);
1478 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1479 if (fOverwriteMode) gGrid->Rm(file);
1481 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1485 // If local collection file does not exist, create it via 'find' command.
1489 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1491 command += Form("%s -o %d ",options.Data(), nstart);
1493 command += delimiter;
1495 command += conditions;
1496 TGridResult *res = gGrid->Command(command);
1497 if (res) delete res;
1498 // Write standard output to file
1499 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1500 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1501 Bool_t nullFile = kFALSE;
1503 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1505 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1507 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1508 gSystem->Exec("rm -f __tmp*");
1516 gSystem->Exec("rm -f __tmp__");
1517 ncount = line.Atoi();
1519 nullResult = kFALSE;
1521 if (ncount == gMaxEntries) {
1522 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1523 if (fNrunsPerMaster > 1) {
1524 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1525 file.Data(),gMaxEntries);
1528 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1529 if (!cbase) cbase = cadd;
1536 if (cbase && fNrunsPerMaster<2) {
1537 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1538 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1541 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1542 delete cbase; cbase = 0;
1544 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1546 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1550 if (TestBit(AliAnalysisGrid::kTest)) break;
1551 // Check if there is one run per master job.
1552 if (fNrunsPerMaster<2) {
1553 if (FileExists(file)) {
1554 if (fOverwriteMode) gGrid->Rm(file);
1556 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1560 // Copy xml file to alien space
1561 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1562 if (!FileExists(file)) {
1563 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1568 // Check if the collection for the chunk exist locally.
1569 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1570 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1571 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1574 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1575 if (((nruns-1)%fNrunsPerMaster) == 0) {
1576 schunk = Form(fRunPrefix.Data(), irun);
1577 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1579 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1583 format = Form("%%s_%s.xml", fRunPrefix.Data());
1584 schunk2 = Form(format.Data(), schunk.Data(), irun);
1585 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1589 if (FileExists(schunk)) {
1590 if (fOverwriteMode) gGrid->Rm(schunk);
1592 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1596 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1597 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1598 if (FileExists(schunk)) {
1599 if (fOverwriteMode) gGrid->Rm(schunk);
1601 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1605 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1606 if (!FileExists(schunk)) {
1607 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1613 Error("CreateDataset", "No valid dataset corresponding to the query!");
1620 //______________________________________________________________________________
1621 Bool_t AliAnalysisAlien::CreateJDL()
1623 // Generate a JDL file according to current settings. The name of the file is
1624 // specified by fJDLName.
     //
     // Two JDL objects are filled side by side: fGridJDL and fMergingJDL. The code
     // validates the settings (input files, output files, AliEn output directory),
     // then sets executable, arguments, TTL, split options, packages, input sandbox,
     // output lists, price and validation command on them.
     // The last part copies the JDL files (when kSubmit is set), the non-.so entries of
     // fAdditionalLibs and the fPackages entries to AliEn through copyLocal2Alien();
     // a failed copy ends in Fatal().
     // NOTE(review): some enclosing conditions of these sections are not visible in this
     // listing - confirm against the full file before relying on the grouping above.
     // Returns kFALSE if any of the validation steps flagged an error.
1625 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1626 Bool_t error = kFALSE;
     // 'copy' is switched off in production, offline and test modes;
     // 'generate' is switched off in test and submit modes.
1628 Bool_t copy = kTRUE;
1629 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1630 Bool_t generate = kTRUE;
1631 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1633 Error("CreateJDL", "Alien connection required");
1636 // Check validity of alien workspace
     // The grid home directory is prepended unless in production mode or unless
     // fGridWorkingDir already begins with "/alice".
1638 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1639 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1640 workdir += fGridWorkingDir;
     // Input files are mandatory unless fMCLoop is set.
1643 if (!fInputFiles && !fMCLoop) {
1644 Error("CreateJDL()", "Define some input files for your analysis.");
1647 // Compose list of input files
1648 // Check if output files were defined
1649 if (!fOutputFiles.Length()) {
1650 Error("CreateJDL", "You must define at least one output file");
1653 // Check if an output directory was defined and valid
1654 if (!fGridOutputDir.Length()) {
1655 Error("CreateJDL", "You must define AliEn output directory");
1658 if (!fProductionMode) {
     // An output directory name without any '/' is taken relative to the work directory.
1659 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1660 if (!DirectoryExists(fGridOutputDir)) {
1661 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1662 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1664 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1668 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1673 // Exit if any error up to now
1674 if (error) return kFALSE;
1676 if (!fUser.IsNull()) {
1677 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1678 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
     // A relative executable name is resolved against the work directory.
1680 TString executable = fExecutable;
1681 if (!executable.BeginsWith("/"))
1682 executable.Prepend(Form("%s/", workdir.Data()));
1683 fGridJDL->SetExecutable(executable, "This is the startup script");
     // The merging script name is derived by replacing ".sh" with "_merge.sh"; replacing
     // ".sh" once more with ".C" gives the macro added to the merging input sandbox.
1684 TString mergeExec = executable;
1685 mergeExec.ReplaceAll(".sh", "_merge.sh");
1686 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1687 mergeExec.ReplaceAll(".sh", ".C");
1688 fMergingJDL->AddToInputSandbox(Form("LF:%s", mergeExec.Data()), "List of input files to be uploaded to workers");
1689 if (!fArguments.IsNull())
1690 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1691 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
     // The stage placeholder is $4 in production mode and $2 otherwise.
1693 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1694 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
     // The TTL value is written as given; the description reports it divided by 60 as minutes.
1697 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1698 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1699 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1700 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1702 if (fMaxInitFailed > 0) {
1703 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1704 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1706 if (fSplitMaxInputFileNumber > 0 && !fMCLoop) {
1707 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1708 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1710 if (!IsOneStageMerging()) {
1711 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1712 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1714 if (fSplitMode.Length()) {
1715 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1716 fGridJDL->SetDescription("Split", "We split per SE or file");
1718 fMergingJDL->SetValue("Split", "\"se\"");
1719 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
     // Requested software packages, added to both JDLs.
1720 if (!fAliROOTVersion.IsNull()) {
1721 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1722 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1724 if (!fROOTVersion.IsNull()) {
1725 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1726 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1728 if (!fAPIVersion.IsNull()) {
1729 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1730 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1732 if (!fExternalPackages.IsNull()) {
1733 arr = fExternalPackages.Tokenize(" ");
1735 while ((os=(TObjString*)next())) {
     // Each entry is split at "::" into package name and version.
     // NOTE(review): an entry without "::" makes Index() return a negative value, which
     // is then used unchecked below - confirm entries are always of the form name::version.
1736 TString pkgname = os->GetString();
1737 Int_t index = pkgname.Index("::");
1738 TString pkgversion = pkgname(index+2, pkgname.Length());
1739 pkgname.Remove(index);
1740 fGridJDL->AddToPackages(pkgname, pkgversion);
1741 fMergingJDL->AddToPackages(pkgname, pkgversion);
1746 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1747 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1749 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1750 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1751 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
     // The .root file named after the executable (".sh" -> ".root") goes into both input sandboxes.
1752 TString analysisFile = fExecutable;
1753 analysisFile.ReplaceAll(".sh", ".root");
1754 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1755 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1756 if (fAdditionalLibs.Length()) {
1757 arr = fAdditionalLibs.Tokenize(" ");
1759 while ((os=(TObjString*)next())) {
     // Entries containing ".so" are not added to the input sandbox.
1760 if (os->GetString().Contains(".so")) continue;
1761 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1762 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1767 TIter next(fPackages);
1769 while ((obj=next())) {
1770 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1771 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1774 const char *comment = "List of output files and archives";
1775 if (fOutputArchive.Length()) {
1776 TString outputArchive = fOutputArchive;
     // Strip every name listed in fRegisterExcludes from the archive definition,
     // first together with a trailing comma, then on its own.
1777 if (!fRegisterExcludes.IsNull()) {
1778 arr = fRegisterExcludes.Tokenize(" ");
1780 while ((os=(TObjString*)next1())) {
1781 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1782 outputArchive.ReplaceAll(os->GetString(),"");
1786 arr = outputArchive.Tokenize(" ");
1788 Bool_t first = kTRUE;
1789 while ((os=(TObjString*)next())) {
     // Entries that carry no "@..." specification get fCloseSE appended when it is set.
1790 if (!os->GetString().Contains("@") && fCloseSE.Length())
1791 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1793 fGridJDL->AddToSet("Output", os->GetString());
1794 if (first) fGridJDL->AddToSetDescription("Output", comment);
1798 // Output archive for the merging jdl
1799 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1800 outputArchive = "log_archive.zip:std*@disk=1 ";
1801 // Add normal output files, extra files + terminate files
1802 TString files = GetListOfFiles("outextter");
1803 // Do not register files in fRegisterExcludes
1804 if (!fRegisterExcludes.IsNull()) {
1805 arr = fRegisterExcludes.Tokenize(" ");
1807 while ((os=(TObjString*)next1())) {
1808 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1809 files.ReplaceAll(os->GetString(),"");
     // Turn each ".root" name into a "*.root" wildcard.
1813 files.ReplaceAll(".root", "*.root");
1815 if (mgr->IsCollectThroughput())
1816 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1818 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1820 TString files = fOutputArchive;
1821 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already attached by user
1822 outputArchive = files;
1824 arr = outputArchive.Tokenize(" ");
1827 while ((os=(TObjString*)next2())) {
1828 TString currentfile = os->GetString();
1829 if (!currentfile.Contains("@") && fCloseSE.Length())
1830 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1832 fMergingJDL->AddToSet("Output", currentfile);
1833 if (first) fMergingJDL->AddToSetDescription("Output", comment);
     // Individual output files (comma-separated in fOutputFiles).
1838 arr = fOutputFiles.Tokenize(",");
1840 Bool_t first = kTRUE;
1841 while ((os=(TObjString*)next())) {
1842 // Ignore outputs in jdl that are also in outputarchive
     // For the comparison the name is reduced: '*' and ".root" are removed and
     // everything from '@' on is cut.
1843 TString sout = os->GetString();
1844 sout.ReplaceAll("*", "");
1845 sout.ReplaceAll(".root", "");
1846 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1847 if (fOutputArchive.Contains(sout)) continue;
1848 // Ignore fRegisterExcludes
1849 if (fRegisterExcludes.Contains(sout)) continue;
1850 if (!first) comment = NULL;
1851 if (!os->GetString().Contains("@") && fCloseSE.Length())
1852 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1854 fGridJDL->AddToSet("Output", os->GetString());
1855 if (first) fGridJDL->AddToSetDescription("Output", comment);
     // Files listed in fMergeExcludes are not added to the merging JDL output.
1856 if (fMergeExcludes.Contains(sout)) continue;
1857 if (!os->GetString().Contains("@") && fCloseSE.Length())
1858 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1860 fMergingJDL->AddToSet("Output", os->GetString());
1861 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1865 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1866 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
     // The merging validation script name is derived with the same ".sh" -> "_merge.sh" substitution.
1867 TString validationScript = fValidationScript;
1868 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1869 validationScript.ReplaceAll(".sh", "_merge.sh");
1870 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1871 if (fMasterResubmitThreshold) {
     // The threshold is written as a quoted percentage, e.g. "90%".
1872 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1873 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1875 // Write a jdl with 2 input parameters: collection name and output dir name.
1878 // Copy jdl to grid workspace
1880 // Check if an output directory was defined and valid
1881 if (!fGridOutputDir.Length()) {
1882 Error("CreateJDL", "You must define AliEn output directory");
1885 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1886 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1887 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1888 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1890 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1896 if (TestBit(AliAnalysisGrid::kSubmit)) {
     // Destination of the two JDL files: the AliEn output directory, or the work
     // directory in production mode. Existing copies are removed first.
1897 TString mergeJDLName = fExecutable;
1898 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1899 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1900 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1901 if (fProductionMode) {
1902 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1903 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1905 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1906 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1907 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1908 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1909 Fatal("","Terminating");
1910 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1912 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1913 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1914 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1915 Fatal("","Terminating");
     // Upload additional (non-.so) dependencies to the work directory, replacing existing copies.
1918 if (fAdditionalLibs.Length()) {
1919 arr = fAdditionalLibs.Tokenize(" ");
1922 while ((os=(TObjString*)next())) {
1923 if (os->GetString().Contains(".so")) continue;
1924 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1925 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1926 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1927 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1928 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1929 Fatal("","Terminating");
     // Upload the entries of fPackages to the work directory in the same way.
1934 TIter next(fPackages);
1936 while ((obj=next())) {
1937 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1938 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1939 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1940 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1941 Form("%s/%s", workdir.Data(), obj->GetName())))
1942 Fatal("","Terminating");
1949 //______________________________________________________________________________
1950 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1952 // Writes one or more JDL's corresponding to findex. If findex is negative,
1953 // all run numbers are considered in one go (jdl). For non-negative indices
1954 // they correspond to the indices in the array fInputFiles.
1955 if (!fInputFiles && !fMCLoop) return kFALSE;
1958 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1959 workdir += fGridWorkingDir;
1960 TString stageName = "$2";
1961 if (fProductionMode) stageName = "$4";
1962 if (!fMergeDirName.IsNull()) {
1963 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1964 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1966 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1967 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1969 if (fProductionMode) {
1970 TIter next(fInputFiles);
1971 while ((os=next())) {
1972 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1974 if (!fOutputToRunNo)
1975 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1977 fGridJDL->SetOutputDirectory(fGridOutputDir);
1979 if (!fRunNumbers.Length() && !fRunRange[0]) {
1980 // One jdl with no parameters in case input data is specified by name.
1981 TIter next(fInputFiles);
1983 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1984 if (!fOutputSingle.IsNull())
1985 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1987 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1988 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1991 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1992 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1993 if (!fOutputSingle.IsNull()) {
1994 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1995 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1997 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
2002 // Generate the JDL as a string
2003 TString sjdl = fGridJDL->Generate();
2004 TString sjdl1 = fMergingJDL->Generate();
2006 if (!fMergeDirName.IsNull()) {
2007 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
2008 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
2010 fMergingJDL->SetOutputDirectory("$1", "Output directory");
2011 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
2013 TString sjdl2 = fMergingJDL->Generate();
2014 Int_t index, index1;
2015 sjdl.ReplaceAll("\",\"", "\",\n \"");
2016 sjdl.ReplaceAll("(member", "\n (member");
2017 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2018 sjdl.ReplaceAll("{", "{\n ");
2019 sjdl.ReplaceAll("};", "\n};");
2020 sjdl.ReplaceAll("{\n \n", "{\n");
2021 sjdl.ReplaceAll("\n\n", "\n");
2022 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
2023 sjdl1.ReplaceAll("\",\"", "\",\n \"");
2024 sjdl1.ReplaceAll("(member", "\n (member");
2025 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2026 sjdl1.ReplaceAll("{", "{\n ");
2027 sjdl1.ReplaceAll("};", "\n};");
2028 sjdl1.ReplaceAll("{\n \n", "{\n");
2029 sjdl1.ReplaceAll("\n\n", "\n");
2030 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
2031 sjdl2.ReplaceAll("\",\"", "\",\n \"");
2032 sjdl2.ReplaceAll("(member", "\n (member");
2033 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2034 sjdl2.ReplaceAll("{", "{\n ");
2035 sjdl2.ReplaceAll("};", "\n};");
2036 sjdl2.ReplaceAll("{\n \n", "{\n");
2037 sjdl2.ReplaceAll("\n\n", "\n");
2038 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
2039 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2040 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
2041 index = sjdl.Index("JDLVariables");
2042 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
2043 sjdl += "Workdirectorysize = {\"5000MB\"};";
2044 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2045 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2046 index = fJobTag.Index(":");
2047 if (index < 0) index = fJobTag.Length();
2048 TString jobTag = fJobTag;
2049 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
2050 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
2051 if (fProductionMode) {
2052 sjdl1.Prepend("# Generated merging jdl (production mode) \
2053 \n# $1 = full alien path to output directory to be merged \
2054 \n# $2 = train number \
2055 \n# $3 = production (like LHC10b) \
2056 \n# $4 = merging stage \
2057 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2058 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2059 sjdl2.Prepend("# Generated merging jdl \
2060 \n# $1 = full alien path to output directory to be merged \
2061 \n# $2 = train number \
2062 \n# $3 = production (like LHC10b) \
2063 \n# $4 = merging stage \
2064 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2066 sjdl1.Prepend("# Generated merging jdl \
2067 \n# $1 = full alien path to output directory to be merged \
2068 \n# $2 = merging stage \
2069 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2070 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2071 sjdl2.Prepend("# Generated merging jdl \
2072 \n# $1 = full alien path to output directory to be merged \
2073 \n# $2 = merging stage \
2074 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2076 index = sjdl1.Index("JDLVariables");
2077 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2078 index = sjdl2.Index("JDLVariables");
2079 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2080 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2081 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2082 index = sjdl2.Index("Split =");
2084 index1 = sjdl2.Index("\n", index);
2085 sjdl2.Remove(index, index1-index+1);
2087 index = sjdl2.Index("SplitMaxInputFileNumber");
2089 index1 = sjdl2.Index("\n", index);
2090 sjdl2.Remove(index, index1-index+1);
2092 index = sjdl2.Index("InputDataCollection");
2094 index1 = sjdl2.Index(";", index);
2095 sjdl2.Remove(index, index1-index+1);
2097 index = sjdl2.Index("InputDataListFormat");
2099 index1 = sjdl2.Index("\n", index);
2100 sjdl2.Remove(index, index1-index+1);
2102 index = sjdl2.Index("InputDataList");
2104 index1 = sjdl2.Index("\n", index);
2105 sjdl2.Remove(index, index1-index+1);
2107 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2108 // Write jdl to file
2110 out.open(fJDLName.Data(), ios::out);
2112 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2115 out << sjdl << endl;
2117 TString mergeJDLName = fExecutable;
2118 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2121 out1.open(mergeJDLName.Data(), ios::out);
2123 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2126 out1 << sjdl1 << endl;
2129 TString finalJDL = mergeJDLName;
2130 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2131 out2.open(finalJDL.Data(), ios::out);
2133 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2136 out2 << sjdl2 << endl;
2140 // Copy jdl to grid workspace
2142 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2144 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2145 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2146 TString finalJDL = mergeJDLName;
2147 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2148 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2149 if (fProductionMode) {
2150 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2151 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2152 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2154 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2155 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2156 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2157 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2158 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2159 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2160 Fatal("","Terminating");
2162 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2163 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2164 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2165 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2166 Fatal("","Terminating");
2167 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2168 Fatal("","Terminating");
2174 //______________________________________________________________________________
// Checks whether the logical file name lfn can be listed on the grid.
// Returns kFALSE when there is no grid connection (gGrid is null) or when the
// listing command gives no result.
// NOTE(review): the embedded line numbers are not contiguous here, so some
// short statements and braces of this function are not shown in this listing.
2175 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2177 // Returns true if file exists.
2178 if (!gGrid) return kFALSE;
// The "alien://" scheme prefix is removed before the name is handed to Ls().
2180 slfn.ReplaceAll("alien://","");
2181 TGridResult *res = gGrid->Ls(slfn);
2182 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
2183 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry is checked for a non-empty string stored under the "name" key.
2188 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2189 if (!objs || !objs->GetString().Length()) {
2197 //______________________________________________________________________________
// Looks for dirname (a bare name or a full path) among the entries of its
// parent directory on the grid. Returns kFALSE without a grid connection or
// when the parent cannot be listed.
// NOTE(review): the embedded line numbers are not contiguous here, so some
// short statements and braces of this function are not shown in this listing.
2198 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2200 // Returns true if directory exists. Can be also a path.
2201 if (!gGrid) return kFALSE;
2202 // Check if dirname is a path
2203 TString dirstripped = dirname;
// Two successive strips: the default one first, then trailing '/' characters.
2204 dirstripped = dirstripped.Strip();
2205 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Split the cleaned path into its last component and its parent directory.
2206 TString dir = gSystem->BaseName(dirstripped);
2208 TString path = gSystem->DirName(dirstripped);
// The parent is listed with the "-F" option and its entries are scanned.
2209 TGridResult *res = gGrid->Ls(path, "-F");
2210 if (!res) return kFALSE;
2214 while ((map=dynamic_cast<TMap*>(next()))) {
2215 obj = map->GetValue("name");
// A listing entry matches when its "name" value equals dir.
2217 if (dir == obj->GetName()) {
2226 //______________________________________________________________________________
// Classifies the input data pointed to by lfn and reports the outcome through
// the three reference arguments:
//   isCollection - result of IsCollection(lfn)
//   isXml        - set from whether the name contains ".xml"
//   useTags      - set from whether the inspected file name contains ".tag"
// A summary message is built in msg and printed with Info() on each path.
// NOTE(review): the embedded line numbers are not contiguous here, so the
// branch headers, braces and returns of this function are not shown in this
// listing; the comments below describe only the visible statements.
2227 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2229 // Check input data type.
2230 isCollection = kFALSE;
2234 Error("CheckDataType", "No connection to grid");
2237 isCollection = IsCollection(lfn);
2238 TString msg = "\n##### file: ";
2241 msg += " type: raw_collection;";
2242 // special treatment for collections
2244 // check for tag files in the collection
2245 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2247 msg += " using_tags: No (unknown)";
2248 Info("CheckDataType", "%s", msg.Data());
// The "origLFN" key of the first result entry is used as the sample file name.
2251 const char* typeStr = res->GetKey(0, "origLFN");
2252 if (!typeStr || !strlen(typeStr)) {
2253 msg += " using_tags: No (unknown)";
2254 Info("CheckDataType", "%s", msg.Data());
2257 TString file = typeStr;
2258 useTags = file.Contains(".tag");
2259 if (useTags) msg += " using_tags: Yes";
2260 else msg += " using_tags: No";
2261 Info("CheckDataType", "%s", msg.Data());
2266 isXml = slfn.Contains(".xml");
2268 // Open xml collection and check if there are tag files inside
2269 msg += " type: xml_collection;";
// The collection is opened through the interpreter, so TAlienCollection is
// named only inside the string passed to ProcessLine().
2270 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2272 msg += " using_tags: No (unknown)";
2273 Info("CheckDataType", "%s", msg.Data());
2276 TMap *map = coll->Next();
2278 msg += " using_tags: No (unknown)";
2279 Info("CheckDataType", "%s", msg.Data());
// The value stored under the empty key of the first entry is read as a map,
// and its "name" value provides the file name checked for ".tag".
2282 map = (TMap*)map->GetValue("");
2284 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2285 useTags = file.Contains(".tag");
2287 if (useTags) msg += " using_tags: Yes";
2288 else msg += " using_tags: No";
2289 Info("CheckDataType", "%s", msg.Data());
// Remaining case: the name itself is checked for ".tag" and ".root".
2292 useTags = slfn.Contains(".tag");
2293 if (slfn.Contains(".root")) msg += " type: root file;";
2294 else msg += " type: unknown file;";
2295 if (useTags) msg += " using_tags: Yes";
2296 else msg += " using_tags: No";
2297 Info("CheckDataType", "%s", msg.Data());
2300 //______________________________________________________________________________
// Registers a .par package with the plugin: the package must be accessible on
// the local file system, the kUsePars bit is switched on, and the package name
// is appended to fPackages as a TObjString.
// NOTE(review): the embedded line numbers are not contiguous here, so some
// short statements and braces of this function are not shown in this listing.
2301 void AliAnalysisAlien::EnablePackage(const char *package)
2303 // Enables a par file supposed to exist in the current directory.
2304 TString pkg(package);
2305 pkg.ReplaceAll(".par", "");
// AccessPathName() returns true when the path is NOT accessible.
2307 if (gSystem->AccessPathName(pkg)) {
2308 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// The informational message is printed only the first time the bit is set.
2311 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2312 Info("EnablePackage", "AliEn plugin will use .par packages");
2313 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// The package list owns the TObjString entries added to it.
2315 fPackages = new TObjArray();
2316 fPackages->SetOwner();
2318 fPackages->Add(new TObjString(pkg));
2321 //______________________________________________________________________________
// Builds a TChain for test mode from the text file named by fFileForTestMode,
// which lists one data file location per line. When fFriendChainName is set,
// one friend chain per blank-separated token is created and attached.
// NOTE(review): the embedded line numbers are not contiguous here, so loop
// headers, braces, returns and clean-up statements of this function are not
// shown in this listing; the comments describe only the visible statements.
2322 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2324 // Make a tree from files having the location specified in fFileForTestMode.
2325 // Inspired from JF's CreateESDChain.
2326 if (fFileForTestMode.IsNull()) {
2327 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
// AccessPathName() returns true when the path is NOT accessible.
2330 if (gSystem->AccessPathName(fFileForTestMode)) {
2331 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2336 in.open(fFileForTestMode);
2338 // Read the input list of files and add them to the chain
2340 TString streeName(treeName);
// The tree name is overridden with "TE" when IsUseMCchain() is true.
2341 if (IsUseMCchain()) streeName = "TE";
2342 TChain *chain = new TChain(streeName);
2343 TList *friends = new TList();
2344 TChain *cfriend = 0;
2345 if (!fFriendChainName.IsNull()) {
2346 TObjArray *list = fFriendChainName.Tokenize(" ");
// Each token becomes the title of a friend chain; the title is read back
// below when the friend file name is assembled.
2349 while((str=(TObjString*)next())) {
2350 cfriend = new TChain(streeName, str->GetName());
2351 friends->Add(cfriend);
2352 chain->AddFriend(cfriend);
2357 TIter nextfriend(friends);
// Empty lines and lines starting with '#' are ignored; reading stops once
// fNtestFiles lines have been counted.
2361 if (line.IsNull() || line.BeginsWith("#")) continue;
2362 if (count++ == fNtestFiles) break;
2363 TString esdFile(line);
// A file is added to the chain only if it can actually be opened.
2364 TFile *file = TFile::Open(esdFile);
2365 if (file && !file->IsZombie()) {
2366 chain->Add(esdFile);
2368 if (!fFriendChainName.IsNull()) {
// Anything from the first '#' on is cut before taking the directory part.
2369 if (esdFile.Index("#") > -1)
2370 esdFile.Remove(esdFile.Index("#"));
2371 bpath = gSystem->DirName(esdFile);
2375 while ((cfriend=(TChain*)nextfriend())) {
2377 fileFriend += cfriend->GetTitle();
2378 file = TFile::Open(fileFriend);
2379 if (file && !file->IsZombie()) {
2381 cfriend->Add(fileFriend);
// A friend file that cannot be opened is treated as fatal, whereas an
// un-openable main file is only reported and skipped.
2383 Fatal("GetChainForTestMode", "Cannot open friend file: %s", fileFriend.Data());
2389 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2393 if (!chain->GetListOfFiles()->GetEntries()) {
2394 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2403 //______________________________________________________________________________
// Walks the job list returned by gGrid->Ps("") and classifies the jobs whose
// queue id is not below jobidstart. The status text of the job with id lastid
// is written into a function-static buffer, which is what the visible return
// statement hands back.
// NOTE(review): the embedded line numbers are not contiguous here, so the
// counter updates and several other statements of this function are not shown
// in this listing; how nrunning/nwaiting/nerror/ndone are filled is presumably
// in the hidden switch bodies - confirm against the full file.
2404 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2406 // Get job status for all jobs with jobid>jobidstart.
// Static storage: the returned pointer stays valid after the call, but the
// buffer is shared between calls.
2407 static char mstatus[20];
2413 TGridJobStatusList *list = gGrid->Ps("");
2414 if (!list) return mstatus;
2415 Int_t nentries = list->GetSize();
2416 TGridJobStatus *status;
2418 for (Int_t ijob=0; ijob<nentries; ijob++) {
2419 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, so TAlienJobStatus is named
// only inside the string passed to ProcessLine().
2420 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2421 if (pid<jobidstart) continue;
2422 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format string and is
// written into the 20-byte buffer without a length limit - confirm that the
// status strings are always short and free of '%'.
2423 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2425 switch (status->GetStatus()) {
2426 case TGridJobStatus::kWAITING:
2428 case TGridJobStatus::kRUNNING:
// kABORTED, kFAIL and kUNKNOWN are listed consecutively with no visible
// statement between them.
2430 case TGridJobStatus::kABORTED:
2431 case TGridJobStatus::kFAIL:
2432 case TGridJobStatus::kUNKNOWN:
2434 case TGridJobStatus::kDONE:
2443 //______________________________________________________________________________
2444 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2446 // Returns true if file is a collection. Functionality duplicated from
2447 // TAlien::Type() because we don't want to directly depend on TAlien.
2449 Error("IsCollection", "No connection to grid");
2452 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2453 if (!res) return kFALSE;
2454 const char* typeStr = res->GetKey(0, "type");
2455 if (!typeStr || !strlen(typeStr)) return kFALSE;
2456 if (!strcmp(typeStr, "collection")) return kTRUE;
2461 //______________________________________________________________________________
2462 Bool_t AliAnalysisAlien::IsSingleOutput() const
2464 // Check if single-ouput option is on.
2465 return (!fOutputSingle.IsNull());
2468 //______________________________________________________________________________
2469 Long64_t AliAnalysisAlien::RunMacroAndExtractLibs(const char* macro, const char *args, TString &libs)
2471 // Tries to run the specified macro and return the libraries that it loads.
2473 if (strlen(macro)) expname = gSystem->ExpandPathName(macro);
2474 if (expname.IsNull() || gSystem->AccessPathName(expname)) {
2475 ::Error("RunMacroAndExtractLibs","Cannot find macro %s in current directory", macro);
2478 TString oldlibs = gSystem->GetLibraries();
2481 Long64_t retval = m.Exec(args, &error);
2482 if (error != TInterpreter::kNoError)
2484 ::Error("RunMacroAndExtractLibs", "Macro interpretation %s failed", macro);
2487 libs = gSystem->GetLibraries();
2488 libs.ReplaceAll(oldlibs, "");
2489 libs.Strip(TString::kLeading);
2490 TObjArray *libTokens = libs.Tokenize(" ");
2492 for (Int_t i=0; i<libTokens->GetEntries(); i++) {
2493 if (!libs.IsNull()) libs += " ";
2494 libs += gSystem->BaseName(libTokens->At(i)->GetName());
2500 //______________________________________________________________________________
// Dumps the current plugin configuration to standard output with printf.
// When the analysis manager reports PROOF mode, a PROOF-specific block is
// printed first; the grid-related settings follow.
// NOTE(review): the embedded line numbers are not contiguous here, so many
// short lines (else branches, conditions, braces, local declarations) of this
// function are not shown in this listing; the comments describe only the
// visible statements.
2501 void AliAnalysisAlien::Print(Option_t *) const
2503 // Print current plugin settings.
2504 printf("### AliEn analysis plugin current settings ###\n");
2505 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2506 if (mgr && mgr->IsProofMode()) {
// The banner switches to the "PROOF LITE" wording when the kTest bit is set.
2507 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2508 if (TestBit(AliAnalysisGrid::kTest))
2509 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2510 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2511 if (!fProofDataSet.IsNull())
2512 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2514 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2516 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2517 if (!fROOTVersion.IsNull())
2518 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2520 printf("= ROOT version requested________________________ default\n");
2521 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2522 if (!fAliRootMode.IsNull())
2523 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2525 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2526 if (fNproofWorkersPerSlave)
2527 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2528 if (TestSpecialBit(kClearPackages))
2529 printf("= ClearPackages requested...\n");
// NOTE(review): this tests the pointer returned by Data(), not whether the
// string is empty - confirm that an emptiness check was not intended.
2530 if (fIncludePath.Data())
2531 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2532 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2533 if (fPackages && fPackages->GetEntries()) {
2534 TIter next(fPackages);
// Package names are appended to list one after another as written here.
2537 while ((obj=next())) list += obj->GetName();
2538 printf("= Par files to be used: ________________________ %s\n", list.Data());
2540 if (TestSpecialBit(kProofConnectGrid))
2541 printf("= Requested PROOF connection to grid\n");
2544 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2545 if (fOverwriteMode) {
2546 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2547 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2549 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2550 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO:Print");
2551 printf("= Production mode:______________________________ %d\n", fProductionMode);
2552 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2553 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2554 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2556 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2557 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2558 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
// Working copies: the data pattern may be split at its first blank, with the
// leading part appended to the displayed base directory after "/%run%/".
2559 TString basedatadir = fGridDataDir;
2560 TString pattern = fDataPattern;
2562 Int_t ind = pattern.Index(" ");
2564 basedatadir += "/%run%/";
2565 basedatadir += pattern(0, ind);
2566 pattern = pattern(ind+1, pattern.Length());
2568 printf("= Data base directory path requested: __________ %s\n", basedatadir.Data());
2569 printf("= Data search pattern: _________________________ %s\n", pattern.Data());
2570 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2571 if (fRunNumbers.Length())
2572 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2574 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
// Input files are listed only when neither run numbers nor a run range is set.
2575 if (!fRunRange[0] && !fRunNumbers.Length()) {
2576 TIter next(fInputFiles);
2579 while ((obj=next())) list += obj->GetName();
2580 printf("= Input files to be processed: _________________ %s\n", list.Data());
2582 if (TestBit(AliAnalysisGrid::kTest))
2583 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2584 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2585 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2586 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2587 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2588 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2589 printf("=====================================================================\n");
2590 printf("= Job price: ___________________________________ %d\n", fPrice);
2591 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2592 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
// Several optional settings below are printed only when positive or non-empty.
2593 if (fMaxInitFailed>0)
2594 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2595 if (fMasterResubmitThreshold>0)
2596 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2597 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2598 if (fNrunsPerMaster>0)
2599 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2600 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2601 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2602 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2603 if (fArguments.Length())
2604 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2605 if (fExecutableArgs.Length())
2606 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2607 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2608 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2609 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2610 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2612 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2613 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same pointer test as in the PROOF block above - confirm that
// an emptiness check was not intended.
2614 if (fIncludePath.Data())
2615 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2616 if (fCloseSE.Length())
2617 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2618 if (fFriendChainName.Length())
2619 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2620 if (fPackages && fPackages->GetEntries()) {
2621 TIter next(fPackages);
2624 while ((obj=next())) list += obj->GetName();
2625 printf("= Par files to be used: ________________________ %s\n", list.Data());
2629 //______________________________________________________________________________
// Resets the plugin configuration to its default values and creates fresh JDL
// objects for the analysis job and for the merging job.
// NOTE(review): the embedded line numbers are not contiguous here, so several
// short assignments of this function are not shown in this listing; only the
// defaults visible below can be relied on from this view.
2630 void AliAnalysisAlien::SetDefaults()
2632 // Set default values for everything. What cannot be filled will be left empty.
2633 if (fGridJDL) delete fGridJDL;
// Both JDL objects are created through the interpreter, so TAlienJDL is named
// only inside the string passed to ProcessLine().
// NOTE(review): no delete of a previous fMergingJDL is visible, unlike for
// fGridJDL above - confirm against the full file.
2634 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2635 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2638 fSplitMaxInputFileNumber = 100;
2640 fMasterResubmitThreshold = 0;
2646 fNrunsPerMaster = 1;
2647 fMaxMergeFiles = 100;
2649 fExecutable = "analysis.sh";
2650 fExecutableCommand = "root -b -q -x";
2652 fExecutableArgs = "";
2653 fAnalysisMacro = "myAnalysis.C";
2654 fAnalysisSource = "";
2655 fAdditionalLibs = "";
2659 fAliROOTVersion = "";
2660 fUser = ""; // Your alien user name
2661 fGridWorkingDir = "";
2662 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2663 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2664 fFriendChainName = "";
2665 fGridOutputDir = "output";
// Default archive specification: two zip archives, one matching std* files
// and one matching *.root files.
2666 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2667 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2668 fInputFormat = "xml-single";
2669 fJDLName = "analysis.jdl";
2670 fJobTag = "Automatically generated analysis JDL";
2671 fMergeExcludes = "";
2674 SetCheckCopy(kTRUE);
2675 SetDefaultOutputs(kTRUE);
2679 //______________________________________________________________________________
2680 void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
2682 // Set file name for the chain of friends and optionally additional libs to be loaded.
2683 // Libs should be separated by blancs.
2684 fFriendChainName = name;
2685 fFriendChainName.ReplaceAll(",", " ");
2686 fFriendChainName.Strip();
2687 fFriendChainName.ReplaceAll(" ", " ");
2689 fFriendLibs = libnames;
2690 if (fFriendLibs.Length()) {
2691 if(!fFriendLibs.Contains(".so"))
2692 Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
2693 fFriendLibs.ReplaceAll(",", " ");
2694 fFriendLibs.Strip();
2695 fFriendLibs.ReplaceAll(" ", " ");
2699 //______________________________________________________________________________
2700 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2702 // Obsolete method. Use SetROOTVersion instead
2703 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2704 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2705 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2708 //______________________________________________________________________________
// Drives one step of staged merging: detects whether the final result already
// exists, finds the current stage from the Stage_<n>.xml files in aliendir,
// builds the xml collection for the next stage, copies it to the grid and
// submits either the intermediate or the final merging JDL.
//   filename  - name of the final merged file looked for in aliendir
//   aliendir  - grid output directory
//   nperchunk - a stage with fewer files than this is treated as the last one
//   jdl       - merging JDL; the final one is derived by replacing ".jdl"
//               with "_final.jdl"
// NOTE(review): the embedded line numbers are not contiguous here, so loop
// headers, returns, braces and local declarations of this function are not
// shown in this listing; the comments describe only the visible statements.
2709 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2711 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2712 // First check if the result is already in the output directory.
2713 if (FileExists(Form("%s/%s",aliendir,filename))) {
2714 printf("Final merged results found. Not merging again.\n");
2717 // Now check the last stage done.
2720 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2723 // Next stage of merging
// After the first stage, only archives produced by the previous stage are
// searched for.
2725 TString pattern = "*root_archive.zip";
2726 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2727 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
// The result object itself is not used; the xml text is taken from the grid
// standard output below.
2728 if (res) delete res;
2729 // Write standard output to file
2730 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2731 // Count the number of files inside
2733 ifile.open(Form("Stage_%d.xml",stage));
2734 if (!ifile.good()) {
2735 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Every line containing "/event" is counted as one file of the collection.
2740 while (!ifile.eof()) {
2742 if (line.Contains("/event")) nfiles++;
2746 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2749 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2751 // Copy the file in the output directory
2752 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2753 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
// A failed copy of the collection is fatal.
2754 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2755 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2756 // Check if this is the last stage to be done.
2757 Bool_t laststage = (nfiles<nperchunk);
// A non-zero fMaxMergeStages caps the number of stages.
2758 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2761 printf("### Submiting final merging stage %d\n", stage);
2762 TString finalJDL = jdl;
2763 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2764 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2765 jobId = SubmitSingleJob(query);
2767 printf("### Submiting merging stage %d\n", stage);
2768 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2769 jobId = SubmitSingleJob(query);
// SubmitSingleJob() returns 0 when the submission failed.
2771 if (!jobId) return kFALSE;
// The job id and a stage label are appended to blank-separated bookkeeping
// strings.
2773 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2774 fGridJobIDs.Append(Form("%d", jobId));
2775 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2776 fGridStages.Append(Form("%s_merge_stage%d",
2777 laststage ? "final" : "partial", stage));
2782 //______________________________________________________________________________
2783 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2785 // Loat the analysis manager from a file.
2786 TFile *file = TFile::Open(fname);
2788 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2791 TIter nextkey(file->GetListOfKeys());
2792 AliAnalysisManager *mgr = 0;
2794 while ((key=(TKey*)nextkey())) {
2795 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2796 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2799 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2803 //______________________________________________________________________________
// Sends one command (normally a "submit ..." query) to the grid and extracts
// the job id from the "jobId" key of the first result entry.
// Returns 0 when there is no grid connection.
// NOTE(review): the embedded line numbers are not contiguous here, so some
// short statements, returns and braces of this function are not shown in this
// listing.
2804 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2806 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2807 if (!gGrid) return 0;
2808 printf("=> %s ------> ",query);
2809 TGridResult *res = gGrid->Command(query);
2811 TString jobId = res->GetKey(0,"jobId");
// An empty job id means the submission was not accepted.
2813 if (jobId.IsNull()) {
2814 printf("submission failed. Reason:\n");
2817 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
// The id is printed both as received and after conversion to an integer.
2820 Int_t ijobId = jobId.Atoi();
2821 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2825 //______________________________________________________________________________
// Concatenates the copies of one output file found through an xml collection
// into a single local file named output. Each input is first copied locally
// as "<index>_<output>" and then appended with shell commands.
//   output     - name of the file to merge (also the name of the result)
//   collection - xml collection; any other name makes the function return
//                kFALSE immediately
// NOTE(review): the embedded line numbers are not contiguous here, so several
// statements, returns and braces of this function are not shown in this
// listing; the comments describe only the visible statements.
2826 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2828 // Merges a collection of output files using concatenation.
2829 TString scoll(collection);
2830 if (!scoll.Contains(".xml")) return kFALSE;
// The collection is opened through the interpreter, so TAlienCollection is
// named only inside the string passed to ProcessLine().
2831 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2833 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2836 // Iterate grid collection
2838 Bool_t merged = kFALSE;
2840 while (coll->Next()) {
// Only the directory part of each collection entry is taken here.
2841 TString fname = gSystem->DirName(coll->GetTURL());
2844 outtmp = Form("%d_%s", ifile, output);
2845 if (!TFile::Cp(fname, outtmp)) {
2846 ::Error("MergeInfo", "Could not copy %s", fname.Data());
// NOTE(review): file names are inserted unquoted into shell commands below;
// names with blanks or shell metacharacters would break them - confirm that
// callers only pass plain file names.
2851 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
2854 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2855 gSystem->Exec("cp tempmerged lastmerged");
// The accumulated content becomes the result, then the temporary files and
// all "<index>_<output>" copies are removed.
2858 gSystem->Exec(Form("cp lastmerged %s", output));
2859 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2865 //______________________________________________________________________________
2866 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2868 // Merge given output files from basedir. Basedir can be an alien output directory
2869 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2870 // files in a group (ignored for xml input). Merging can be done in stages:
2871 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2872 // stage=1 : works with an xml of all root_archive.zip in the output directory
2873 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
//
// Parameters:
//   output    - name of the file to produce; anything from the first '@' on is dropped
//   basedir   - selects the input source by substring: ".xml" -> grid collection,
//               ".txt" -> text file with one folder per line, anything else -> grid 'find'
//   nmaxmerge - number of input files merged per chunk (overridden to 9999999 for xml input)
//   stage     - merging stage, see above
// Errors are reported through ::Error / ::Warning.
2874 TString outputFile = output;
2876 TString outputChunk;
2877 TString previousChunk = "";
2878 TObjArray *listoffiles = new TObjArray();
2879 // listoffiles->SetOwner();
2880 Int_t countChunk = 0;
// countZero counts down inside a chunk; equality with nmaxmerge marks "start of a new chunk".
2881 Int_t countZero = nmaxmerge;
2882 Bool_t merged = kTRUE;
// isGrid is handed to the TFileMerger constructor further down.
2883 Bool_t isGrid = kTRUE;
// Cut the "@..." suffix; an '@' in first position (index 0) is left untouched.
2884 Int_t index = outputFile.Index("@");
2885 if (index > 0) outputFile.Remove(index);
2886 TString inputFile = outputFile;
2887 TString sbasedir = basedir;
// --- Input source 1: xml collection -------------------------------------------
2888 if (sbasedir.Contains(".xml")) {
2889 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2890 nmaxmerge = 9999999;
// The collection is opened through the interpreter and the result cast back to a pointer.
2891 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2893 ::Error("MergeOutput", "Input XML collection empty.");
2896 // Iterate grid collection
2897 while (coll->Next()) {
// Start from the directory part of the entry's TURL.
2898 TString fname = gSystem->DirName(coll->GetTURL());
2901 listoffiles->Add(new TNamed(fname.Data(),""));
// --- Input source 2: text file listing folders --------------------------------
2903 } else if (sbasedir.Contains(".txt")) {
2904 // The file having the .txt extension is expected to contain a list of
2905 // folders where the output files will be looked up. For alien folders,
2906 // the full folder LFN is expected (starting with alien://)
2907 // Assume lfn's on each line
2912 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
// Empty lines and lines starting with '#' are ignored.
2918 if (line.IsNull() || line.BeginsWith("#")) continue;
// One line without "alien:" is enough to switch the whole merge to non-grid mode.
2920 if (!line.Contains("alien:")) isGrid = kFALSE;
2924 listoffiles->Add(new TNamed(line.Data(),""));
2928 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
// --- Input source 3: 'find' in the grid directory -----------------------------
// Pattern is "*<inputFile>" searched below <basedir>/.
2933 command = Form("find %s/ *%s", basedir, inputFile.Data());
2934 printf("command: %s\n", command.Data());
2935 TGridResult *res = gGrid->Command(command);
2937 ::Error("MergeOutput","No result for the find command\n");
// Each result entry is a map; its "turl" value is the file location to merge.
2943 while ((map=(TMap*)nextmap())) {
2944 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2945 if (!objs || !objs->GetString().Length()) {
2946 // Nothing found - skip this output
2951 listoffiles->Add(new TNamed(objs->GetName(),""));
2955 if (!listoffiles->GetEntries()) {
2956 ::Error("MergeOutput","No result for the find command\n");
2961 TFileMerger *fm = 0;
2962 TIter next0(listoffiles);
// Filtered list; unlike the first array this one owns its entries.
2963 TObjArray *listoffilestmp = new TObjArray();
2964 listoffilestmp->SetOwner();
2967 // Keep only the files at upper level
// First pass: find the smallest number of '/' characters among the collected names.
2968 Int_t countChar = 0;
2969 while ((nextfile=next0())) {
2970 snextfile = nextfile->GetName();
2971 Int_t crtCount = snextfile.CountChar('/');
2972 if (nextfile == listoffiles->First()) countChar = crtCount;
2973 if (crtCount < countChar) countChar = crtCount;
// Second pass: names with more '/' than the minimum are treated separately
// (handling not visible in this excerpt); the others go to the filtered list.
2976 while ((nextfile=next0())) {
2977 snextfile = nextfile->GetName();
2978 Int_t crtCount = snextfile.CountChar('/');
2979 if (crtCount > countChar) {
2983 listoffilestmp->Add(nextfile);
// NOTE(review): the TObjArray allocated at the top is replaced here and no visible
// line deletes it - confirm whether it is released elsewhere.
2986 listoffiles = listoffilestmp; // Now contains 'good' files
2987 listoffiles->Print();
2988 TIter next(listoffiles);
2989 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
// Shell pattern matching partial results: "<name>_*.root".
2990 outputChunk = outputFile;
2991 outputChunk.ReplaceAll(".root", "_*.root");
2992 // Check for existent temporary merge files
2993 // Check overwrite mode and remove previous partial results if needed
2994 // Preserve old merging functionality for stage 0.
// 'ls' exits with 0 when at least one file matches, i.e. partial results exist.
2996 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2998 // Skip as many input files as in a chunk
2999 for (Int_t counter=0; counter<nmaxmerge; counter++) {
3002 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
3006 snextfile = nextfile->GetName();
// Name of the partial file expected for this chunk: "<name>_NNNN.root".
3008 outputChunk = outputFile;
3009 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns true when the path is NOT accessible.
3011 if (gSystem->AccessPathName(outputChunk)) continue;
3012 // Merged file with chunks up to <countChunk> found
3013 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
3014 previousChunk = outputChunk;
3018 countZero = nmaxmerge;
// Main chunked merge loop over the remaining input files.
3020 while ((nextfile=next())) {
3021 snextfile = nextfile->GetName();
3022 // Loop 'find' results and get next LFN
3023 if (countZero == nmaxmerge) {
3024 // First file in chunk - create file merger and add previous chunk if any.
3025 fm = new TFileMerger(isGrid);
3026 fm->SetFastMethod(kTRUE);
3027 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
3028 outputChunk = outputFile;
3029 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
3031 // If last file found, put merged results in the output file
3032 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
3033 // Add file to be merged and decrement chunk counter.
3034 fm->AddFile(snextfile);
// Chunk complete, or no more input: write it out.
3036 if (countZero==0 || nextfile == listoffiles->Last()) {
3037 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3038 // Nothing found - skip this output
3039 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3043 fm->OutputFile(outputChunk);
3044 // Merge the outputs, then go to next chunk
3046 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3050 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial file was added to this merger above, so it can be removed now.
3051 gSystem->Unlink(previousChunk);
3053 if (nextfile == listoffiles->Last()) break;
// Re-arm the chunk counter; the chunk just written becomes the seed of the next one.
3055 countZero = nmaxmerge;
3056 previousChunk = outputChunk;
3063 // Merging stage different than 0.
3064 // Move to the beginning of the requested chunk.
// A single merger receives every file of the filtered list and writes <outputFile> directly.
3065 fm = new TFileMerger(isGrid);
3066 fm->SetFastMethod(kTRUE);
3067 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
3069 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3070 // Nothing found - skip this output
3071 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3075 fm->OutputFile(outputFile);
3076 // Merge the outputs
3078 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3082 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
3088 //______________________________________________________________________________
3089 Bool_t AliAnalysisAlien::MergeOutputs()
3091 // Merge analysis outputs existing in the AliEn space.
// Depending on the plugin bits this either does nothing (test/offline mode),
// hands the work to merging jobs (SubmitMerging), or merges every file listed
// in fOutputFiles locally by calling MergeOutput() on fGridOutputDir.
// Test mode counts as success, offline mode as failure.
3092 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
3093 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3095 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
// The messages below are emitted when the kMerge bit is not set.
3099 if (!TestBit(AliAnalysisGrid::kMerge)) {
3100 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
3103 if (fProductionMode) {
3104 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
3107 Info("MergeOutputs", "Submitting merging JDL");
3108 if (!SubmitMerging()) return kFALSE;
3109 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
3110 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
3113 // Get the output path
// An output dir without any '/' is taken relative to <grid home>/<working dir>.
3114 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3115 if (!DirectoryExists(fGridOutputDir)) {
3116 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
3119 if (!fOutputFiles.Length()) {
3120 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3123 // Check if fast read option was requested
3124 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
3125 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
3126 if (fFastReadOption) {
3127 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
3128 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// XNet.* settings written into gEnv for the FastRead option.
3129 gEnv->SetValue("XNet.ConnectTimeout",50);
3130 gEnv->SetValue("XNet.RequestTimeout",50);
3131 gEnv->SetValue("XNet.MaxRedirectCount",2);
3132 gEnv->SetValue("XNet.ReconnectTimeout",50);
3133 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
3135 // Make sure we change the temporary directory
3136 gSystem->Setenv("TMPDIR", gSystem->pwd());
3137 // Set temporary compilation directory to current one
3138 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// fOutputFiles is a comma-separated list; handle one output file per iteration.
3139 TObjArray *list = fOutputFiles.Tokenize(",");
3143 Bool_t merged = kTRUE;
3144 while((str=(TObjString*)next())) {
3145 outputFile = str->GetString();
// Drop the "@..." suffix, if any.
3146 Int_t index = outputFile.Index("@");
3147 if (index > 0) outputFile.Remove(index);
// Shell pattern of the partial merge results: "<name>_*.root".
3148 TString outputChunk = outputFile;
3149 outputChunk.ReplaceAll(".root", "_*.root");
3150 // Skip already merged outputs
// AccessPathName() returns false when the file exists.
3151 if (!gSystem->AccessPathName(outputFile)) {
3152 if (fOverwriteMode) {
3153 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3154 gSystem->Unlink(outputFile);
// 'ls' exits with 0 when partial files exist; they are removed as well.
3155 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3156 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3157 outputChunk.Data());
3158 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3161 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// Same clean-up of partial files for a second case (its condition is not visible here).
3165 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3166 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3167 outputChunk.Data());
3168 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Files named in the merge or register exclusion lists are not merged.
// Contains() is a substring test on the exclusion strings.
3171 if (fMergeExcludes.Contains(outputFile.Data()) ||
3172 fRegisterExcludes.Contains(outputFile.Data())) continue;
3173 // Perform a 'find' command in the output directory, looking for registered outputs
// Called without an explicit stage argument.
3174 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3176 Error("MergeOutputs", "Terminate() will NOT be executed");
// If ROOT still lists a file with this name as open, close it.
3180 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3181 if (fileOpened) fileOpened->Close();
3187 //______________________________________________________________________________
3188 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3190 // Use the output files connected to output containers from the analysis manager
3191 // rather than the files defined by SetOutputFiles
// flag - new value of the kDefaultOutputs bit.
// The message is printed only when the bit goes from off to on.
3192 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3193 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3194 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3197 //______________________________________________________________________________
3198 void AliAnalysisAlien::SetOutputFiles(const char *list)
3200 // Manually set the output files list.
3201 // Removes duplicates. Not allowed if default outputs are not disabled.
// list - space-separated file names. Each name is appended to the comma-separated
//        member fOutputFiles (existing content is kept, not replaced).
3202 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3203 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3206 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3208 TString slist = list;
3209 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3210 TObjArray *arr = slist.Tokenize(" ");
3214 while ((os=(TObjString*)next())) {
3215 sout = os->GetString();
// Drop the "@<SE>" part of the token (only when '@' is not the first character).
3216 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): Contains() is a substring test, so a name that is a substring of an
// already registered one (e.g. "a.root" vs "data.root") is also treated as a duplicate.
3217 if (fOutputFiles.Contains(sout)) continue;
3218 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3219 fOutputFiles += sout;
3224 //______________________________________________________________________________
3225 void AliAnalysisAlien::SetOutputArchive(const char *list)
3227 // Manually set the output archive list. Free text - you are on your own...
3228 // Not allowed if default outputs are not disabled.
// list - archive specification; stored verbatim in fOutputArchive, no parsing is done here.
3229 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3230 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3233 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3234 fOutputArchive = list;
3237 //______________________________________________________________________________
3238 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3240 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored (its name is commented out); the call only emits a warning.
3241 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3244 //______________________________________________________________________________
3245 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3247 // Set some PROOF special parameter.
// pname - parameter name, used as key in fProofParam
// value - parameter value, stored as text
// Look for an existing entry with the same name.
3248 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// Existing entry: fetch key and value, then take the key out of the map.
3250 TObject *old = pair->Key();
3251 TObject *val = pair->Value();
3252 fProofParam.Remove(old);
// Store the (new) pair as two freshly allocated TObjString objects.
3256 fProofParam.Add(new TObjString(pname), new TObjString(value));
3259 //______________________________________________________________________________
3260 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3262 // Returns a special PROOF parameter.
// pname - parameter name to look up in fProofParam
// Returns 0 when no entry with that name exists, otherwise the name (text) of the
// stored value object; the returned pointer refers to data held by that object.
3263 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3264 if (!pair) return 0;
3265 return pair->Value()->GetName();
3268 //______________________________________________________________________________
3269 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3271 // Start remote grid analysis.
// Both arguments are unused (their names are commented out in the signature).
// From the visible code the method covers two scenarios:
//  - PROOF mode (mgr->IsProofMode()): checks the PROOF settings, opens the session
//    through the interpreter, enables the AliRoot/par packages and loads the
//    analysis sources;
//  - grid mode: derives the output file list and archive, prints the banner of the
//    selected run mode, calls the Write*() helpers and CreateJDL(), then runs the
//    executable in a local shell and/or submits the JDL to AliEn.
3272 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3273 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3274 if (!mgr || !mgr->IsInitialized()) {
3275 Error("StartAnalysis", "You need an initialized analysis manager for this");
3278 // Are we in PROOF mode ?
3279 if (mgr->IsProofMode()) {
3280 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3281 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3282 if (fProofCluster.IsNull()) {
3283 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
// A dataset is mandatory except in test mode.
3286 if (fProofDataSet.IsNull() && !testMode) {
3287 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3290 // Set the needed environment
3291 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3292 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3293 if (fProofReset && !testMode) {
// fProofReset==1 requests the soft reset (kFALSE); the kTRUE call below is the hard reset.
3294 if (fProofReset==1) {
3295 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3296 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3298 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3299 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3301 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3306 // Check if there is an old active session
// The number of sessions is obtained as the return value of the interpreted expression.
3307 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3309 Error("StartAnalysis","You have to reset your old session first\n");
3313 // Do we need to change the ROOT version ? The success of this cannot be checked.
3314 if (!fROOTVersion.IsNull() && !testMode) {
3315 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3316 fProofCluster.Data(), fROOTVersion.Data()));
3318 // Connect to PROOF and check the status
// Worker option string: the per-slave setting takes precedence over the total count.
3321 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3322 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
// The TProof pointer is obtained as the return value of the interpreted TProof::Open call.
3324 if (!sworkers.IsNull())
3325 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3327 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
// Open("") variant; the error message below ties it to test mode.
3329 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3331 Error("StartAnalysis", "Could not start PROOF in test mode");
3336 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// Applied only when the product of both worker settings is positive.
3339 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3340 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3341 // Set proof special parameters if any
3342 TIter nextpp(&fProofParam);
3343 TObject *proofparam;
3344 while ((proofparam=nextpp())) {
// The value text is pasted unquoted into the interpreted call.
3345 TString svalue = GetProofParameter(proofparam->GetName());
3346 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3348 // Is dataset existing ?
// Keep only the part of the dataset name before the first '#'.
3350 TString dataset = fProofDataSet;
3351 Int_t index = dataset.Index("#");
3352 if (index>=0) dataset.Remove(index);
3353 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3354 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3357 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3359 // Is ClearPackages() needed ?
3360 if (TestSpecialBit(kClearPackages)) {
3361 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3362 gROOT->ProcessLine("gProof->ClearPackages();");
3364 // Is a given aliroot mode requested ?
3367 if (!fAliRootMode.IsNull()) {
// The mode "default" is passed on as an empty string.
3368 TString alirootMode = fAliRootMode;
3369 if (alirootMode == "default") alirootMode = "";
3370 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
// optionsList collects TNamed(name, value) options and owns them.
3371 optionsList.SetOwner();
3372 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3373 // Check the additional libs to be loaded
3375 Bool_t parMode = kFALSE;
3376 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3377 // Parse the extra libs for .so
3378 if (fAdditionalLibs.Length()) {
3379 TString additionalLibs = fAdditionalLibs;
// NOTE(review): TString::Strip() is const and returns the stripped sub-string; the
// result is discarded here, so additionalLibs is left unchanged - confirm intent.
3380 additionalLibs.Strip();
3381 if (additionalLibs.Length() && fFriendLibs.Length())
3382 additionalLibs += " ";
3383 additionalLibs += fFriendLibs;
3384 TObjArray *list = additionalLibs.Tokenize(" ");
3387 while((str=(TObjString*)next())) {
3388 if (str->GetString().Contains(".so")) {
3390 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Library names are reduced to their bare form: "lib" prefix and ".so" removed.
3393 TString stmp = str->GetName();
3394 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3395 stmp.ReplaceAll(".so","");
3396 if (!extraLibs.IsNull()) extraLibs += ":";
3400 if (str->GetString().Contains(".par")) {
3401 // The first par file found in the list will not allow any further .so
// par packages are collected as a ':'-separated list.
3403 if (!parLibs.IsNull()) parLibs += ":";
3404 parLibs += str->GetName();
3408 if (list) delete list;
3410 if (!extraLibs.IsNull()) {
3411 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3412 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3414 // Check extra includes
3415 if (!fIncludePath.IsNull()) {
// Turn the "-I..." list into a ':'-separated list without the $ALICE_ROOT/ prefix.
3416 TString includePath = fIncludePath;
3417 includePath.ReplaceAll(" ",":");
3418 includePath.ReplaceAll("$ALICE_ROOT/","");
3419 includePath.ReplaceAll("${ALICE_ROOT}/","");
3420 includePath.ReplaceAll("-I","");
3421 includePath.Remove(TString::kTrailing, ':');
3422 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3423 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3425 // Check if connection to grid is requested
3426 if (TestSpecialBit(kProofConnectGrid))
3427 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3428 // Enable AliRoot par
3430 // Enable proof lite package
// NOTE(review): TSystem::ExpandPathName(const char*) returns a newly allocated string;
// assigning it straight to a TString does not release it - confirm.
3431 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Dump the collected options as "name value" lines.
3432 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3433 TNamed *obj = (TNamed*)optionsList.At(i);
3434 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// Both interpreted calls are treated as successful when they return 0. The address of
// the local optionsList is passed to the interpreter via %p.
3436 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3437 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3438 Info("StartAnalysis", "AliRootProofLite enabled");
3440 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3444 if ( ! fAliROOTVersion.IsNull() ) {
// A non-zero return from EnablePackage is treated as failure.
3445 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3446 fAliROOTVersion.Data(), &optionsList))) {
3447 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3452 // Enable first par files from fAdditionalLibs
3453 if (!parLibs.IsNull()) {
3454 TObjArray *list = parLibs.Tokenize(":");
3456 TObjString *package;
3457 while((package=(TObjString*)next())) {
// Remove the local directory named after the package (name without ".par") before uploading.
3458 TString spkg = package->GetName();
3459 spkg.ReplaceAll(".par", "");
3460 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3461 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
// Second EnablePackage argument: kFALSE in test mode, kTRUE otherwise.
3462 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3463 if (gROOT->ProcessLine(enablePackage)) {
3464 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3468 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3472 if (list) delete list;
// Additional .so libraries outside test mode are reported as an error here.
3475 if (fAdditionalLibs.Contains(".so") && !testMode) {
3476 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3477 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3481 // Enable par files if requested
3482 if (fPackages && fPackages->GetEntries()) {
3483 TIter next(fPackages);
3485 while ((package=next())) {
3486 // Skip packages already enabled
// Substring test against the ':'-separated parLibs list.
3487 if (parLibs.Contains(package->GetName())) continue;
3488 TString spkg = package->GetName();
3489 spkg.ReplaceAll(".par", "");
3490 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3491 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3492 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3493 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3497 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3502 // Do we need to load analysis source files ?
3503 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3504 if (fAnalysisSource.Length()) {
// fAnalysisSource is a space-separated list; each file is loaded with the "+g" suffix.
3505 TObjArray *list = fAnalysisSource.Tokenize(" ");
3508 while((str=(TObjString*)next())) {
3509 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3511 if (list) delete list;
3514 // Register dataset to proof lite.
// The error messages below are tagged "GetChainForTestMode" although emitted from here.
3515 if (fFileForTestMode.IsNull()) {
3516 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3519 if (gSystem->AccessPathName(fFileForTestMode)) {
3520 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// Build a collection from the list file and register it as "test_collection".
3523 TFileCollection *coll = new TFileCollection();
3524 coll->AddFromFile(fFileForTestMode);
3525 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3526 gROOT->ProcessLine("gProof->ShowDataSets()");
3531 // Check if output files have to be taken from the analysis manager
3532 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3533 // Add output files and AOD files
3534 fOutputFiles = GetListOfFiles("outaod");
3535 // Add extra files registered to the analysis manager
3536 TString extra = GetListOfFiles("ext");
3537 if (!extra.IsNull()) {
// Extra files are registered with a wildcard: "x.root" becomes "x*.root".
3538 extra.ReplaceAll(".root", "*.root");
3539 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3540 fOutputFiles += extra;
3542 // Compose the output archive.
// Two archives: log_archive.zip (std*) and root_archive.zip (output files and *.stat);
// with throughput collection the manager's file-info log is appended as an extra entry.
3543 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3544 if (mgr->IsCollectThroughput())
3545 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3547 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3549 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Print the banner matching the selected run mode.
3550 if (TestBit(AliAnalysisGrid::kOffline)) {
3551 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3552 \n there nor any job run. You can revise the JDL and analysis \
3553 \n macro then run the same in \"submit\" mode.");
3554 } else if (TestBit(AliAnalysisGrid::kTest)) {
3555 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3557 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3558 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3559 \n space and job submitted.");
3560 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3561 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
// NOTE(review): the return value of CheckInputData() is ignored on this path.
3562 if (fMergeViaJDL) CheckInputData();
3565 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3570 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3573 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3574 if (!CheckInputData()) {
3575 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3578 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the setting most likely at fault; later assignments override
// earlier ones, so a run range wins over run numbers.
3580 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3581 if (fRunNumbers.Length()) serror = "run numbers";
3582 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3583 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3584 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Produce the files of the job: analysis file and macro, validation script(s),
// merging macro and executable, and finally the JDL.
3587 WriteAnalysisFile();
3588 WriteAnalysisMacro();
3590 WriteValidationScript();
3592 WriteMergingMacro();
3593 WriteMergeExecutable();
3594 WriteValidationScript(kTRUE);
3596 if (!CreateJDL()) return kFALSE;
// Offline mode stops here and returns kFALSE, after the files were produced.
3597 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3599 // Locally testing the analysis
3600 Info("StartAnalysis", "\n_______________________________________________________________________ \
3601 \n Running analysis script in a daughter shell as on a worker node \
3602 \n_______________________________________________________________________");
// Remove output files left over in the current directory.
3603 TObjArray *list = fOutputFiles.Tokenize(",");
3607 while((str=(TObjString*)next())) {
3608 outputFile = str->GetString();
3609 Int_t index = outputFile.Index("@");
3610 if (index > 0) outputFile.Remove(index);
3611 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the executable with bash (its stderr goes to the file "stderr"), then the validation script.
3614 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3615 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3616 // gSystem->Exec("cat stdout");
3619 // Check if submitting is managed by LPM manager
3620 if (fProductionMode) {
3621 //TString prodfile = fJDLName;
3622 //prodfile.ReplaceAll(".jdl", ".prod");
3623 //WriteProductionFile(prodfile);
3624 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3627 // Submit AliEn job(s)
3628 gGrid->Cd(fGridOutputDir);
// Without run numbers and run range a single JDL is submitted directly.
3633 if (!fRunNumbers.Length() && !fRunRange[0]) {
3634 // Submit a given xml or a set of runs
3635 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3636 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from the "jobId" key of the first result entry.
3638 const char *cjobId = res->GetKey(0,"jobId");
3642 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3645 Info("StartAnalysis", "\n_______________________________________________________________________ \
3646 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3647 \n_______________________________________________________________________",
3648 fJDLName.Data(), cjobId);
// Bookkeeping: job ids and their stage labels are kept as parallel space-separated lists.
3651 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3652 fGridJobIDs.Append(jobID);
3653 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3654 fGridStages.Append("full");
3659 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3663 // Submit for a range of enumeration of runs.
3664 if (!Submit()) return kFALSE;
3665 jobID = fGridJobIDs;
3669 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3670 \n You may exit at any time and terminate the job later using the option <terminate> \
3671 \n ##################################################################################", jobID.Data());
// Runs the 'aliensh' command through the system shell.
3672 gSystem->Exec("aliensh");
3674 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3675 \n Remember to terminate the job later using the option <terminate> \
3676 \n ##################################################################################", jobID.Data());
3681 //______________________________________________________________________________
3682 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3684 // Get a comma-separated list of output files of the requested type.
3685 // Type can be (case insensitive):
3686 // aod - list of aod files (std, extensions and filters)
3687 // out - list of output files connected to containers (but not aod's or extras)
3688 // ext - list of extra files registered to the manager
3689 // ter - list of files produced in terminate
// The type is tested with Contains(), so several keys can be combined in one string
// (StartAnalysis passes "outaod"); a type equal to a single key returns as soon as
// that category has been handled.
// The result lives in a function-local static TString: the returned pointer is
// shared by all calls and its content changes on the next call.
3690 static TString files;
3692 TString stype = type;
3694 TString aodfiles, extra;
3695 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3697 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3698 return files.Data();
// --- AOD files: taken from the output event handler, if there is one -----------
3700 if (mgr->GetOutputEventHandler()) {
// The standard AOD file name is used only when the handler fills the AOD.
3702 if (mgr->GetOutputEventHandler()->GetFillAOD())
3703 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
// Extra AOD outputs are added only when the handler fills extensions.
3704 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3705 if (!extraaod.IsNull() && mgr->GetOutputEventHandler()->GetFillExtension()) {
3707 aodfiles += extraaod;
3710 if (stype.Contains("aod")) {
3712 if (stype == "aod") return files.Data();
3714 // Add output files that are not in the list of AOD files
3715 TString outputfiles = "";
3716 TIter next(mgr->GetOutputs());
3717 AliAnalysisDataContainer *output;
3718 const char *filename = 0;
3719 while ((output=(AliAnalysisDataContainer*)next())) {
3720 filename = output->GetFileName();
// Skip the placeholder name "default", names already collected and AOD files.
// The Contains() checks are substring tests.
3721 if (!(strcmp(filename, "default"))) continue;
3722 if (outputfiles.Contains(filename)) continue;
3723 if (aodfiles.Contains(filename)) continue;
3724 if (!outputfiles.IsNull()) outputfiles += ",";
3725 outputfiles += filename;
3727 if (stype.Contains("out")) {
3728 if (!files.IsNull()) files += ",";
3729 files += outputfiles;
3730 if (stype == "out") return files.Data();
3732 // Add extra files registered to the analysis manager
3734 extra = mgr->GetExtraFiles();
3735 if (!extra.IsNull()) {
// Normalise separators to commas, then keep only names not seen in any earlier category.
3737 extra.ReplaceAll(" ", ",");
3738 TObjArray *fextra = extra.Tokenize(",");
3739 TIter nextx(fextra);
3741 while ((obj=nextx())) {
3742 if (aodfiles.Contains(obj->GetName())) continue;
3743 if (outputfiles.Contains(obj->GetName())) continue;
3744 if (sextra.Contains(obj->GetName())) continue;
3745 if (!sextra.IsNull()) sextra += ",";
3746 sextra += obj->GetName();
3749 if (stype.Contains("ext")) {
3750 if (!files.IsNull()) files += ",";
3754 if (stype == "ext") return files.Data();
// --- Files produced in terminate ------------------------------------------------
3756 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() is const and returns the stripped sub-string; the
// result is discarded here, so fTerminateFiles is not modified by this call - confirm intent.
3757 fTerminateFiles.Strip();
// This rewrites the member itself: spaces become commas.
3758 fTerminateFiles.ReplaceAll(" ",",");
3759 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3760 TIter nextx(fextra);
3762 while ((obj=nextx())) {
3763 if (aodfiles.Contains(obj->GetName())) continue;
3764 if (outputfiles.Contains(obj->GetName())) continue;
3765 if (termfiles.Contains(obj->GetName())) continue;
3766 if (sextra.Contains(obj->GetName())) continue;
3767 if (!termfiles.IsNull()) termfiles += ",";
3768 termfiles += obj->GetName();
3772 if (stype.Contains("ter")) {
3773 if (!files.IsNull() && !termfiles.IsNull()) {
3778 return files.Data();
3781 //______________________________________________________________________________
3782 Bool_t AliAnalysisAlien::Submit()
3784 // Submit all master jobs.
// One master job per entry of fInputFiles; fNsubmitted counts the jobs submitted so far.
// A failing SubmitNext() makes the method return kFALSE.
3785 Int_t nmasterjobs = fInputFiles->GetEntries();
// Reference time for the pacing below (TSystem::Now() reports milliseconds).
3786 Long_t tshoot = gSystem->Now();
// Submit the first job right away if nothing was submitted yet.
3787 if (!fNsubmitted && !SubmitNext()) return kFALSE;
// Keep going until every master job is submitted.
3788 while (fNsubmitted < nmasterjobs) {
3789 Long_t now = gSystem->Now();
// The next submission is attempted once more than 30000 ms have passed since tshoot.
3790 if ((now-tshoot)>30000) {
3792 if (!SubmitNext()) return kFALSE;
3798 //______________________________________________________________________________
// Submit the merging jobs driven by the "<executable>_merge.jdl" JDL.
// One pass is made per entry of fInputFiles; when neither fRunNumbers nor
// fRunRange[0] is set, only the first pass is done and it targets the whole
// grid output directory.
// Returns kFALSE if CheckMergedFiles() returns false for the last entry.
3799 Bool_t AliAnalysisAlien::SubmitMerging()
3801 // Submit all merging jobs.
// An output directory containing no "/" is treated as relative and expanded to
// <alien home>/<grid working dir>/<grid output dir>.
3802 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3803 gGrid->Cd(fGridOutputDir);
// The merging JDL is named after the executable: ".sh" -> "_merge.jdl".
3804 TString mergeJDLName = fExecutable;
3805 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3807 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3810 Int_t ntosubmit = fInputFiles->GetEntries();
3811 for (Int_t i=0; i<ntosubmit; i++) {
// Start from the input collection name without its directory and ".xml" suffix.
3812 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3813 runOutDir.ReplaceAll(".xml", "");
3814 if (fOutputToRunNo) {
3815 // The output directory is the run number
3816 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3817 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3819 if (!fRunNumbers.Length() && !fRunRange[0]) {
3820 // The output directory is the grid outdir
3821 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3822 runOutDir = fGridOutputDir;
3824 // The output directory is the master number in 3 digits format
3825 printf("### Submitting merging job for master <%03d>\n", i);
3826 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3829 // Check now the number of merging stages.
3830 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file (any "@..." suffix removed) that is listed neither
// in fMergeExcludes nor in fRegisterExcludes; it is the file handed to
// CheckMergedFiles() below.
3834 while((str=(TObjString*)next())) {
3835 outputFile = str->GetString();
3836 Int_t index = outputFile.Index("@");
3837 if (index > 0) outputFile.Remove(index);
3838 if (!fMergeExcludes.Contains(outputFile) &&
3839 !fRegisterExcludes.Contains(outputFile)) break;
// NOTE(review): CheckMergedFiles() is defined elsewhere; presumably it submits
// the next merging stage when needed - confirm its return-value contract.
3842 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A false result is fatal only for the last entry.
3843 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without explicit run numbers / run range a single pass covers everything.
3844 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3846 if (!ntosubmit) return kTRUE;
// NOTE(review): the two messages below are reported with location
// "StartAnalysis" although they are emitted from SubmitMerging.
3848 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3849 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3850 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3851 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3852 \n ################################################################################################################");
// Run the "aliensh" command through the system shell.
3853 gSystem->Exec("aliensh");
3855 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3856 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3857 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3858 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3859 \n ################################################################################################################");
3864 //______________________________________________________________________________
// Submit the next bunch of master jobs.
// Returns kTRUE when nothing had to be done or all requested submissions went
// through; kFALSE-returning paths are not all visible in this listing, the
// visible ones are the two Error() branches below.
3865 Bool_t AliAnalysisAlien::SubmitNext()
3867 // Submit next bunch of master jobs if the queue is free. The first master job is
3868 // submitted right away, while the next will not be unless the previous was split.
3869 // The plugin will not submit new master jobs while more than 500 jobs are waiting.
// Function-local statics keep their values between calls:
//  iscalled    - guard flag; a call made while it is set returns immediately
//  firstmaster - job id of the first master job submitted
//  lastmaster  - job id of the most recently submitted master job
//  npermaster  - job count per submitted master, computed from the status query
3871 static Bool_t iscalled = kFALSE;
3872 static Int_t firstmaster = 0;
3873 static Int_t lastmaster = 0;
3874 static Int_t npermaster = 0;
3875 if (iscalled) return kTRUE;
3877 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3878 Int_t ntosubmit = 0;
3881 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without the submit policy all master jobs are submitted in one go.
3884 if (!IsUseSubmitPolicy()) {
3886 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3887 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3888 ntosubmit = nmasterjobs;
// Query the state of the master jobs submitted so far; the four counters are
// filled by GetJobStatus().
3891 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3892 printf("=== master %d: %s\n", lastmaster, status.Data());
3893 // If last master not split, just return
3894 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3895 // No more than 500 waiting jobs
3896 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of jobs per submitted master.
// NOTE(review): divides by fNsubmitted - presumably this code is reached only
// when fNsubmitted > 0; confirm against the enclosing condition.
3897 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Submit as many masters as fit under the 500-waiting-jobs limit, at least one.
3898 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3899 if (!ntosubmit) ntosubmit = 1;
3900 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3901 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3903 for (Int_t i=0; i<ntosubmit; i++) {
3904 // Submit for a range of enumeration of runs.
3905 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Third "submit" argument: either the collection base name without ".xml"
// or the master index formatted on three digits.
3907 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3908 runOutDir.ReplaceAll(".xml", "");
3910 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3912 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3913 printf("********* %s\n",query.Data());
3914 res = gGrid->Command(query);
// An empty "jobId" key in the result means the submission was rejected.
3916 TString cjobId1 = res->GetKey(0,"jobId");
3917 if (!cjobId1.Length()) {
3921 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3924 Info("StartAnalysis", "\n_______________________________________________________________________ \
3925 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3926 \n_______________________________________________________________________",
3927 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Record the new job id and tag its stage as "full" (space separated lists).
3928 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3929 fGridJobIDs.Append(cjobId1);
3930 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3931 fGridStages.Append("full");
3934 lastmaster = cjobId1.Atoi();
3935 if (!firstmaster) firstmaster = lastmaster;
// NOTE(review): confirm that iscalled is reset before leaving on this failure
// path; otherwise every later call returns kTRUE at the guard above.
3940 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3948 //______________________________________________________________________________
// Store the current analysis manager in "<executable>.root" (fExecutable with
// ".sh" replaced by ".root") and copy that file to the AliEn working directory.
// The file is only (re)created when the kSubmit bit is not set.
3949 void AliAnalysisAlien::WriteAnalysisFile()
3951 // Write current analysis manager into the file <analysisFile>
3952 TString analysisFile = fExecutable;
3953 analysisFile.ReplaceAll(".sh", ".root");
3954 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3955 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3956 if (!mgr || !mgr->IsInitialized()) {
3957 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3960 // Check analysis type
// Derive the kUseMC / kUseESD / kUseAOD bits from the handlers registered in
// the manager.
3962 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3963 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the type of its first input handler decides.
3965 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3966 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3967 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3968 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3970 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3971 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory: TFile::Open changes gDirectory, it is
// restored further down.
3974 TDirectory *cdir = gDirectory;
3975 TFile *file = TFile::Open(analysisFile, "RECREATE");
3977 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3978 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3979 // Unless merging makes no sense
3980 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3983 // Enable termination for local jobs
3984 mgr->SetSkipTerminate(kFALSE);
3986 if (cdir) cdir->cd();
3987 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No upload in production, offline or test mode.
// (presumably the statements below are guarded by this flag - TODO confirm)
3989 Bool_t copy = kTRUE;
3990 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3993 TString workdir = gGrid->GetHomeDirectory();
3994 workdir += fGridWorkingDir;
3995 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous copy first, then upload; a failed upload is fatal.
3996 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3997 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3998 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
4002 //______________________________________________________________________________
// Generate the steering macro fAnalysisMacro that runs the analysis in the grid
// sub-jobs (or in a local test) and copy it to the AliEn working directory.
// The macro text is produced only when the kSubmit bit is not set. It consists
// of a main function named after the macro file, a CreateChain() helper (not in
// local-test or MC-loop mode) and a SetupPar() helper (only when ANALYSISalice
// is shipped as a par package).
4003 void AliAnalysisAlien::WriteAnalysisMacro()
4005 // Write the analysis macro that will steer the analysis in grid mode.
4006 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4008 out.open(fAnalysisMacro.Data(), ios::out);
4010 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework libraries are provided as par packages.
4013 Bool_t hasSTEERBase = kFALSE;
4014 Bool_t hasESD = kFALSE;
4015 Bool_t hasAOD = kFALSE;
4016 Bool_t hasANALYSIS = kFALSE;
4017 Bool_t hasOADB = kFALSE;
4018 Bool_t hasANALYSISalice = kFALSE;
4019 Bool_t hasCORRFW = kFALSE;
// func becomes the macro function name, type the data type written to the
// generated "anatype" constant, comment the first comment line of the macro.
4020 TString func = fAnalysisMacro;
4021 TString type = "ESD";
4022 TString comment = "// Analysis using ";
4027 if (IsUseMCchain()) {
4031 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
4032 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
4038 if (type!="AOD" && fFriendChainName!="") {
4039 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
4042 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
4043 else comment += " data";
4044 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
4045 func.ReplaceAll(".C", "");
4046 out << "void " << func.Data() << "()" << endl;
4048 out << comment.Data() << endl;
4049 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
4050 out << " TStopwatch timer;" << endl;
4051 out << " timer.Start();" << endl << endl;
4052 // Change temp directory to current one
4053 if (!IsLocalTest()) {
4054 out << "// connect to AliEn and make the chain" << endl;
4055 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4057 out << "// Set temporary merging directory to current one" << endl;
4058 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4059 out << "// Set temporary compilation directory to current one" << endl;
4060 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4061 // Reset existing include path
4062 out << "// Reset existing include path and add current directory first in the search" << endl;
4063 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// Base ROOT libraries are loaded explicitly only when the executable command
// is not aliroot.
4064 if (!fExecutableCommand.Contains("aliroot")) {
4065 out << "// load base root libraries" << endl;
4066 out << " gSystem->Load(\"libTree\");" << endl;
4067 out << " gSystem->Load(\"libGeom\");" << endl;
4068 out << " gSystem->Load(\"libVMC\");" << endl;
4069 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4070 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4072 if (fAdditionalRootLibs.Length()) {
4073 // in principle libtree /lib geom libvmc etc. can go into this list, too
4074 out << "// Add aditional libraries" << endl;
4075 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
// Only entries containing ".so" are loaded.
4078 while((str=(TObjString*)next())) {
4079 if (str->GetString().Contains(".so"))
4080 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4082 if (list) delete list;
4084 out << "// Load analysis framework libraries" << endl;
// Name of the par set-up function used in the generated code; replaced by the
// locally generated SetupPar() when ANALYSISalice itself is a par package.
4085 TString setupPar = "AliAnalysisAlien::SetupPar";
4087 if (!fExecutableCommand.Contains("aliroot")) {
4088 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4089 out << " gSystem->Load(\"libESD\");" << endl;
4090 out << " gSystem->Load(\"libAOD\");" << endl;
4092 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4093 out << " gSystem->Load(\"libOADB\");" << endl;
4094 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4095 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over the packages: find which framework libraries come as par.
4097 TIter next(fPackages);
4100 while ((obj=next())) {
4101 pkgname = obj->GetName();
4102 if (pkgname == "STEERBase" ||
4103 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4104 if (pkgname == "ESD" ||
4105 pkgname == "ESD.par") hasESD = kTRUE;
4106 if (pkgname == "AOD" ||
4107 pkgname == "AOD.par") hasAOD = kTRUE;
4108 if (pkgname == "ANALYSIS" ||
4109 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4110 if (pkgname == "OADB" ||
4111 pkgname == "OADB.par") hasOADB = kTRUE;
4112 if (pkgname == "ANALYSISalice" ||
4113 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4114 if (pkgname == "CORRFW" ||
4115 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// For each framework library: load the shared library, or set up its par.
4117 if (hasANALYSISalice) setupPar = "SetupPar";
4118 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4119 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4120 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4121 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4122 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4123 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4124 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4125 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4126 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4127 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4128 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4129 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4130 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4131 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4132 out << "// Compile other par packages" << endl;
// Second pass: every package that is not a framework library handled above.
4134 while ((obj=next())) {
4135 pkgname = obj->GetName();
4136 if (pkgname == "STEERBase" ||
4137 pkgname == "STEERBase.par" ||
4139 pkgname == "ESD.par" ||
4141 pkgname == "AOD.par" ||
4142 pkgname == "ANALYSIS" ||
4143 pkgname == "ANALYSIS.par" ||
4144 pkgname == "OADB" ||
4145 pkgname == "OADB.par" ||
4146 pkgname == "ANALYSISalice" ||
4147 pkgname == "ANALYSISalice.par" ||
4148 pkgname == "CORRFW" ||
4149 pkgname == "CORRFW.par") continue;
4150 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4153 out << "// include path" << endl;
4154 // Get the include path from the interpreter and remove entries pointing to AliRoot
4155 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4156 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4157 out << " TIter nextpath(listpaths);" << endl;
4158 out << " TObjString *pname;" << endl;
4159 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4160 out << " TString current = pname->GetName();" << endl;
4161 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4162 out << " gSystem->AddIncludePath(current);" << endl;
4163 out << " }" << endl;
4164 out << " if (listpaths) delete listpaths;" << endl;
4165 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4166 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4167 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4168 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4169 out << "// MC generator libraries" << endl;
4170 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4173 while((str=(TObjString*)next())) {
4174 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4178 if (fAdditionalLibs.Length()) {
4179 out << "// Add aditional AliRoot libraries" << endl;
// The friend libraries are appended to the additional ones, space separated.
4180 TString additionalLibs = fAdditionalLibs;
4181 additionalLibs.Strip();
4182 if (additionalLibs.Length() && fFriendLibs.Length())
4183 additionalLibs += " ";
4184 additionalLibs += fFriendLibs;
4185 TObjArray *list = additionalLibs.Tokenize(" ");
// ".so" entries are loaded, ".par" entries are set up as packages.
4188 while((str=(TObjString*)next())) {
4189 if (str->GetString().Contains(".so"))
4190 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4191 if (str->GetString().Contains(".par"))
4192 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4197 out << "// analysis source to be compiled at runtime (if any)" << endl;
4198 if (fAnalysisSource.Length()) {
4199 TObjArray *list = fAnalysisSource.Tokenize(" ");
4202 while((str=(TObjString*)next())) {
4203 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4205 if (list) delete list;
4208 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4209 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
4210 if (fFastReadOption) {
4211 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4212 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4213 out << "// fast xrootd reading enabled" << endl;
4214 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4215 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4216 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4217 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4218 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4219 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4221 out << "// read the analysis manager from file" << endl;
// Same file name as the one used by WriteAnalysisFile(): ".sh" -> ".root".
4222 TString analysisFile = fExecutable;
4223 analysisFile.ReplaceAll(".sh", ".root");
4224 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4225 << analysisFile << "\");" << endl;
4226 out << " if (!mgr) return;" << endl;
// In local test mode the generated macro creates its own plugin in "test" mode.
4227 if (IsLocalTest()) {
4228 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4229 out << " plugin->SetRunMode(\"test\");" << endl;
4230 if (fFileForTestMode.IsNull())
4231 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4233 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4234 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4235 if (!fFriendChainName.IsNull())
4236 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\",\"" << fFriendLibs << "\");" << endl;
4238 out << " plugin->SetUseMCchain();" << endl;
4240 out << " plugin->SetMCLoop(kTRUE);" << endl;
4241 out << " mgr->SetGridHandler(plugin);" << endl;
// Propagate debug level and sys-info settings of the current manager, if any.
4242 if (AliAnalysisManager::GetAnalysisManager()) {
4243 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4244 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4246 out << " mgr->SetDebugLevel(10);" << endl;
4247 out << " mgr->SetNSysInfo(100);" << endl;
4250 out << " mgr->PrintStatus();" << endl;
4251 if (AliAnalysisManager::GetAnalysisManager()) {
4252 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4253 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4255 if (TestBit(AliAnalysisGrid::kTest))
4256 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4258 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// Start of the analysis: an event loop over fNMCevents, or a chain built from
// "wn.xml" (grid) / the "localfile" mode without chain.
4261 if (!IsLocalTest()) {
4263 out << " mgr->SetCacheSize(0);" << endl;
4264 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4266 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4267 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4271 out << " mgr->SetCacheSize(0);" << endl;
4272 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4274 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4277 out << " timer.Stop();" << endl;
4278 out << " timer.Print();" << endl;
4279 out << "}" << endl << endl;
// Generated helper CreateChain(): builds a TChain from the files listed in an
// AliEn xml collection.
4280 if (!IsLocalTest() && !fMCLoop) {
4281 out <<"//________________________________________________________________________________" << endl;
4282 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4284 out << "// Create a chain using url's from xml file" << endl;
4285 out << " TString filename;" << endl;
4286 out << " Int_t run = 0;" << endl;
// Tree name: "TE" for the MC chain, fTreeName when set, else "<type>Tree" in
// lower-case type.
4287 if (IsUseMCchain()) {
4288 out << " TString treename = \"TE\";" << endl;
4290 if (!fTreeName.IsNull()) {
4291 out << " TString treename = \"" << fTreeName << "\";" << endl;
4293 out << " TString treename = type;" << endl;
4294 out << " treename.ToLower();" << endl;
4295 out << " treename += \"Tree\";" << endl;
4298 out << " printf(\"***************************************\\n\");" << endl;
4299 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4300 out << " printf(\"***************************************\\n\");" << endl;
4301 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4302 out << " if (!coll) {" << endl;
4303 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4304 out << " return NULL;" << endl;
4305 out << " }" << endl;
4306 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4307 out << " TChain *chain = new TChain(treename);" << endl;
// One friend chain per name in fFriendChainName; the friend file name is kept
// as the chain title and used later to locate the file.
4308 if(!fFriendChainName.IsNull()) {
4309 out << " TList *friends = new TList();" << endl;
4310 out << " TIter nextfriend(friends);" << endl;
4311 out << " TChain *cfriend = 0;" << endl;
4312 TObjArray *list = fFriendChainName.Tokenize(" ");
4315 while((str=(TObjString*)next())) {
4316 out << " cfriend = new TChain(treename, \"" << str->GetName() << "\");" << endl;
4317 out << " friends->Add(cfriend);" << endl;
4318 out << " chain->AddFriend(cfriend);" << endl;
4321 // out << " TChain *chainFriend = new TChain(treename);" << endl;
4323 out << " coll->Reset();" << endl;
4324 out << " while (coll->Next()) {" << endl;
// The quotes of the empty argument must be escaped; unescaped they split the
// literal and the generated line read GetTURL() instead of GetTURL("").
4325 out << " filename = coll->GetTURL(\"\");" << endl;
4326 out << " if (mgr) {" << endl;
4327 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4328 out << " if (nrun && nrun != run) {" << endl;
4329 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4330 out << " mgr->SetRunFromPath(nrun);" << endl;
4331 out << " run = nrun;" << endl;
4332 out << " }" << endl;
4333 out << " }" << endl;
4334 out << " chain->Add(filename);" << endl;
4335 if(!fFriendChainName.IsNull()) {
4336 out << " TString bpath=coll->GetTURL(\"\");" << endl;
4337 out << " if (bpath.Index(\"#\") > -1) bpath.Remove(bpath.Index(\"#\"));" << endl;
4338 out << " bpath = gSystem->DirName(bpath);" << endl;
4339 out << " bpath += \"/\";" << endl;
4340 out << " TString fileFriend;" << endl;
4341 out << " nextfriend.Reset();" << endl;
4342 out << " while ((cfriend=(TChain*)nextfriend())) {" << endl;
4343 out << " fileFriend = bpath;" << endl;
4344 out << " fileFriend += cfriend->GetTitle();" << endl;
4345 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4346 out << " if (file) {" << endl;
4347 out << " file->Close();" << endl;
4348 out << " cfriend->Add(fileFriend.Data());" << endl;
4349 out << " } else {" << endl;
4350 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4351 out << " return 0;" << endl;
4352 out << " }" << endl;
4353 out << " }" << endl;
4355 out << " }" << endl;
4356 out << " if (!chain->GetNtrees()) {" << endl;
4357 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4358 out << " return NULL;" << endl;
4359 out << " }" << endl;
4360 out << " return chain;" << endl;
4361 out << "}" << endl << endl;
// Generated helper SetupPar(): unpacks a par archive, runs its BUILD.sh and
// SETUP.C, then returns to the original working directory.
4363 if (hasANALYSISalice) {
4364 out <<"//________________________________________________________________________________" << endl;
4365 out << "Bool_t SetupPar(const char *package) {" << endl;
4366 out << "// Compile the package and set it up." << endl;
4367 out << " TString pkgdir = package;" << endl;
4368 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4369 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4370 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4371 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4372 out << " // Check for BUILD.sh and execute" << endl;
4373 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4374 out << " printf(\"*******************************\\n\");" << endl;
4375 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4376 out << " printf(\"*******************************\\n\");" << endl;
4377 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4378 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4379 out << " gSystem->ChangeDirectory(cdir);" << endl;
4380 out << " return kFALSE;" << endl;
4381 out << " }" << endl;
4382 out << " } else {" << endl;
4383 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4384 out << " gSystem->ChangeDirectory(cdir);" << endl;
4385 out << " return kFALSE;" << endl;
4386 out << " }" << endl;
4387 out << " // Check for SETUP.C and execute" << endl;
4388 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4389 out << " printf(\"*******************************\\n\");" << endl;
4390 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4391 out << " printf(\"*******************************\\n\");" << endl;
4392 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4393 out << " } else {" << endl;
4394 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4395 out << " gSystem->ChangeDirectory(cdir);" << endl;
4396 out << " return kFALSE;" << endl;
4397 out << " }" << endl;
4398 out << " // Restore original workdir" << endl;
4399 out << " gSystem->ChangeDirectory(cdir);" << endl;
4400 out << " return kTRUE;" << endl;
4403 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// No upload in production, offline or test mode.
// (presumably the statements below are guarded by this flag - TODO confirm)
4405 Bool_t copy = kTRUE;
4406 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4409 TString workdir = gGrid->GetHomeDirectory();
4410 workdir += fGridWorkingDir;
4411 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4412 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4413 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// copyLocal2Alien() prepends "alien://" to the remote path itself, so only the
// bare catalogue path is passed here (as WriteAnalysisFile() does); passing a
// full URL produced "alien://alien://...".
4414 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4415 Form("%s/%s", workdir.Data(),
4416 fAnalysisMacro.Data()))) Fatal("","Terminating");
4420 //______________________________________________________________________________
4421 void AliAnalysisAlien::WriteMergingMacro()
4423 // Write a macro to merge the outputs per master job.
4424 if (!fMergeViaJDL) return;
4425 if (!fOutputFiles.Length()) {
4426 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4429 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4430 TString mergingMacro = fExecutable;
4431 mergingMacro.ReplaceAll(".sh","_merge.C");
4432 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4433 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4435 out.open(mergingMacro.Data(), ios::out);
4437 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4440 Bool_t hasSTEERBase = kFALSE;
4441 Bool_t hasESD = kFALSE;
4442 Bool_t hasAOD = kFALSE;
4443 Bool_t hasANALYSIS = kFALSE;
4444 Bool_t hasOADB = kFALSE;
4445 Bool_t hasANALYSISalice = kFALSE;
4446 Bool_t hasCORRFW = kFALSE;
4447 TString func = mergingMacro;
4449 func.ReplaceAll(".C", "");
4450 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4452 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4453 out << " TStopwatch timer;" << endl;
4454 out << " timer.Start();" << endl << endl;
4455 // Reset existing include path
4456 out << "// Reset existing include path and add current directory first in the search" << endl;
4457 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4458 if (!fExecutableCommand.Contains("aliroot")) {
4459 out << "// load base root libraries" << endl;
4460 out << " gSystem->Load(\"libTree\");" << endl;
4461 out << " gSystem->Load(\"libGeom\");" << endl;
4462 out << " gSystem->Load(\"libVMC\");" << endl;
4463 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4464 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4466 if (fAdditionalRootLibs.Length()) {
4467 // in principle libtree /lib geom libvmc etc. can go into this list, too
4468 out << "// Add aditional libraries" << endl;
4469 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4472 while((str=(TObjString*)next())) {
4473 if (str->GetString().Contains(".so"))
4474 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4476 if (list) delete list;
4478 out << "// Load analysis framework libraries" << endl;
4480 if (!fExecutableCommand.Contains("aliroot")) {
4481 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4482 out << " gSystem->Load(\"libESD\");" << endl;
4483 out << " gSystem->Load(\"libAOD\");" << endl;
4485 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4486 out << " gSystem->Load(\"libOADB\");" << endl;
4487 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4488 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4490 TIter next(fPackages);
4493 TString setupPar = "AliAnalysisAlien::SetupPar";
4494 while ((obj=next())) {
4495 pkgname = obj->GetName();
4496 if (pkgname == "STEERBase" ||
4497 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4498 if (pkgname == "ESD" ||
4499 pkgname == "ESD.par") hasESD = kTRUE;
4500 if (pkgname == "AOD" ||
4501 pkgname == "AOD.par") hasAOD = kTRUE;
4502 if (pkgname == "ANALYSIS" ||
4503 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4504 if (pkgname == "OADB" ||
4505 pkgname == "OADB.par") hasOADB = kTRUE;
4506 if (pkgname == "ANALYSISalice" ||
4507 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4508 if (pkgname == "CORRFW" ||
4509 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4511 if (hasANALYSISalice) setupPar = "SetupPar";
4512 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4513 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4514 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4515 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4516 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4517 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4518 out << " gSystem->Load(\"libOADB\");" << endl;
4519 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4520 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4521 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4522 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4523 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4524 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4525 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4526 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4527 out << "// Compile other par packages" << endl;
4529 while ((obj=next())) {
4530 pkgname = obj->GetName();
4531 if (pkgname == "STEERBase" ||
4532 pkgname == "STEERBase.par" ||
4534 pkgname == "ESD.par" ||
4536 pkgname == "AOD.par" ||
4537 pkgname == "ANALYSIS" ||
4538 pkgname == "ANALYSIS.par" ||
4539 pkgname == "OADB" ||
4540 pkgname == "OADB.par" ||
4541 pkgname == "ANALYSISalice" ||
4542 pkgname == "ANALYSISalice.par" ||
4543 pkgname == "CORRFW" ||
4544 pkgname == "CORRFW.par") continue;
4545 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4548 out << "// include path" << endl;
4549 // Get the include path from the interpreter and remove entries pointing to AliRoot
4550 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4551 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4552 out << " TIter nextpath(listpaths);" << endl;
4553 out << " TObjString *pname;" << endl;
4554 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4555 out << " TString current = pname->GetName();" << endl;
4556 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4557 out << " gSystem->AddIncludePath(current);" << endl;
4558 out << " }" << endl;
4559 out << " if (listpaths) delete listpaths;" << endl;
4560 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4561 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4562 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4563 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4564 out << "// MC generator libraries" << endl;
4565 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4568 while((str=(TObjString*)next())) {
4569 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4573 if (fAdditionalLibs.Length()) {
4574 out << "// Add aditional AliRoot libraries" << endl;
4575 TString additionalLibs = fAdditionalLibs;
4576 additionalLibs.Strip();
4577 if (additionalLibs.Length() && fFriendLibs.Length())
4578 additionalLibs += " ";
4579 additionalLibs += fFriendLibs;
4580 TObjArray *list = additionalLibs.Tokenize(" ");
4583 while((str=(TObjString*)next())) {
4584 if (str->GetString().Contains(".so"))
4585 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4587 if (list) delete list;
4590 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4591 if (fAnalysisSource.Length()) {
4592 TObjArray *list = fAnalysisSource.Tokenize(" ");
4595 while((str=(TObjString*)next())) {
4596 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4598 if (list) delete list;
4602 if (fFastReadOption) {
4603 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4604 out << "// fast xrootd reading enabled" << endl;
4605 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4606 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4607 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4608 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4609 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4610 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4612 // Change temp directory to current one
4613 out << "// Connect to AliEn" << endl;
4614 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4615 out << "// Set temporary merging directory to current one" << endl;
4616 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4617 out << "// Set temporary compilation directory to current one" << endl;
4618 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4619 out << " TString outputDir = dir;" << endl;
4620 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4621 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4622 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4623 out << " TIter *iter = new TIter(list);" << endl;
4624 out << " TObjString *str;" << endl;
4625 out << " TString outputFile;" << endl;
4626 out << " Bool_t merged = kTRUE;" << endl;
4627 TString analysisFile = fExecutable;
4628 analysisFile.ReplaceAll(".sh", ".root");
4629 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4630 << analysisFile << "\");" << endl;
4631 out << " if (!mgr) {" << endl;
4632 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4633 out << " return;" << endl;
4634 out << " }" << endl;
4635 if (IsLocalTest()) {
4636 out << " printf(\"===================================\n\");" << endl;
4637 out << " printf(\"Testing merging...\\n\");" << endl;
4638 out << " printf(\"===================================\n\");" << endl;
4640 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4641 out << " outputFile = str->GetString();" << endl;
4642 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4643 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4644 out << " if (index > 0) outputFile.Remove(index);" << endl;
4645 out << " // Skip already merged outputs" << endl;
4646 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4647 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4648 out << " continue;" << endl;
4649 out << " }" << endl;
4650 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4651 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4652 out << " if (!merged) {" << endl;
4653 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4654 out << " return;" << endl;
4655 out << " }" << endl;
4656 out << " }" << endl;
4657 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4658 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4659 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4661 out << " // all outputs merged, validate" << endl;
4662 out << " ofstream out;" << endl;
4663 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4664 out << " out.close();" << endl;
4665 out << " // read the analysis manager from file" << endl;
4666 if (IsLocalTest()) {
4667 out << " printf(\"===================================\n\");" << endl;
4668 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4669 out << " printf(\"===================================\n\");" << endl;
4671 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4673 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4674 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4675 out << " mgr->PrintStatus();" << endl;
4677 if (mgr->GetDebugLevel()>3) {
4678 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4680 if (TestBit(AliAnalysisGrid::kTest))
4681 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4683 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4686 out << " TTree *tree = NULL;" << endl;
4687 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4688 out << "}" << endl << endl;
4689 if (hasANALYSISalice) {
4690 out <<"//________________________________________________________________________________" << endl;
4691 out << "Bool_t SetupPar(const char *package) {" << endl;
4692 out << "// Compile the package and set it up." << endl;
4693 out << " TString pkgdir = package;" << endl;
4694 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4695 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4696 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4697 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4698 out << " // Check for BUILD.sh and execute" << endl;
4699 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4700 out << " printf(\"*******************************\\n\");" << endl;
4701 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4702 out << " printf(\"*******************************\\n\");" << endl;
4703 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4704 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4705 out << " gSystem->ChangeDirectory(cdir);" << endl;
4706 out << " return kFALSE;" << endl;
4707 out << " }" << endl;
4708 out << " } else {" << endl;
4709 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4710 out << " gSystem->ChangeDirectory(cdir);" << endl;
4711 out << " return kFALSE;" << endl;
4712 out << " }" << endl;
4713 out << " // Check for SETUP.C and execute" << endl;
4714 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4715 out << " printf(\"*******************************\\n\");" << endl;
4716 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4717 out << " printf(\"*******************************\\n\");" << endl;
4718 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4719 out << " } else {" << endl;
4720 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4721 out << " gSystem->ChangeDirectory(cdir);" << endl;
4722 out << " return kFALSE;" << endl;
4723 out << " }" << endl;
4724 out << " // Restore original workdir" << endl;
4725 out << " gSystem->ChangeDirectory(cdir);" << endl;
4726 out << " return kTRUE;" << endl;
4730 Bool_t copy = kTRUE;
4731 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4734 TString workdir = gGrid->GetHomeDirectory();
4735 workdir += fGridWorkingDir;
4736 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4737 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4738 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4739 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4740 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4744 //______________________________________________________________________________
4745 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4747 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4748 // Note that for loading the compiled library. The current directory should have precedence in
4750 TString pkgdir = package;
4751 pkgdir.ReplaceAll(".par","");
4752 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4753 TString cdir = gSystem->WorkingDirectory();
4754 gSystem->ChangeDirectory(pkgdir);
4755 // Check for BUILD.sh and execute
4756 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4757 printf("**************************************************\n");
4758 printf("*** Building PAR archive %s\n", package);
4759 printf("**************************************************\n");
4760 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4761 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4762 gSystem->ChangeDirectory(cdir);
4766 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4767 gSystem->ChangeDirectory(cdir);
4770 // Check for SETUP.C and execute
4771 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4772 printf("**************************************************\n");
4773 printf("*** Setup PAR archive %s\n", package);
4774 printf("**************************************************\n");
4775 gROOT->Macro("PROOF-INF/SETUP.C");
4776 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4778 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4779 gSystem->ChangeDirectory(cdir);
4782 // Restore original workdir
4783 gSystem->ChangeDirectory(cdir);
4787 //______________________________________________________________________________
4788 void AliAnalysisAlien::WriteExecutable()
4790 // Generate the alien executable script.
4791 // Patch executable with -x to catch error code
4792 if (fExecutableCommand.Contains("root") &&
4793 fExecutableCommand.Contains("-q") &&
4794 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4795 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4797 out.open(fExecutable.Data(), ios::out);
4799 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4802 out << "#!/bin/bash" << endl;
4803 // Make sure we can properly compile par files
4804 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4805 out << "echo \"=========================================\"" << endl;
4806 out << "echo \"############## PATH : ##############\"" << endl;
4807 out << "echo $PATH" << endl;
4808 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4809 out << "echo $LD_LIBRARY_PATH" << endl;
4810 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4811 out << "echo $ROOTSYS" << endl;
4812 out << "echo \"############## which root : ##############\"" << endl;
4813 out << "which root" << endl;
4814 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4815 out << "echo $ALICE_ROOT" << endl;
4816 out << "echo \"############## which aliroot : ##############\"" << endl;
4817 out << "which aliroot" << endl;
4818 out << "echo \"############## system limits : ##############\"" << endl;
4819 out << "ulimit -a" << endl;
4820 out << "echo \"############## memory : ##############\"" << endl;
4821 out << "free -m" << endl;
4822 out << "echo \"=========================================\"" << endl << endl;
4823 out << fExecutableCommand << " ";
4824 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4825 out << "RET=$?" << endl;
4826 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4827 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4828 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4829 out << " let sig=\"$RET - 128\""<<endl;
4830 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4831 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4832 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4833 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4834 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4835 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4837 out << " exit $RET"<< endl;
4838 out << "fi" << endl << endl ;
4839 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4840 out << "echo \"############## memory after: ##############\"" << endl;
4841 out << "free -m" << endl;
4843 Bool_t copy = kTRUE;
4844 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4847 TString workdir = gGrid->GetHomeDirectory();
4848 workdir += fGridWorkingDir;
4849 TString executable = TString::Format("%s/%s", workdir.Data(), fExecutable.Data());
4850 if (FileExists(executable)) gGrid->Rm(executable);
4851 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4852 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4853 executable.Data())) Fatal("","Terminating");
4857 //______________________________________________________________________________
4858 void AliAnalysisAlien::WriteMergeExecutable()
4860 // Generate the alien executable script for the merging job.
4861 if (!fMergeViaJDL) return;
4862 TString mergeExec = fExecutable;
4863 mergeExec.ReplaceAll(".sh", "_merge.sh");
4864 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4866 out.open(mergeExec.Data(), ios::out);
4868 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4871 out << "#!/bin/bash" << endl;
4872 // Make sure we can properly compile par files
4873 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4874 out << "echo \"=========================================\"" << endl;
4875 out << "echo \"############## PATH : ##############\"" << endl;
4876 out << "echo $PATH" << endl;
4877 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4878 out << "echo $LD_LIBRARY_PATH" << endl;
4879 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4880 out << "echo $ROOTSYS" << endl;
4881 out << "echo \"############## which root : ##############\"" << endl;
4882 out << "which root" << endl;
4883 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4884 out << "echo $ALICE_ROOT" << endl;
4885 out << "echo \"############## which aliroot : ##############\"" << endl;
4886 out << "which aliroot" << endl;
4887 out << "echo \"############## system limits : ##############\"" << endl;
4888 out << "ulimit -a" << endl;
4889 out << "echo \"############## memory : ##############\"" << endl;
4890 out << "free -m" << endl;
4891 out << "echo \"=========================================\"" << endl << endl;
4892 TString mergeMacro = fExecutable;
4893 mergeMacro.ReplaceAll(".sh", "_merge.C");
4894 if (IsOneStageMerging())
4895 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4897 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4898 out << fExecutableCommand << " " << "$ARG" << endl;
4899 out << "RET=$?" << endl;
4900 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4901 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4902 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4903 out << " let sig=\"$RET - 128\""<<endl;
4904 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4905 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4906 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4907 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4908 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4909 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4911 out << " exit $RET"<< endl;
4912 out << "fi" << endl << endl ;
4913 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4914 out << "echo \"############## memory after: ##############\"" << endl;
4915 out << "free -m" << endl;
4917 Bool_t copy = kTRUE;
4918 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4921 TString workdir = gGrid->GetHomeDirectory();
4922 workdir += fGridWorkingDir;
4923 TString executable = TString::Format("%s/%s", workdir.Data(), mergeExec.Data());
4924 if (FileExists(executable)) gGrid->Rm(executable);
4925 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4926 if (!copyLocal2Alien("WriteMergeExecutable",mergeExec.Data(),
4927 executable.Data())) Fatal("","Terminating");
4931 //______________________________________________________________________________
4932 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4934 // Write the production file to be submitted by LPM manager. The format is:
4935 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4936 // Next lines: full_path_to_dataset XXX (XXX is a string)
4937 // To submit, one has to: submit jdl XXX for all lines
4939 out.open(filename, ios::out);
4941 Error("WriteProductionFile", "Bad file name: %s", filename);
4945 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4946 workdir = gGrid->GetHomeDirectory();
4947 workdir += fGridWorkingDir;
4948 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4949 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4950 out << locjdl << " " << njobspermaster << endl;
4951 Int_t nmasterjobs = fInputFiles->GetEntries();
4952 for (Int_t i=0; i<nmasterjobs; i++) {
4953 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4954 runOutDir.ReplaceAll(".xml", "");
4956 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4958 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4961 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4962 if (FileExists(filename)) gGrid->Rm(filename);
4963 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4964 if (!copyLocal2Alien("WriteProductionFile", filename,
4965 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4969 //______________________________________________________________________________
4970 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4972 // Generate the alien validation script.
4973 // Generate the validation script
4975 if (fValidationScript.IsNull()) {
4976 fValidationScript = fExecutable;
4977 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4979 TString validationScript = fValidationScript;
4980 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4982 Error("WriteValidationScript", "Alien connection required");
4985 if (!fTerminateFiles.IsNull()) {
4986 fTerminateFiles.Strip();
4987 fTerminateFiles.ReplaceAll(" ",",");
4989 TString outStream = "";
4990 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4991 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4993 out.open(validationScript, ios::out);
4994 out << "#!/bin/bash" << endl;
4995 out << "##################################################" << endl;
4996 out << "validateout=`dirname $0`" << endl;
4997 out << "validatetime=`date`" << endl;
4998 out << "validated=\"0\";" << endl;
4999 out << "error=0" << endl;
5000 out << "if [ -z $validateout ]" << endl;
5001 out << "then" << endl;
5002 out << " validateout=\".\"" << endl;
5003 out << "fi" << endl << endl;
5004 out << "cd $validateout;" << endl;
5005 out << "validateworkdir=`pwd`;" << endl << endl;
5006 out << "echo \"*******************************************************\"" << outStream << endl;
5007 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
5009 out << "echo \"* Time: $validatetime \"" << outStream << endl;
5010 out << "echo \"* Dir: $validateout\"" << outStream << endl;
5011 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
5012 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5013 out << "ls -la ./" << outStream << endl;
5014 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
5015 out << "##################################################" << endl;
5018 out << "if [ ! -f stderr ] ; then" << endl;
5019 out << " error=1" << endl;
5020 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
5021 out << " echo \"Error = $error\" " << outStream << endl;
5022 out << "fi" << endl;
5024 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
5025 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
5026 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
5027 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
5030 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
5031 out << " error=1" << endl;
5032 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
5033 out << " echo \"$parArch\" " << outStream << endl;
5034 out << " echo \"Error = $error\" " << outStream << endl;
5035 out << "fi" << endl;
5037 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
5038 out << " error=1" << endl;
5039 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
5040 out << " echo \"$segViol\" " << outStream << endl;
5041 out << " echo \"Error = $error\" " << outStream << endl;
5042 out << "fi" << endl;
5044 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
5045 out << " error=1" << endl;
5046 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
5047 out << " echo \"$segFault\" " << outStream << endl;
5048 out << " echo \"Error = $error\" " << outStream << endl;
5049 out << "fi" << endl;
5051 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
5052 out << " error=1" << endl;
5053 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
5054 out << " echo \"$glibcErr\" " << outStream << endl;
5055 out << " echo \"Error = $error\" " << outStream << endl;
5056 out << "fi" << endl;
5058 // Part dedicated to the specific analyses running into the train
5060 TString outputFiles = fOutputFiles;
5061 if (merge && !fTerminateFiles.IsNull()) {
5063 outputFiles += fTerminateFiles;
5065 TObjArray *arr = outputFiles.Tokenize(",");
5068 while (!merge && (os=(TObjString*)next1())) {
5069 // No need to validate outputs produced by merging since the merging macro does this
5070 outputFile = os->GetString();
5071 Int_t index = outputFile.Index("@");
5072 if (index > 0) outputFile.Remove(index);
5073 if (fTerminateFiles.Contains(outputFile)) continue;
5074 if (outputFile.Contains("*")) continue;
5075 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
5076 out << " error=1" << endl;
5077 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
5078 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
5079 out << "fi" << endl;
5082 out << "if ! [ -f outputs_valid ] ; then" << endl;
5083 out << " error=1" << endl;
5084 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
5085 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
5086 out << "fi" << endl;
5088 out << "if [ $error = 0 ] ; then" << endl;
5089 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
5090 if (!IsKeepLogs()) {
5091 out << " echo \"* === Logs std* will be deleted === \"" << endl;
5093 out << " rm -f std*" << endl;
5095 out << "fi" << endl;
5097 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5098 out << "echo \"*******************************************************\"" << outStream << endl;
5099 out << "cd -" << endl;
5100 out << "exit $error" << endl;
5102 Bool_t copy = kTRUE;
5103 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
5106 TString workdir = gGrid->GetHomeDirectory();
5107 workdir += fGridWorkingDir;
5108 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
5109 if (FileExists(validationScript)) gGrid->Rm(validationScript);
5110 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
5111 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
5112 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");