1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
32 #include "TInterpreter.h"
34 #include "TFileCollection.h"
36 #include "TObjString.h"
37 #include "TObjArray.h"
40 #include "TGridResult.h"
41 #include "TGridCollection.h"
43 #include "TGridJobStatusList.h"
44 #include "TGridJobStatus.h"
45 #include "TFileMerger.h"
46 #include "AliAnalysisManager.h"
47 #include "AliAnalysisTaskCfg.h"
48 #include "AliVEventHandler.h"
49 #include "AliAnalysisDataContainer.h"
50 #include "AliMultiInputEventHandler.h"
56 ClassImp(AliAnalysisAlien)
// Helper that copies the local file `loc` to the AliEn location `rem`.
//   where - location tag passed to Warning() when reporting a failed copy
//   loc   - local path; turned into the source URL "file:<loc>"
//   rem   - grid path; turned into the destination URL "alien://<rem>"
// The copy itself is delegated to TFile::Cp and its status is kept in `ret`.
// NOTE(review): the statements surrounding the warning (presumably the
// failure check and the return of `ret`) are not part of this view.
62 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
64 TString sl(Form("file:%s", loc));
65 TString sr(Form("alien://%s", rem));
66 Bool_t ret = TFile::Cp(sl, sr);
68 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
74 //______________________________________________________________________________
// Default constructor.
// Only a fragment of the member-initializer list is visible in this view:
// the three counters shown are initialized to 0 and the two string members
// shown are default-constructed. The remaining initializers and the
// constructor body are not visible here.
75 AliAnalysisAlien::AliAnalysisAlien()
81 fSplitMaxInputFileNumber(0),
83 fMasterResubmitThreshold(0),
96 fNproofWorkersPerSlave(0),
102 fExecutableCommand(),
108 fAdditionalRootLibs(),
156 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// Only a fragment of the member-initializer list is visible in this view;
// the members shown receive the same initial values as in the default
// constructor (counters set to 0, strings default-constructed).
157 AliAnalysisAlien::AliAnalysisAlien(const char *name)
158 :AliAnalysisGrid(name),
163 fSplitMaxInputFileNumber(0),
165 fMasterResubmitThreshold(0),
178 fNproofWorkersPerSlave(0),
184 fExecutableCommand(),
190 fAdditionalRootLibs(),
238 //______________________________________________________________________________
// Copy constructor.
// - The base class and the scalar/string members are copied from `other`
//   through the member-initializer list.
// - fGridJDL and fMergingJDL are NOT copied: fresh TAlienJDL objects are
//   created through the interpreter (gROOT->ProcessLine).
// - fRunRange is copied element by element.
// - fInputFiles and fPackages are rebuilt as owning arrays of new TObjString
//   objects carrying the names of the source entries.
// - fModules is rebuilt as an owning array of AliAnalysisTaskCfg copies.
// NOTE(review): several lines of the original body (local declarations,
// closing braces, the statement adding `mod` to fModules) are not visible
// in this view.
239 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
240 :AliAnalysisGrid(other),
243 fPrice(other.fPrice),
245 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
246 fMaxInitFailed(other.fMaxInitFailed),
247 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
248 fNtestFiles(other.fNtestFiles),
249 fNrunsPerMaster(other.fNrunsPerMaster),
250 fMaxMergeFiles(other.fMaxMergeFiles),
251 fMaxMergeStages(other.fMaxMergeStages),
252 fNsubmitted(other.fNsubmitted),
253 fProductionMode(other.fProductionMode),
254 fOutputToRunNo(other.fOutputToRunNo),
255 fMergeViaJDL(other.fMergeViaJDL),
256 fFastReadOption(other.fFastReadOption),
257 fOverwriteMode(other.fOverwriteMode),
258 fNreplicas(other.fNreplicas),
259 fNproofWorkers(other.fNproofWorkers),
260 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
261 fProofReset(other.fProofReset),
262 fNMCevents(other.fNMCevents),
263 fNMCjobs(other.fNMCjobs),
264 fRunNumbers(other.fRunNumbers),
265 fExecutable(other.fExecutable),
266 fExecutableCommand(other.fExecutableCommand),
267 fArguments(other.fArguments),
268 fExecutableArgs(other.fExecutableArgs),
269 fAnalysisMacro(other.fAnalysisMacro),
270 fAnalysisSource(other.fAnalysisSource),
271 fValidationScript(other.fValidationScript),
272 fAdditionalRootLibs(other.fAdditionalRootLibs),
273 fAdditionalLibs(other.fAdditionalLibs),
274 fGeneratorLibs(other.fGeneratorLibs),
275 fSplitMode(other.fSplitMode),
276 fAPIVersion(other.fAPIVersion),
277 fROOTVersion(other.fROOTVersion),
278 fAliROOTVersion(other.fAliROOTVersion),
279 fExternalPackages(other.fExternalPackages),
281 fGridWorkingDir(other.fGridWorkingDir),
282 fGridDataDir(other.fGridDataDir),
283 fDataPattern(other.fDataPattern),
284 fGridOutputDir(other.fGridOutputDir),
285 fOutputArchive(other.fOutputArchive),
286 fOutputFiles(other.fOutputFiles),
287 fInputFormat(other.fInputFormat),
288 fDatasetName(other.fDatasetName),
289 fJDLName(other.fJDLName),
290 fTerminateFiles(other.fTerminateFiles),
291 fMergeExcludes(other.fMergeExcludes),
292 fRegisterExcludes(other.fRegisterExcludes),
293 fIncludePath(other.fIncludePath),
294 fCloseSE(other.fCloseSE),
295 fFriendChainName(other.fFriendChainName),
296 fJobTag(other.fJobTag),
297 fOutputSingle(other.fOutputSingle),
298 fRunPrefix(other.fRunPrefix),
299 fProofCluster(other.fProofCluster),
300 fProofDataSet(other.fProofDataSet),
301 fFileForTestMode(other.fFileForTestMode),
302 fAliRootMode(other.fAliRootMode),
303 fProofProcessOpt(other.fProofProcessOpt),
304 fMergeDirName(other.fMergeDirName),
309 fDropToShell(other.fDropToShell),
310 fMCLoop(other.fMCLoop),
311 fGridJobIDs(other.fGridJobIDs),
312 fGridStages(other.fGridStages),
313 fFriendLibs(other.fFriendLibs),
314 fTreeName(other.fTreeName)
// JDL helpers are created anew rather than shared with `other`.
317 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
318 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
319 fRunRange[0] = other.fRunRange[0];
320 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list (entries re-created from their names).
321 if (other.fInputFiles) {
322 fInputFiles = new TObjArray();
323 TIter next(other.fInputFiles);
325 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
326 fInputFiles->SetOwner();
// Deep copy of the package list (entries re-created from their names).
328 if (other.fPackages) {
329 fPackages = new TObjArray();
330 TIter next(other.fPackages);
332 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
333 fPackages->SetOwner();
// Deep copy of the module configurations.
335 if (other.fModules) {
336 fModules = new TObjArray();
337 fModules->SetOwner();
338 TIter next(other.fModules);
339 AliAnalysisTaskCfg *mod, *crt;
340 while ((crt=(AliAnalysisTaskCfg*)next())) {
341 mod = new AliAnalysisTaskCfg(*crt);
347 //______________________________________________________________________________
// Destructor.
// The only statement visible in this view deletes the content of
// fProofParam; the release of the other owned members (if any) is on lines
// that are not part of this view.
348 AliAnalysisAlien::~AliAnalysisAlien()
356 fProofParam.DeleteAll();
359 //______________________________________________________________________________
// Assignment operator.
// Guards against self-assignment, assigns the base class part, creates new
// TAlienJDL objects for fGridJDL / fMergingJDL through the interpreter,
// copies the scalar and string members one by one and rebuilds the
// fInputFiles, fPackages and fModules arrays as owning deep copies.
// NOTE(review): in the visible lines the previous values of fGridJDL,
// fMergingJDL, fInputFiles, fPackages and fModules are overwritten without
// being deleted first, and the arrays are left untouched when the
// corresponding pointer in `other` is null. If the hidden lines do not take
// care of this, the old objects leak -- TODO confirm against the full file.
360 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
363 if (this != &other) {
364 AliAnalysisGrid::operator=(other);
365 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
366 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
367 fPrice = other.fPrice;
369 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
370 fMaxInitFailed = other.fMaxInitFailed;
371 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
372 fNtestFiles = other.fNtestFiles;
373 fNrunsPerMaster = other.fNrunsPerMaster;
374 fMaxMergeFiles = other.fMaxMergeFiles;
375 fMaxMergeStages = other.fMaxMergeStages;
376 fNsubmitted = other.fNsubmitted;
377 fProductionMode = other.fProductionMode;
378 fOutputToRunNo = other.fOutputToRunNo;
379 fMergeViaJDL = other.fMergeViaJDL;
380 fFastReadOption = other.fFastReadOption;
381 fOverwriteMode = other.fOverwriteMode;
382 fNreplicas = other.fNreplicas;
383 fNproofWorkers = other.fNproofWorkers;
384 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
385 fProofReset = other.fProofReset;
386 fNMCevents = other.fNMCevents;
387 fNMCjobs = other.fNMCjobs;
388 fRunNumbers = other.fRunNumbers;
389 fExecutable = other.fExecutable;
390 fExecutableCommand = other.fExecutableCommand;
391 fArguments = other.fArguments;
392 fExecutableArgs = other.fExecutableArgs;
393 fAnalysisMacro = other.fAnalysisMacro;
394 fAnalysisSource = other.fAnalysisSource;
395 fValidationScript = other.fValidationScript;
396 fAdditionalRootLibs = other.fAdditionalRootLibs;
397 fAdditionalLibs = other.fAdditionalLibs;
398 fGeneratorLibs = other.fGeneratorLibs;
399 fSplitMode = other.fSplitMode;
400 fAPIVersion = other.fAPIVersion;
401 fROOTVersion = other.fROOTVersion;
402 fAliROOTVersion = other.fAliROOTVersion;
403 fExternalPackages = other.fExternalPackages;
405 fGridWorkingDir = other.fGridWorkingDir;
406 fGridDataDir = other.fGridDataDir;
407 fDataPattern = other.fDataPattern;
408 fGridOutputDir = other.fGridOutputDir;
409 fOutputArchive = other.fOutputArchive;
410 fOutputFiles = other.fOutputFiles;
411 fInputFormat = other.fInputFormat;
412 fDatasetName = other.fDatasetName;
413 fJDLName = other.fJDLName;
414 fTerminateFiles = other.fTerminateFiles;
415 fMergeExcludes = other.fMergeExcludes;
416 fRegisterExcludes = other.fRegisterExcludes;
417 fIncludePath = other.fIncludePath;
418 fCloseSE = other.fCloseSE;
419 fFriendChainName = other.fFriendChainName;
420 fJobTag = other.fJobTag;
421 fOutputSingle = other.fOutputSingle;
422 fRunPrefix = other.fRunPrefix;
423 fProofCluster = other.fProofCluster;
424 fProofDataSet = other.fProofDataSet;
425 fFileForTestMode = other.fFileForTestMode;
426 fAliRootMode = other.fAliRootMode;
427 fProofProcessOpt = other.fProofProcessOpt;
428 fMergeDirName = other.fMergeDirName;
429 fDropToShell = other.fDropToShell;
430 fMCLoop = other.fMCLoop;
431 fGridJobIDs = other.fGridJobIDs;
432 fGridStages = other.fGridStages;
433 fFriendLibs = other.fFriendLibs;
434 fTreeName = other.fTreeName;
// Deep copy of the input file list (entries re-created from their names).
435 if (other.fInputFiles) {
436 fInputFiles = new TObjArray();
437 TIter next(other.fInputFiles);
439 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
440 fInputFiles->SetOwner();
// Deep copy of the package list (entries re-created from their names).
442 if (other.fPackages) {
443 fPackages = new TObjArray();
444 TIter next(other.fPackages);
446 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
447 fPackages->SetOwner();
// Deep copy of the module configurations.
449 if (other.fModules) {
450 fModules = new TObjArray();
451 fModules->SetOwner();
452 TIter next(other.fModules);
453 AliAnalysisTaskCfg *mod, *crt;
454 while ((crt=(AliAnalysisTaskCfg*)next())) {
455 mod = new AliAnalysisTaskCfg(*crt);
463 //______________________________________________________________________________
// Append one library to the blank-separated fAdditionalLibs list.
//   name - library file name; it must contain a "." (i.e. an extension),
//          otherwise an error is reported.
// A library that is already present in the list only triggers a warning.
// NOTE(review): the duplicate test is a substring match on the whole list
// (TString::Contains), so a name that is a substring of an already listed
// library is also treated as "already added" -- confirm this is intended.
// The declaration of `lib` and the early returns are not visible here.
464 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
466 // Add a single additional library to be loaded. Extension must be present.
468 if (!lib.Contains(".")) {
469 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
472 if (fAdditionalLibs.Contains(name)) {
473 Warning("AddAdditionalLibrary", "Library %s already added.", name);
476 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
477 fAdditionalLibs += lib;
480 //______________________________________________________________________________
// Register a module configuration.
//   module - configuration to add; per the original comment it becomes
//            owned by this object (the list is created with SetOwner()).
// A module whose name is already registered (GetModule lookup) is rejected
// with an error message.
// NOTE(review): `module` is dereferenced without a visible null check; the
// guard around the list creation and the early return are on lines that
// are not part of this view.
481 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
483 // Adding a module. Checks if already existing. Becomes owned by this.
485 if (GetModule(module->GetName())) {
486 Error("AddModule", "A module having the same name %s already added", module->GetName());
490 fModules = new TObjArray();
491 fModules->SetOwner();
493 fModules->Add(module);
496 //______________________________________________________________________________
// Register every module found in `list` by calling AddModule() on each
// entry, so the duplicate check of AddModule() applies to each of them.
// NOTE(review): the declaration of the iterator `next` (presumably over
// `list`) is not visible in this view.
497 void AliAnalysisAlien::AddModules(TObjArray *list)
499 // Adding a list of modules. Checks if already existing. Becomes owned by this.
501 AliAnalysisTaskCfg *module;
502 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
505 //______________________________________________________________________________
// Verify the dependencies declared by the registered modules and reorder
// fModules so that a dependency precedes the module that needs it.
// For every module and every dependency name it declares:
//   - the dependency is looked up with GetModule(); an error is reported
//     when it was not added,
//   - an error is reported when the dependency in turn needs the current
//     module (direct circular dependency, tested via NeedsDependency()),
//   - the dependency is moved in front of the current module when needed.
// After dependencies were inserted the scan restarts from `istart`.
// NOTE(review): the declarations of i, j, k, istart and depname, the
// conditions guarding the error/reorder branches and the return statements
// are on lines that are not part of this view.
506 Bool_t AliAnalysisAlien::CheckDependencies()
508 // Check if all dependencies are satisfied. Reorder modules if needed.
509 Int_t nmodules = GetNmodules();
511 Warning("CheckDependencies", "No modules added yet to check their dependencies");
514 AliAnalysisTaskCfg *mod = 0;
515 AliAnalysisTaskCfg *dep = 0;
518 for (i=0; i<nmodules; i++) {
519 mod = (AliAnalysisTaskCfg*) fModules->At(i);
520 Int_t ndeps = mod->GetNdeps();
522 for (j=0; j<ndeps; j++) {
523 depname = mod->GetDependency(j);
524 dep = GetModule(depname);
526 Error("CheckDependencies","Dependency %s not added for module %s",
527 depname.Data(), mod->GetName());
530 if (dep->NeedsDependency(mod->GetName())) {
531 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
532 mod->GetName(), dep->GetName());
535 Int_t idep = fModules->IndexOf(dep);
536 // The dependency task must come first
538 // Remove at idep and move all objects below up one slot
539 // down to index i included.
540 fModules->RemoveAt(idep);
// Shift the entries in [i, idep-1] one slot towards the end, then put the
// dependency at the slot of the current module and advance i past it.
541 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
542 fModules->AddAt(dep, i++);
544 //Redo from istart if dependencies were inserted
545 if (i>istart) i=istart-1;
551 //______________________________________________________________________________
// Return the analysis manager, creating it when needed.
//   name     - name given to a newly created AliAnalysisManager
//   filename - optional macro; when non-empty its expanded path is passed
//              to the interpreter through gROOT->ProcessLine
// The manager obtained from AliAnalysisManager::GetAnalysisManager() gets
// the MC-loop flag of this plugin; a newly created manager additionally
// gets this object as its grid handler.
// NOTE(review): strlen(filename) requires a non-null `filename`.
// NOTE(review): per ROOT documentation the const char* overload of
// TSystem::ExpandPathName returns a buffer the caller must delete; here it
// is only copied into a TString -- possible small leak, TODO confirm.
// The branch structure and the return statements are not visible here.
552 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
554 // Create the analysis manager and optionally execute the macro in filename.
555 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
557 mgr->SetMCLoop(fMCLoop);
560 mgr = new AliAnalysisManager(name);
561 mgr->SetGridHandler((AliAnalysisGrid*)this);
562 mgr->SetMCLoop(fMCLoop);
563 if (strlen(filename)) {
564 TString line = gSystem->ExpandPathName(filename);
566 gROOT->ProcessLine(line.Data());
571 //______________________________________________________________________________
572 Int_t AliAnalysisAlien::GetNmodules() const
574 // Get number of modules.
575 if (!fModules) return 0;
576 return fModules->GetEntries();
579 //______________________________________________________________________________
580 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
582 // Get a module by name.
583 if (!fModules) return 0;
584 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
587 //______________________________________________________________________________
// Load one module into the current analysis manager.
// Steps visible in this block, in order:
//   1. return kTRUE at once if the module reports IsLoaded(),
//   2. require an existing analysis manager,
//   3. recursively load every declared dependency (looked up by name),
//   4. load the module libraries (CheckLoadLibraries),
//   5. propagate a custom output file name to the manager and append a
//      custom terminate file name to the comma-separated fTerminateFiles,
//   6. execute the module macro and, if present, its config macro; a
//      negative return value is reported as an error,
//   7. append "lib<name>.so" for each module library to the blank-separated
//      fAdditionalLibs list unless it is already contained.
// NOTE(review): the duplicate test in step 7 is a substring match and the
// ".so" suffix is hard-coded. The conditions guarding the error branches
// and the return statements are not visible in this view.
588 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
590 // Load a given module.
591 if (mod->IsLoaded()) return kTRUE;
592 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
594 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
597 Int_t ndeps = mod->GetNdeps();
599 for (Int_t j=0; j<ndeps; j++) {
600 depname = mod->GetDependency(j);
601 AliAnalysisTaskCfg *dep = GetModule(depname);
603 Error("LoadModule","Dependency %s not existing for module %s",
604 depname.Data(), mod->GetName());
607 if (!LoadModule(dep)) {
608 Error("LoadModule","Dependency %s for module %s could not be loaded",
609 depname.Data(), mod->GetName());
613 // Load libraries for the module
614 if (!mod->CheckLoadLibraries()) {
615 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
618 // Check if a custom file name was requested
619 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
621 // Check if a custom terminate file name was requested
622 if (strlen(mod->GetTerminateFileName())) {
623 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
624 fTerminateFiles += mod->GetTerminateFileName();
628 if (mod->ExecuteMacro()<0) {
629 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
630 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
633 // Configure dependencies
634 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
635 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
636 mod->GetConfigMacro()->GetTitle(), mod->GetName());
639 // Adjust extra libraries
640 Int_t nlibs = mod->GetNlibs();
642 for (Int_t i=0; i<nlibs; i++) {
643 lib = mod->GetLibrary(i);
644 lib = Form("lib%s.so", lib.Data());
645 if (fAdditionalLibs.Contains(lib)) continue;
646 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
647 fAdditionalLibs += lib;
652 //______________________________________________________________________________
// Generate the files for the full train.
//   name - base name; the analysis macro, executable and validation script
//          are temporarily named "<name>.C", "<name>.sh" and
//          "<name>_validation.sh".
// The additional-library list is reset, all modules are loaded and the
// analysis manager is initialized; kFALSE is returned if loading or
// initialization fails. The production mode and the macro / executable /
// command / validation names are saved up front and restored at the end.
// NOTE(review): the statements between the renaming and the restore (the
// actual generation step) and the final return are not visible here.
653 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
655 // Generate the full train.
656 fAdditionalLibs = "";
657 if (!LoadModules()) return kFALSE;
658 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
659 if (!mgr->InitAnalysis()) return kFALSE;
662 Int_t productionMode = fProductionMode;
664 TString macro = fAnalysisMacro;
665 TString executable = fExecutable;
666 TString validation = fValidationScript;
667 TString execCommand = fExecutableCommand;
668 SetAnalysisMacro(Form("%s.C", name));
669 SetExecutable(Form("%s.sh", name));
670 // SetExecutableCommand("aliroot -b -q ");
671 SetValidationScript(Form("%s_validation.sh", name));
673 SetProductionMode(productionMode);
674 fAnalysisMacro = macro;
675 fExecutable = executable;
676 fExecutableCommand = execCommand;
677 fValidationScript = validation;
681 //______________________________________________________________________________
// Generate the test macros for one module or for the full train.
//   name    - base name used for the generated macro ("<name>.C"),
//             executable ("<name>.sh") and validation script
//             ("<name>_validation.sh")
//   modname - module to test; when empty all modules are loaded instead
// With a module name, dependencies are checked, that module is loaded and
// the friend libraries are loaded; any failure returns kFALSE. The analysis
// manager is then initialized, the output file list is rebuilt from
// GetListOfFiles("outaod") plus the "ext" files (with ".root" turned into
// "*.root"), and the analysis macro, validation scripts and merge
// executable are written. Production mode and the macro / executable /
// command / validation names are restored afterwards.
// NOTE(review): strlen(modname) requires a non-null `modname`.
// NOTE(review): fOutputFiles is overwritten here and is not among the
// members restored in the visible lines -- confirm this is intended.
682 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
684 // Generate test macros for a single module or for the full train.
685 fAdditionalLibs = "";
686 if (strlen(modname)) {
687 if (!CheckDependencies()) return kFALSE;
688 AliAnalysisTaskCfg *mod = GetModule(modname);
690 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
693 if (!LoadModule(mod)) return kFALSE;
694 if (!LoadFriendLibs()) return kFALSE;
695 } else if (!LoadModules()) return kFALSE;
696 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
697 if (!mgr->InitAnalysis()) return kFALSE;
701 Int_t productionMode = fProductionMode;
703 TString macro = fAnalysisMacro;
704 TString executable = fExecutable;
705 TString validation = fValidationScript;
706 TString execCommand = fExecutableCommand;
707 SetAnalysisMacro(Form("%s.C", name));
708 SetExecutable(Form("%s.sh", name));
709 fOutputFiles = GetListOfFiles("outaod");
710 // Add extra files registered to the analysis manager
711 TString extra = GetListOfFiles("ext");
712 if (!extra.IsNull()) {
713 extra.ReplaceAll(".root", "*.root");
714 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
715 fOutputFiles += extra;
717 // SetExecutableCommand("aliroot -b -q ");
718 SetValidationScript(Form("%s_validation.sh", name));
720 WriteAnalysisMacro();
722 WriteValidationScript();
724 WriteMergeExecutable();
725 WriteValidationScript(kTRUE);
726 SetLocalTest(kFALSE);
727 SetProductionMode(productionMode);
728 fAnalysisMacro = macro;
729 fExecutable = executable;
730 fExecutableCommand = execCommand;
731 fValidationScript = validation;
735 //______________________________________________________________________________
// Load all registered modules, then the friend libraries.
// The additional-library list is reset first. A warning is issued when no
// module is registered and an error when no analysis manager exists yet.
// Dependencies are checked (which may reorder the modules) before each
// module is loaded in list order; the first failure returns kFALSE. The
// final result is that of LoadFriendLibs().
// NOTE(review): the conditions guarding the warning/error and the
// associated return statements are not visible in this view.
736 Bool_t AliAnalysisAlien::LoadModules()
738 // Load all modules by executing the AddTask macros. Checks first the dependencies.
739 fAdditionalLibs = "";
740 Int_t nmodules = GetNmodules();
742 Warning("LoadModules", "No module to be loaded");
745 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
747 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
750 if (!CheckDependencies()) return kFALSE;
// Re-read the count after the dependency check.
751 nmodules = GetNmodules();
752 AliAnalysisTaskCfg *mod;
753 for (Int_t imod=0; imod<nmodules; imod++) {
754 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
755 if (!LoadModule(mod)) return kFALSE;
757 // Load additional friend libraries
758 return LoadFriendLibs();
761 //______________________________________________________________________________
// Load the libraries listed in fFriendLibs.
// The list is split on "," when it contains a comma, otherwise on blanks.
// Each entry is normalized by removing ".so", ".dylib", blanks and a
// leading "lib"; it is loaded with gSystem->Load() unless
// gSystem->GetLibraries() already reports it.
// NOTE(review): the error message uses the location tag "LoadModules"
// although it is emitted from LoadFriendLibs.
// NOTE(review): the declarations of `lib` and `list`, the failure test, the
// cleanup of the token array and the return statements are on lines that
// are not part of this view.
762 Bool_t AliAnalysisAlien::LoadFriendLibs() const
764 // Load libraries required for reading friends.
765 if (fFriendLibs.Length()) {
768 if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
769 else list = fFriendLibs.Tokenize(" ");
770 for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
771 lib = list->At(ilib)->GetName();
772 lib.ReplaceAll(".so","");
773 lib.ReplaceAll(".dylib","");
774 lib.ReplaceAll(" ","");
775 if (lib.BeginsWith("lib")) lib.Remove(0, 3);
777 Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
778 if (!loaded) loaded = gSystem->Load(lib);
780 Error("LoadModules", "Cannot load library for friends %s", lib.Data());
789 //______________________________________________________________________________
// Define how run numbers are formatted.
//   prefix - either a plain prefix or a printf-style format such as "%09d"
// When fRunPrefix contains no "%" the conversion "%d" is appended, so the
// member can always be used as a format for an integer run number.
// NOTE(review): the statement storing `prefix` into fRunPrefix is on a line
// that is not part of this view.
790 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
792 // Set the run number format. Can be a prefix or a format like "%09d"
794 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
797 //______________________________________________________________________________
// Append an include path to fIncludePath (entries are blank-terminated).
//   path - directory, with or without the "-I" option prefix
// When "-I" occurs in `p` the path is appended as given, otherwise "-I" is
// put in front of it.
// NOTE(review): the declaration of `p` (presumably a TString built from
// `path`) is not visible in this view; the test is a substring match, so
// "-I" anywhere in the text suppresses the prefix.
798 void AliAnalysisAlien::AddIncludePath(const char *path)
800 // Add include path in the remote analysis macro.
802 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
803 else fIncludePath += Form("-I%s ", path);
806 //______________________________________________________________________________
807 void AliAnalysisAlien::AddRunNumber(Int_t run)
809 // Add a run number to the list of runs to be processed.
810 if (fRunNumbers.Length()) fRunNumbers += " ";
811 fRunNumbers += Form(fRunPrefix.Data(), run);
814 //______________________________________________________________________________
// Add several runs at once.
//   runList - run numbers separated by blanks
// Each token is converted with Atoi() and passed to AddRunNumber(Int_t).
// NOTE(review): TString::Tokenize returns a heap-allocated array that the
// caller has to delete; the end of this function is not visible in this
// view, so confirm that `list` is released there.
815 void AliAnalysisAlien::AddRunList(const char* runList)
817 // Add several runs into the list of runs; they are expected to be separated by a blank character.
818 TString sList = runList;
819 TObjArray *list = sList.Tokenize(" ");
820 Int_t n = list->GetEntries();
821 for (Int_t i = 0; i < n; i++) {
822 TObjString *os = (TObjString*)list->At(i);
823 AddRunNumber(os->GetString().Atoi());
828 //______________________________________________________________________________
// Add one or more runs given as text.
// The text held in `runs` is split on blanks; every token is appended to the
// blank-separated fRunNumbers list, preceded by `prefix`.
// `prefix` is filled from fRunPrefix, limited to the position of "%d".
// NOTE(review): SetRunPrefix also accepts formats such as "%09d"; in that
// case Index("%d") does not find a match -- confirm what Append() does with
// the resulting length argument.
// NOTE(review): the declarations of `runs`, `prefix`, `os` and `next`, and
// the cleanup of `arr`, are on lines that are not part of this view.
829 void AliAnalysisAlien::AddRunNumber(const char* run)
831 // Add a run number to the list of runs to be processed.
834 TObjArray *arr = runs.Tokenize(" ");
837 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
838 while ((os=(TObjString*)next())){
839 if (fRunNumbers.Length()) fRunNumbers += " ";
840 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
845 //______________________________________________________________________________
846 void AliAnalysisAlien::AddDataFile(const char *lfn)
848 // Adds a data file to the input to be analysed. The file should be a valid LFN
849 // or point to an existing file in the alien workdir.
850 if (!fInputFiles) fInputFiles = new TObjArray();
851 fInputFiles->Add(new TObjString(lfn));
854 //______________________________________________________________________________
855 void AliAnalysisAlien::AddExternalPackage(const char *package)
857 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
858 if (fExternalPackages) fExternalPackages += " ";
859 fExternalPackages += package;
862 //______________________________________________________________________________
// Make sure a connection to AliEn is available.
// Returns kTRUE at once when gGrid is already connected or when the plugin
// runs in production mode. Otherwise TGrid::Connect("alien://") is called;
// if there is still no connected gGrid an error is reported. On success the
// analysis user (fUser) is set to the grid user name.
// NOTE(review): the condition guarding the connection attempt and the
// return statements after the error / at the end are not visible here.
863 Bool_t AliAnalysisAlien::Connect()
865 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
866 if (gGrid && gGrid->IsConnected()) return kTRUE;
867 if (fProductionMode) return kTRUE;
869 Info("Connect", "Trying to connect to AliEn ...");
870 TGrid::Connect("alien://");
872 if (!gGrid || !gGrid->IsConnected()) {
873 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
876 fUser = gGrid->GetUser();
877 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
881 //______________________________________________________________________________
// Change to the AliEn working directory, creating it when necessary.
// The working directory is the grid home directory followed by
// fGridWorkingDir. When it does not exist it is created with
// gGrid->Mkdir(workdir, "-p"); if that fails a warning is issued and
// fGridWorkingDir is reset to an empty string, so the home directory is
// used instead.
// NOTE(review): the connection test in front of the error message and the
// statements executed when the directory already exists are on lines that
// are not part of this view.
882 void AliAnalysisAlien::CdWork()
884 // Check validity of alien workspace. Create directory if possible.
886 Error("CdWork", "Alien connection required");
889 TString homedir = gGrid->GetHomeDirectory();
890 TString workdir = homedir + fGridWorkingDir;
891 if (DirectoryExists(workdir)) {
895 // Work directory not existing - create it
897 if (gGrid->Mkdir(workdir, "-p")) {
898 gGrid->Cd(fGridWorkingDir);
899 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
901 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
902 workdir.Data(), homedir.Data());
903 fGridWorkingDir = "";
907 //______________________________________________________________________________
// Test whether files can be copied to the given AliEn directory.
//   alienpath - grid directory to test; must not contain blanks
// Returns kTRUE at once in production mode. Otherwise the value of the
// environment variable alien_CLOSE_SE is reported, the grid directory is
// checked for existence, a local file "plugin_test_copy" is created and
// copied to alien://<alienpath>/plugin_test_copy with TFile::Cp, and both
// the local and the remote test file are removed again.
// NOTE(review): the final gGrid->Rm() is called with a path without the
// "alien://" prefix, while the Rm() in front of the copy uses the prefixed
// form -- confirm both spellings are accepted.
// NOTE(review): the connection test, the check of the local test file and
// the return statements are on lines that are not part of this view.
908 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
910 // Check if file copying is possible.
911 if (fProductionMode) return kTRUE;
912 TString salienpath(alienpath);
913 if (salienpath.Contains(" ")) {
914 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
918 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
921 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
922 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
923 // Check if alien_CLOSE_SE is defined
924 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
925 if (!closeSE.IsNull()) {
926 Info("CheckFileCopy", "Your current close storage is pointing to: \
927 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
929 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
931 // Check if grid directory exists.
932 if (!DirectoryExists(alienpath)) {
933 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
936 TString stest = "plugin_test_copy";
937 TFile f(stest, "RECREATE");
938 // User may not have write permissions to current directory
940 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
941 gSystem->WorkingDirectory());
945 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
946 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
947 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
948 \n# 1. Make sure you have write permissions there. If this is the case: \
949 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
950 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
951 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
952 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
953 gSystem->Unlink(stest.Data());
956 gSystem->Unlink(stest.Data());
957 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
958 Info("CheckFileCopy", "### ...SUCCESS ###");
962 //______________________________________________________________________________
// Validate the configured input data and register the xml collections that
// have to be produced.
// Returns kTRUE at once in production mode. Three kinds of input are
// handled in the visible code:
//   1. nothing but a base data directory (no input files, run numbers or
//      run range): fGridDataDir must be set; a single xml is announced,
//   2. explicitly declared files (fInputFiles): each one must exist, either
//      as given or relative to the working directory, and all of them must
//      agree on collection / xml / tag usage (CheckDataType),
//   3. run numbers or a run range: fGridDataDir must be set and exist; for
//      every run an xml name is registered through AddDataFile(), either
//      one per run or one per chunk of fNrunsPerMaster runs.
// NOTE(review): two Error() calls use the misspelled location tag
// "CkeckInputData" and several Info() calls use "CheckDataType"; they are
// runtime strings and are left untouched here.
// NOTE(review): the run directory paths are built from formats ending in a
// blank ("%s/%s " and "%%s/%s ") before being passed to DirectoryExists()
// -- confirm the trailing blank is intended.
// NOTE(review): many lines of the original (declarations of objstr, file,
// arr, os, path, format, nruns; several conditions, returns and closing
// braces) are not part of this view, so the exact branch structure cannot
// be read off from here.
963 Bool_t AliAnalysisAlien::CheckInputData()
965 // Check validity of input data. If necessary, create xml files.
966 if (fProductionMode) return kTRUE;
967 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
968 if (!fGridDataDir.Length()) {
969 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
973 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
976 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
977 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
978 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
981 // Process declared files
982 Bool_t isCollection = kFALSE;
983 Bool_t isXml = kFALSE;
984 Bool_t useTags = kFALSE;
985 Bool_t checked = kFALSE;
986 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
988 TString workdir = gGrid->GetHomeDirectory();
989 workdir += fGridWorkingDir;
992 TIter next(fInputFiles);
993 while ((objstr=(TObjString*)next())) {
996 file += objstr->GetString();
997 // Store full lfn path
998 if (FileExists(file)) objstr->SetString(file);
1000 file = objstr->GetName();
1001 if (!FileExists(objstr->GetName())) {
1002 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
1003 objstr->GetName(), workdir.Data());
1007 Bool_t iscoll, isxml, usetags;
1008 CheckDataType(file, iscoll, isxml, usetags);
1011 isCollection = iscoll;
1014 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
// Every declared file must agree with the type flags recorded so far.
1016 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
1017 Error("CheckInputData", "Some conflict was found in the types of inputs");
1023 // Process requested run numbers
1024 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
1025 // Check validity of alien data directory
1026 if (!fGridDataDir.Length()) {
1027 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
1030 if (!DirectoryExists(fGridDataDir)) {
1031 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
1035 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
1039 if (checked && !isXml) {
1040 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
1043 // Check validity of run number(s)
1048 TString schunk, schunk2;
1052 useTags = fDataPattern.Contains("tag");
1053 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1055 if (useTags != fDataPattern.Contains("tag")) {
1056 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over a run range.
1059 if (fRunNumbers.Length()) {
1060 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1061 arr = fRunNumbers.Tokenize(" ");
1063 while ((os=(TObjString*)next())) {
1064 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1065 if (!DirectoryExists(path)) {
1066 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1069 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1070 TString msg = "\n##### file: ";
1072 msg += " type: xml_collection;";
1073 if (useTags) msg += " using_tags: Yes";
1074 else msg += " using_tags: No";
1075 Info("CheckDataType", "%s", msg.Data());
// One xml per run unless runs are grouped per master job.
1076 if (fNrunsPerMaster<2) {
1077 AddDataFile(Form("%s.xml", os->GetString().Data()));
// First run of a chunk starts the chunk name.
1080 if (((nruns-1)%fNrunsPerMaster) == 0) {
1081 schunk = os->GetString();
// Keep collecting until the chunk is full or the last run is reached.
1083 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1084 schunk += Form("_%s.xml", os->GetString().Data());
1085 AddDataFile(schunk);
1090 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1091 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1092 format = Form("%%s/%s ", fRunPrefix.Data());
1093 path = Form(format.Data(), fGridDataDir.Data(), irun);
1094 if (!DirectoryExists(path)) {
1097 format = Form("%%s/%s.xml", fRunPrefix.Data());
1098 path = Form(format.Data(), workdir.Data(),irun);
1099 TString msg = "\n##### file: ";
1101 msg += " type: xml_collection;";
1102 if (useTags) msg += " using_tags: Yes";
1103 else msg += " using_tags: No";
1104 Info("CheckDataType", "%s", msg.Data());
1105 if (fNrunsPerMaster<2) {
1106 format = Form("%s.xml", fRunPrefix.Data());
1107 AddDataFile(Form(format.Data(),irun));
1110 if (((nruns-1)%fNrunsPerMaster) == 0) {
1111 schunk = Form(fRunPrefix.Data(),irun);
1113 format = Form("_%s.xml", fRunPrefix.Data());
1114 schunk2 = Form(format.Data(), irun);
1115 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1117 AddDataFile(schunk);
1122 AddDataFile(schunk);
1128 //______________________________________________________________________________
1129 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1131 // Copy data from the given grid directory according a pattern and make a local
1133 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
//
// Parameters, as they are used in the code below:
//   griddir     - grid directory to search; an error is reported if
//                 DirectoryExists(griddir) fails.
//   pattern     - file pattern passed to the grid 'find' command.
//   nfiles      - value given to the '-l' option of 'find'.
//   output      - name of the local text file opened for writing the list of
//                 local paths.
//   archivefile - when non-empty, this file name (taken from the directory of
//                 each found file) is copied instead of the found file itself.
//                 A ';'-separated list is split: the first entry is the main
//                 archive, the others are copied in addition.
//   outputdir   - local directory that is created and entered for the copy.
//
// NOTE(review): this listing lacks several original lines (braces, some
// declarations, the return statements and some conditions), so the returned
// value and the exact scope of some statements cannot be confirmed from here.
// Error reported for a missing grid connection (the guarding condition is not
// visible in this listing).
1135 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1138 if (!DirectoryExists(griddir)) {
1139 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
// Query the file catalogue for at most nfiles entries matching the pattern.
1142 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1143 printf("Running command: %s\n", command.Data());
1144 TGridResult *res = gGrid->Command(command);
// NOTE(review): res is dereferenced without a null check here, while
// CreateDataset() guards the result of gGrid->Command() with `if (res)`.
// No delete of res is visible in this listing either - confirm ownership.
1145 Int_t nfound = res->GetEntries();
1147 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1150 printf("... found %d files. Copying locally ...\n", nfound);
// Split a ';'-separated archive list: the first token replaces archivefile,
// the remaining tokens stay in additionalArchives.
// NOTE(review): strlen(archivefile) assumes a non-null pointer - presumably the
// declaration defaults it to an empty string; verify against the header.
1153 TObjArray* additionalArchives = 0;
1154 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1155 additionalArchives = TString(archivefile).Tokenize(";");
1156 archivefile = additionalArchives->At(0)->GetName();
// After RemoveAt(0) the array no longer references the first token, although
// archivefile still points at its name. Nothing visible here deletes that
// token - presumably it is leaked; TODO confirm.
1157 additionalArchives->RemoveAt(0);
1158 additionalArchives->Compress();
1161 // Copy files locally
1163 out.open(output, ios::out);
1165 TString turl, dirname, filename, temp;
// Remember the current directory, then work inside outputdir; the original
// directory is restored after the loop.
1166 TString cdir = gSystem->WorkingDirectory();
1167 gSystem->MakeDirectory(outputdir);
1168 gSystem->ChangeDirectory(outputdir);
1170 for (Int_t i=0; i<nfound; i++) {
1171 map = (TMap*)res->At(i);
1172 turl = map->GetValue("turl")->GetName();
// Each file is stored locally under a sub-directory named after the last
// path component of the directory that contains it on the grid.
1173 filename = gSystem->BaseName(turl.Data());
1174 dirname = gSystem->DirName(turl.Data());
1175 dirname = gSystem->BaseName(dirname.Data());
1176 gSystem->MakeDirectory(dirname);
1178 TString source(turl);
1179 TString targetFileName(filename);
1181 if (strlen(archivefile) > 0) {
1182 // TODO here the archive in which the file resides should be determined
1183 // however whereis returns only a guid, and guid2lfn does not work
1184 // Therefore we use the one provided as argument for now
1185 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1186 targetFileName = archivefile;
// Copy the main file/archive; the additional archives are only attempted when
// this first copy succeeds.
1188 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1189 Bool_t success = kTRUE;
1190 if (additionalArchives) {
1191 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1193 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1194 gSystem->MakeDirectory(gSystem->DirName(target));
// success accumulates the result of every additional copy.
1195 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
// When an archive was copied, the listed entry is "<archive>#<file name>".
1200 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
// The listed path is built as <initial working dir>/<outputdir>/<dirname>/<target>;
// this presumably expects outputdir to be a relative path - confirm with callers.
1201 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1206 gSystem->ChangeDirectory(cdir);
1208 delete additionalArchives;
1212 //______________________________________________________________________________
1213 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1215 // Create dataset for the grid data directory + run number.
//
// Three input configurations are handled in turn below:
//   1) neither fRunNumbers nor fRunRange[0] is set: a single collection is made
//      for fGridDataDir;
//   2) fRunNumbers is set: one collection per space-separated entry;
//   3) otherwise: one collection per run in [fRunRange[0], fRunRange[1]].
// In each case the grid 'find' output is dumped to a temporary file named
// __tmp<stage>__<file>, validated with 'grep' when available, and turned into
// the local collection <file>, which may then be copied to the AliEn work
// directory.
//
// pattern - search pattern; visibly used to choose `delimiter`. It is
//           presumably also appended to the 'find' command, but those lines
//           are not part of this listing.
//
// NOTE(review): many original lines are missing from this listing (braces,
// declarations, else-branches, return statements, parts of the command
// composition). The comments below describe only the visible statements.
// gMaxEntries is the '-l' limit passed to 'find' outside test mode.
1216 const Int_t gMaxEntries = 15000;
// Nothing to do in production mode or when running offline.
1217 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1219 Error("CreateDataset", "Cannot create dataset with no grid connection");
1224 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// workdir = <AliEn home directory> + fGridWorkingDir
1225 TString workdir = gGrid->GetHomeDirectory();
1226 workdir += fGridWorkingDir;
1228 // Compose the 'find' command arguments
// delimiter ends up empty when the pattern text contains a blank and is a
// single blank otherwise.
1231 TString delimiter = pattern;
1233 if (delimiter.Contains(" ")) delimiter = "";
1234 else delimiter = " ";
// '-l' limits the number of returned entries: fNtestFiles in test mode,
// gMaxEntries otherwise.
1235 TString options = "-x collection ";
1236 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1237 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1238 TString conditions = "";
1245 TString schunk, schunk2;
1246 TGridCollection *cbase=0, *cadd=0;
// ---- Case 1: no explicit run numbers and no run range ----
1247 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Input files already registered: nothing to create.
1248 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1249 // Make a single data collection from data directory.
1250 path = fGridDataDir;
1251 if (!DirectoryExists(path)) {
1252 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// Collection name: "wn.xml" in test mode, otherwise <last path component>.xml.
1256 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1257 else file = Form("%s.xml", gSystem->BaseName(path));
// (Re)run 'find' when the local collection is not accessible, in test mode,
// or when overwriting was requested.
1261 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1263 command += Form("%s -o %d ",options.Data(), nstart);
1265 command += delimiter;
1267 command += conditions;
1268 printf("command: %s\n", command.Data());
1269 TGridResult *res = gGrid->Command(command);
1270 if (res) delete res;
1271 // Write standard output to file
1272 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
// 'grep -c /event' is used to validate the temporary file; its output is
// redirected to __tmp__ and a non-zero exit status marks the file as empty.
1273 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1274 Bool_t nullFile = kFALSE;
1276 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1278 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1280 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1281 gSystem->Exec("rm -f __tmp*");
1289 gSystem->Exec("rm -f __tmp__");
1290 ncount = line.Atoi();
// A count equal to gMaxEntries means the '-l' limit was reached, so the
// partial results are accumulated through TAlienCollection objects.
// NOTE(review): the message hard-codes "15K" instead of using gMaxEntries.
1293 if (ncount == gMaxEntries) {
1294 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1295 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1296 if (!cbase) cbase = cadd;
1304 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1305 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1308 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1309 delete cbase; cbase = 0;
// Otherwise the temporary dump itself becomes the collection file.
1311 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1313 gSystem->Exec("rm -f __tmp*");
1314 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// Upload the collection unless in test mode; an existing remote copy is
// replaced only in overwrite mode.
1318 Bool_t fileExists = FileExists(file);
1319 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1320 // Copy xml file to alien space
1321 if (fileExists) gGrid->Rm(file);
1322 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1323 if (!FileExists(file)) {
1324 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1327 // Update list of files to be processed.
1329 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
// nullResult stays kTRUE until some run yields a usable dataset.
1333 Bool_t nullResult = kTRUE;
// ---- Case 2: explicit, space-separated list of runs in fRunNumbers ----
1334 if (fRunNumbers.Length()) {
1335 TObjArray *arr = fRunNumbers.Tokenize(" ");
1338 while ((os=(TObjString*)next())) {
// Runs without a matching data directory are silently skipped.
1341 path = Form("%s/%s", fGridDataDir.Data(), os->GetString().Data());
1342 if (!DirectoryExists(path)) continue;
1344 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1345 else file = Form("%s.xml", os->GetString().Data());
1346 // If local collection file does not exist, create it via 'find' command.
1350 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1352 command += Form("%s -o %d ",options.Data(), nstart);
1354 command += delimiter;
1356 command += conditions;
1357 TGridResult *res = gGrid->Command(command);
1358 if (res) delete res;
1359 // Write standard output to file
1360 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1361 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1362 Bool_t nullFile = kFALSE;
1364 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1366 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
// Unlike case 1, an empty dataset here only produces a warning.
1368 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1369 gSystem->Exec("rm -f __tmp*");
// Drop the run from fRunNumbers.
// NOTE(review): ReplaceAll works on substrings, so a run string contained in
// another entry would presumably be altered as well - confirm.
1370 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1378 gSystem->Exec("rm -f __tmp__");
1379 ncount = line.Atoi();
1381 nullResult = kFALSE;
1383 if (ncount == gMaxEntries) {
1384 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
// Oversized per-run collections are reported as an error when several runs
// share one master job.
1385 if (fNrunsPerMaster > 1) {
1386 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1387 file.Data(),gMaxEntries);
1390 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1391 if (!cbase) cbase = cadd;
1398 if (cbase && fNrunsPerMaster<2) {
1399 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1400 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1403 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1404 delete cbase; cbase = 0;
1406 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1408 gSystem->Exec("rm -f __tmp*");
1409 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the run loop is left at this point.
1413 if (TestBit(AliAnalysisGrid::kTest)) break;
1414 // Check if there is one run per master job.
1415 if (fNrunsPerMaster<2) {
1416 if (FileExists(file)) {
1417 if (fOverwriteMode) gGrid->Rm(file);
1419 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1423 // Copy xml file to alien space
1424 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1425 if (!FileExists(file)) {
1426 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of each chunk opens a new base
// collection, later runs are loaded as collections to be merged into it.
1432 if (((nruns-1)%fNrunsPerMaster) == 0) {
1433 schunk = os->GetString();
1434 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1436 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1437 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
// Tested for every run that neither completes a chunk nor is the last entry.
1441 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// Chunk name: <first run>_<last run>.xml
1444 schunk += Form("_%s.xml", os->GetString().Data());
1445 if (FileExists(schunk)) {
// NOTE(review): this removes `file` although the test above is on `schunk`;
// the run-range branch further down removes `schunk` in the same situation.
// Looks like a copy/paste slip - confirm.
1446 if (fOverwriteMode) gGrid->Rm(file);
1448 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1452 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1453 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1454 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1455 if (!FileExists(schunk)) {
1456 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1464 Error("CreateDataset", "No valid dataset corresponding to the query!");
// ---- Case 3: contiguous run range [fRunRange[0], fRunRange[1]] ----
1468 // Process a full run range.
1469 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is used as a printf-style format for the run number; "%%s"
// leaves a %s placeholder that is then filled with the data directory.
1470 format = Form("%%s/%s", fRunPrefix.Data());
1473 path = Form(format.Data(), fGridDataDir.Data(), irun);
1474 if (!DirectoryExists(path)) continue;
1476 format = Form("%s.xml", fRunPrefix.Data());
1477 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1478 else file = Form(format.Data(), irun);
1479 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1480 if (fOverwriteMode) gGrid->Rm(file);
1482 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1486 // If local collection file does not exist, create it via 'find' command.
1490 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1492 command += Form("%s -o %d ",options.Data(), nstart);
1494 command += delimiter;
1496 command += conditions;
1497 TGridResult *res = gGrid->Command(command);
1498 if (res) delete res;
1499 // Write standard output to file
1500 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1501 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1502 Bool_t nullFile = kFALSE;
1504 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1506 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1508 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1509 gSystem->Exec("rm -f __tmp*");
1517 gSystem->Exec("rm -f __tmp__");
1518 ncount = line.Atoi();
1520 nullResult = kFALSE;
1522 if (ncount == gMaxEntries) {
1523 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1524 if (fNrunsPerMaster > 1) {
1525 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1526 file.Data(),gMaxEntries);
1529 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1530 if (!cbase) cbase = cadd;
1537 if (cbase && fNrunsPerMaster<2) {
1538 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1539 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1542 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1543 delete cbase; cbase = 0;
1545 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1547 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1551 if (TestBit(AliAnalysisGrid::kTest)) break;
1552 // Check if there is one run per master job.
1553 if (fNrunsPerMaster<2) {
1554 if (FileExists(file)) {
1555 if (fOverwriteMode) gGrid->Rm(file);
1557 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1561 // Copy xml file to alien space
1562 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1563 if (!FileExists(file)) {
1564 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1569 // Check if the collection for the chunk exist locally.
// nchunk indexes fInputFiles with the chunk number of the current run.
// NOTE(review): fInputFiles is used here without a visible null check.
1570 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1571 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1572 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1575 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1576 if (((nruns-1)%fNrunsPerMaster) == 0) {
1577 schunk = Form(fRunPrefix.Data(), irun);
1578 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1580 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
// schunk2 = <first run of the chunk>_<current run>.xml
1584 format = Form("%%s_%s.xml", fRunPrefix.Data());
1585 schunk2 = Form(format.Data(), schunk.Data(), irun);
// Tested while the chunk is incomplete, the range end has not been reached
// and schunk2 differs from the name of the last registered input file.
1586 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1590 if (FileExists(schunk)) {
1591 if (fOverwriteMode) gGrid->Rm(schunk);
1593 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1597 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1598 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1599 if (FileExists(schunk)) {
1600 if (fOverwriteMode) gGrid->Rm(schunk);
1602 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1606 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1607 if (!FileExists(schunk)) {
1608 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1614 Error("CreateDataset", "No valid dataset corresponding to the query!");
1621 //______________________________________________________________________________
1622 Bool_t AliAnalysisAlien::CreateJDL()
1624 // Generate a JDL file according to current settings. The name of the file is
1625 // specified by fJDLName.
//
// Two JDL objects are filled side by side: fGridJDL for the analysis job and
// fMergingJDL for the merging job. The visible steps are:
//   - validation of input files, output files and the AliEn output directory;
//   - executable, arguments, TTL, splitting, packages, input sandbox, output
//     sets, price and validation script settings;
//   - copying of the JDL files and of the dependencies to AliEn.
// Returns kFALSE when one of the initial checks has set `error`.
//
// NOTE(review): many original lines are missing from this listing (braces,
// declarations, else-branches, the final return), so the scope of some
// statements and the remaining return paths cannot be confirmed from here.
1626 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1627 Bool_t error = kFALSE;
// `copy` is cleared in production, offline and test mode; `generate` is
// cleared in test and submit mode. The statements consuming these flags are
// not part of this listing.
1629 Bool_t copy = kTRUE;
1630 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1631 Bool_t generate = kTRUE;
1632 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1634 Error("CreateJDL", "Alien connection required");
1637 // Check validity of alien workspace
// The AliEn home directory is used as base of workdir unless in production
// mode or when fGridWorkingDir already begins with "/alice".
1639 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1640 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1641 workdir += fGridWorkingDir;
// Input files are mandatory unless fMCLoop is set.
1644 if (!fInputFiles && !fMCLoop) {
1645 Error("CreateJDL()", "Define some input files for your analysis.");
1648 // Compose list of input files
1649 // Check if output files were defined
1650 if (!fOutputFiles.Length()) {
1651 Error("CreateJDL", "You must define at least one output file");
1654 // Check if an output directory was defined and valid
1655 if (!fGridOutputDir.Length()) {
1656 Error("CreateJDL", "You must define AliEn output directory");
1659 if (!fProductionMode) {
// An output directory given without any '/' is placed under workdir.
1660 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1661 if (!DirectoryExists(fGridOutputDir)) {
1662 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1663 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1665 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1669 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1674 // Exit if any error up to now
1675 if (error) return kFALSE;
1677 if (!fUser.IsNull()) {
1678 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1679 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
// A relative executable name is resolved against workdir.
1681 TString executable = fExecutable;
1682 if (!executable.BeginsWith("/"))
1683 executable.Prepend(Form("%s/", workdir.Data()));
1684 fGridJDL->SetExecutable(executable, "This is the startup script");
// Merging script name: the executable with ".sh" replaced by "_merge.sh";
// the merge macro uses the same name with ".C".
1685 TString mergeExec = executable;
1686 mergeExec.ReplaceAll(".sh", "_merge.sh");
1687 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1688 mergeExec.ReplaceAll(".sh", ".C");
1689 fMergingJDL->AddToInputSandbox(Form("LF:%s", mergeExec.Data()), "List of input files to be uploaded to workers");
1690 if (!fArguments.IsNull())
1691 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// One-stage merging passes the output directory as argument; the other
// variants pass "wn.xml" plus a positional JDL argument ($4 in production
// mode, $2 otherwise).
1692 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1694 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1695 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// The description prints fTTL/60 labelled "min.", so fTTL is presumably
// expressed in seconds - TODO confirm.
1698 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1699 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1700 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1701 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1703 if (fMaxInitFailed > 0) {
1704 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1705 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1707 if (fSplitMaxInputFileNumber > 0 && !fMCLoop) {
1708 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1709 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1711 if (!IsOneStageMerging()) {
1712 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1713 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1715 if (fSplitMode.Length()) {
1716 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1717 fGridJDL->SetDescription("Split", "We split per SE or file");
1719 fMergingJDL->SetValue("Split", "\"se\"");
1720 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested software packages are declared identically in both JDLs.
1721 if (!fAliROOTVersion.IsNull()) {
1722 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1723 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1725 if (!fROOTVersion.IsNull()) {
1726 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1727 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1729 if (!fAPIVersion.IsNull()) {
1730 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1731 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: space-separated entries, each split at "::" into a
// package name and a version.
1733 if (!fExternalPackages.IsNull()) {
1734 arr = fExternalPackages.Tokenize(" ");
1736 while ((os=(TObjString*)next())) {
1737 TString pkgname = os->GetString();
// NOTE(review): no guard is visible for an entry without "::" (index < 0) -
// confirm that every entry is guaranteed to contain the separator.
1738 Int_t index = pkgname.Index("::");
1739 TString pkgversion = pkgname(index+2, pkgname.Length());
1740 pkgname.Remove(index);
1741 fGridJDL->AddToPackages(pkgname, pkgversion);
1742 fMergingJDL->AddToPackages(pkgname, pkgversion);
1747 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1748 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1750 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1751 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
// Input sandbox: the analysis macro (analysis JDL only) and the file named
// like the executable with ".sh" replaced by ".root" (both JDLs).
1752 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1753 TString analysisFile = fExecutable;
1754 analysisFile.ReplaceAll(".sh", ".root");
1755 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1756 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// Additional libraries: entries containing ".so" or ".dylib" are skipped,
// all other entries are added to the input sandbox of both JDLs.
1757 if (fAdditionalLibs.Length()) {
1758 arr = fAdditionalLibs.Tokenize(" ");
1760 while ((os=(TObjString*)next())) {
1761 if (os->GetString().Contains(".so") ||
1762 os->GetString().Contains(".dylib")) continue;
1763 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1764 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages is added to the input sandbox as well.
1769 TIter next(fPackages);
1771 while ((obj=next())) {
1772 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1773 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1776 const char *comment = "List of output files and archives";
// Output archives of the analysis JDL: fOutputArchive with the names listed
// in fRegisterExcludes removed, then split at blanks.
1777 if (fOutputArchive.Length()) {
1778 TString outputArchive = fOutputArchive;
1779 if (!fRegisterExcludes.IsNull()) {
1780 arr = fRegisterExcludes.Tokenize(" ");
1782 while ((os=(TObjString*)next1())) {
// Remove "<name>," first and then any remaining bare "<name>".
1783 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1784 outputArchive.ReplaceAll(os->GetString(),"");
1788 arr = outputArchive.Tokenize(" ");
1790 Bool_t first = kTRUE;
1791 while ((os=(TObjString*)next())) {
// Entries without an explicit "@..." part get "@<fCloseSE>" appended when
// fCloseSE is set.
1792 if (!os->GetString().Contains("@") && fCloseSE.Length())
1793 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1795 fGridJDL->AddToSet("Output", os->GetString());
1796 if (first) fGridJDL->AddToSetDescription("Output", comment);
1800 // Output archive for the merging jdl
1801 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1802 outputArchive = "log_archive.zip:std*@disk=1 ";
1803 // Add normal output files, extra files + terminate files
1805 if (IsMergeAOD()) files = GetListOfFiles("outaodextter");
1806 else files = GetListOfFiles("outextter");
1807 // Do not register files in fRegisterExcludes
1808 if (!fRegisterExcludes.IsNull()) {
1809 arr = fRegisterExcludes.Tokenize(" ");
1811 while ((os=(TObjString*)next1())) {
1812 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1813 files.ReplaceAll(os->GetString(),"");
// Turn every "<name>.root" into the wildcard form "<name>*.root".
1817 files.ReplaceAll(".root", "*.root");
// With throughput collection enabled the manager's file info log is added
// as an extra output entry.
1819 if (mgr->IsCollectThroughput())
1820 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1822 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1824 TString files = fOutputArchive;
1825 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already attached by the user
1826 outputArchive = files;
1828 arr = outputArchive.Tokenize(" ");
1831 while ((os=(TObjString*)next2())) {
1832 TString currentfile = os->GetString();
1833 if (!currentfile.Contains("@") && fCloseSE.Length())
1834 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1836 fMergingJDL->AddToSet("Output", currentfile);
1837 if (first) fMergingJDL->AddToSetDescription("Output", comment);
// Plain output files: comma-separated list in fOutputFiles.
1842 arr = fOutputFiles.Tokenize(",");
1844 Bool_t first = kTRUE;
1845 while ((os=(TObjString*)next())) {
1846 // Ignore outputs in jdl that are also in outputarchive
// sout is the entry reduced to a bare name: '*' and ".root" removed and
// everything from '@' on cut off.
1847 TString sout = os->GetString();
1848 sout.ReplaceAll("*", "");
1849 sout.ReplaceAll(".root", "");
1850 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1851 if (fOutputArchive.Contains(sout)) continue;
1852 // Ignore fRegisterExcludes
1853 if (fRegisterExcludes.Contains(sout)) continue;
1854 if (!first) comment = NULL;
1855 if (!os->GetString().Contains("@") && fCloseSE.Length())
1856 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1858 fGridJDL->AddToSet("Output", os->GetString());
1859 if (first) fGridJDL->AddToSetDescription("Output", comment);
// Files listed in fMergeExcludes are not added to the merging JDL.
1860 if (fMergeExcludes.Contains(sout)) continue;
1861 if (!os->GetString().Contains("@") && fCloseSE.Length())
1862 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1864 fMergingJDL->AddToSet("Output", os->GetString());
1865 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1869 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1870 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// The merging job uses the validation script with ".sh" replaced by
// "_merge.sh".
1871 TString validationScript = fValidationScript;
1872 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1873 validationScript.ReplaceAll(".sh", "_merge.sh");
1874 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1875 if (fMasterResubmitThreshold) {
1876 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1877 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1879 // Write a jdl with 2 input parameters: collection name and output dir name.
1882 // Copy jdl to grid workspace
// NOTE(review): the output directory checks below repeat the ones made near
// the top of this function.
1884 // Check if an output directory was defined and valid
1885 if (!fGridOutputDir.Length()) {
1886 Error("CreateJDL", "You must define AliEn output directory");
1889 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1890 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1891 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1892 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1894 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
// Remote JDL locations: under fGridOutputDir, or under workdir in production
// mode. The merging JDL is named after the executable, with ".sh" replaced by
// "_merge.jdl".
1900 if (TestBit(AliAnalysisGrid::kSubmit)) {
1901 TString mergeJDLName = fExecutable;
1902 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1903 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1904 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1905 if (fProductionMode) {
1906 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1907 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Existing remote copies are removed before uploading; a failed upload is
// fatal.
1909 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1910 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1911 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1912 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1913 Fatal("","Terminating");
1914 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1916 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1917 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1918 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1919 Fatal("","Terminating");
// Upload the additional dependencies; entries containing ".so" or ".dylib"
// are skipped, as for the input sandbox.
1922 if (fAdditionalLibs.Length()) {
1923 arr = fAdditionalLibs.Tokenize(" ");
1926 while ((os=(TObjString*)next())) {
1927 if (os->GetString().Contains(".so") ||
1928 os->GetString().Contains(".dylib")) continue;
1929 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1930 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1931 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1932 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1933 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1934 Fatal("","Terminating");
// Upload every entry of fPackages to workdir, replacing existing copies.
1939 TIter next(fPackages);
1941 while ((obj=next())) {
1942 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1943 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1944 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1945 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1946 Form("%s/%s", workdir.Data(), obj->GetName())))
1947 Fatal("","Terminating");
1954 //______________________________________________________________________________
1955 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1957 // Writes one or more JDL's corresponding to findex. If findex is negative,
1958 // all run numbers are considered in one go (jdl). For non-negative indices
1959 // they correspond to the indices in the array fInputFiles.
1960 if (!fInputFiles && !fMCLoop) return kFALSE;
1963 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1964 workdir += fGridWorkingDir;
1965 TString stageName = "$2";
1966 if (fProductionMode) stageName = "$4";
1967 if (!fMergeDirName.IsNull()) {
1968 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1969 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1971 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1972 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1974 if (fProductionMode) {
1975 TIter next(fInputFiles);
1976 while ((os=next())) {
1977 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1979 if (!fOutputToRunNo)
1980 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1982 fGridJDL->SetOutputDirectory(fGridOutputDir);
1984 if (!fRunNumbers.Length() && !fRunRange[0]) {
1985 // One jdl with no parameters in case input data is specified by name.
1986 TIter next(fInputFiles);
1988 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1989 if (!fOutputSingle.IsNull())
1990 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1992 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1993 // fMergingJDL->SetOutputDirectory(fGridOutputDir);
1996 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1997 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1998 if (!fOutputSingle.IsNull()) {
1999 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
2000 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
2002 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
2007 // Generate the JDL as a string
2008 TString sjdl = fGridJDL->Generate();
2009 TString sjdl1 = fMergingJDL->Generate();
2011 if (!fMergeDirName.IsNull()) {
2012 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
2013 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
2015 fMergingJDL->SetOutputDirectory("$1", "Output directory");
2016 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
2018 TString sjdl2 = fMergingJDL->Generate();
2019 Int_t index, index1;
2020 sjdl.ReplaceAll("\",\"", "\",\n \"");
2021 sjdl.ReplaceAll("(member", "\n (member");
2022 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2023 sjdl.ReplaceAll("{", "{\n ");
2024 sjdl.ReplaceAll("};", "\n};");
2025 sjdl.ReplaceAll("{\n \n", "{\n");
2026 sjdl.ReplaceAll("\n\n", "\n");
2027 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
2028 sjdl1.ReplaceAll("\",\"", "\",\n \"");
2029 sjdl1.ReplaceAll("(member", "\n (member");
2030 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2031 sjdl1.ReplaceAll("{", "{\n ");
2032 sjdl1.ReplaceAll("};", "\n};");
2033 sjdl1.ReplaceAll("{\n \n", "{\n");
2034 sjdl1.ReplaceAll("\n\n", "\n");
2035 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
2036 sjdl2.ReplaceAll("\",\"", "\",\n \"");
2037 sjdl2.ReplaceAll("(member", "\n (member");
2038 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2039 sjdl2.ReplaceAll("{", "{\n ");
2040 sjdl2.ReplaceAll("};", "\n};");
2041 sjdl2.ReplaceAll("{\n \n", "{\n");
2042 sjdl2.ReplaceAll("\n\n", "\n");
2043 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
2044 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2045 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
2046 index = sjdl.Index("JDLVariables");
2047 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
2048 sjdl += "Workdirectorysize = {\"5000MB\"};";
2049 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2050 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2051 index = fJobTag.Index(":");
2052 if (index < 0) index = fJobTag.Length();
2053 TString jobTag = fJobTag;
2054 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
2055 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
2056 if (fProductionMode) {
2057 sjdl1.Prepend("# Generated merging jdl (production mode) \
2058 \n# $1 = full alien path to output directory to be merged \
2059 \n# $2 = train number \
2060 \n# $3 = production (like LHC10b) \
2061 \n# $4 = merging stage \
2062 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2063 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2064 sjdl2.Prepend("# Generated merging jdl \
2065 \n# $1 = full alien path to output directory to be merged \
2066 \n# $2 = train number \
2067 \n# $3 = production (like LHC10b) \
2068 \n# $4 = merging stage \
2069 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2071 sjdl1.Prepend("# Generated merging jdl \
2072 \n# $1 = full alien path to output directory to be merged \
2073 \n# $2 = merging stage \
2074 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2075 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2076 sjdl2.Prepend("# Generated merging jdl \
2077 \n# $1 = full alien path to output directory to be merged \
2078 \n# $2 = merging stage \
2079 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2081 index = sjdl1.Index("JDLVariables");
2082 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2083 index = sjdl2.Index("JDLVariables");
2084 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2085 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2086 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2087 index = sjdl2.Index("Split =");
2089 index1 = sjdl2.Index("\n", index);
2090 sjdl2.Remove(index, index1-index+1);
2092 index = sjdl2.Index("SplitMaxInputFileNumber");
2094 index1 = sjdl2.Index("\n", index);
2095 sjdl2.Remove(index, index1-index+1);
2097 index = sjdl2.Index("InputDataCollection");
2099 index1 = sjdl2.Index(";", index);
2100 sjdl2.Remove(index, index1-index+1);
2102 index = sjdl2.Index("InputDataListFormat");
2104 index1 = sjdl2.Index("\n", index);
2105 sjdl2.Remove(index, index1-index+1);
2107 index = sjdl2.Index("InputDataList");
2109 index1 = sjdl2.Index("\n", index);
2110 sjdl2.Remove(index, index1-index+1);
2112 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2113 // Write jdl to file
2115 out.open(fJDLName.Data(), ios::out);
2117 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2120 out << sjdl << endl;
2122 TString mergeJDLName = fExecutable;
2123 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2126 out1.open(mergeJDLName.Data(), ios::out);
2128 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2131 out1 << sjdl1 << endl;
2134 TString finalJDL = mergeJDLName;
2135 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2136 out2.open(finalJDL.Data(), ios::out);
2138 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2141 out2 << sjdl2 << endl;
2145 // Copy jdl to grid workspace
2147 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2149 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2150 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2151 TString finalJDL = mergeJDLName;
2152 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2153 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2154 if (fProductionMode) {
2155 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2156 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2157 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2159 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2160 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2161 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2162 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2163 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2164 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2165 Fatal("","Terminating");
2167 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2168 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2169 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2170 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2171 Fatal("","Terminating");
2172 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2173 Fatal("","Terminating");
2179 //______________________________________________________________________________
2180 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2182 // Returns true if the grid file with logical file name lfn exists.
// The check lists lfn through gGrid->Ls() and inspects the "name" value of
// the first entry of the result.
// Returns kFALSE when there is no grid connection or when Ls() gives no result.
2183 if (!gGrid) return kFALSE;
// The listing is done on the bare catalogue path, without the "alien://" prefix.
2185 slfn.ReplaceAll("alien://","");
2186 TGridResult *res = gGrid->Ls(slfn);
2187 if (!res) return kFALSE;
// Only the first entry of the listing is looked at.
2188 TMap *map = dynamic_cast<TMap*>(res->At(0));
2193 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
// A missing or empty "name" value is handled as a special case
// (presumably meaning that the file was not found - TODO confirm).
2194 if (!objs || !objs->GetString().Length()) {
2202 //______________________________________________________________________________
2203 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2205 // Returns true if the directory exists. dirname can also be a path.
2206 // Since there is no API for this in TAlien, we use the Cd trick:
// a "cd" command is sent through gGrid->Command() and the "__result__" value
// of the first entry of the answer is converted to a boolean.
// Returns kFALSE when there is no grid connection.
2207 if (!gGrid) return kFALSE;
2208 // Backup current path
2209 TString cpath = gGrid->Pwd();
2210 TString command = "cd ";
2211 TString sdir(dirname);
// The "alien://" prefix is removed to get a plain catalogue path.
2212 sdir.ReplaceAll("alien://", "");
2214 TGridResult *res = gGrid->Command(command);
2219 TMap *map = (TMap*)res->At(0);
// NOTE(review): the object returned by GetValue("__result__") is dereferenced
// without a null check; a reply lacking that key would crash here.
2225 TString sval = map->GetValue("__result__")->GetName();
// The textual result is interpreted numerically: non-zero gives kTRUE
// (presumably meaning that the cd succeeded - TODO confirm).
2226 Bool_t retval = (Bool_t)sval.Atoi();
2232 //______________________________________________________________________________
2233 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2235 // Check input data type.
//   lfn          - logical file name of the input to be checked
//   isCollection - output: result of IsCollection(lfn)
//   isXml        - output: true if the file name contains ".xml"
//   useTags      - output: true if the first file found in the collection (or
//                  the file name itself for a plain file) contains ".tag"
// A one-line summary of what was found is printed via Info().
2236 isCollection = kFALSE;
2240 Error("CheckDataType", "No connection to grid");
2243 isCollection = IsCollection(lfn);
2244 TString msg = "\n##### file: ";
2247 msg += " type: raw_collection;";
2248 // special treatment for collections
2250 // check for tag files in the collection
2251 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2253 msg += " using_tags: No (unknown)";
2254 Info("CheckDataType", "%s", msg.Data());
// Only the original LFN of the first file of the collection is inspected.
2257 const char* typeStr = res->GetKey(0, "origLFN");
2258 if (!typeStr || !strlen(typeStr)) {
2259 msg += " using_tags: No (unknown)";
2260 Info("CheckDataType", "%s", msg.Data());
2263 TString file = typeStr;
2264 useTags = file.Contains(".tag");
2265 if (useTags) msg += " using_tags: Yes";
2266 else msg += " using_tags: No";
2267 Info("CheckDataType", "%s", msg.Data());
// Not a raw collection: the decision is based on the file name.
2272 isXml = slfn.Contains(".xml");
2274 // Open xml collection and check if there are tag files inside
2275 msg += " type: xml_collection;";
// TAlienCollection is reached through the interpreter, so that no direct
// reference to the class is needed in this code.
2276 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2278 msg += " using_tags: No (unknown)";
2279 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the xml collection is inspected.
2282 TMap *map = coll->Next();
2284 msg += " using_tags: No (unknown)";
2285 Info("CheckDataType", "%s", msg.Data());
// The entry stored under the empty key is itself a map holding the file "name".
2288 map = (TMap*)map->GetValue("");
2290 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2291 useTags = file.Contains(".tag");
2293 if (useTags) msg += " using_tags: Yes";
2294 else msg += " using_tags: No";
2295 Info("CheckDataType", "%s", msg.Data());
// Plain file: the type and the use of tags are guessed from the name only.
2298 useTags = slfn.Contains(".tag");
2299 if (slfn.Contains(".root")) msg += " type: root file;";
2300 else msg += " type: unknown file;";
2301 if (useTags) msg += " using_tags: Yes";
2302 else msg += " using_tags: No";
2303 Info("CheckDataType", "%s", msg.Data());
2306 //______________________________________________________________________________
2307 void AliAnalysisAlien::EnablePackage(const char *package)
2309 // Enables a par file supposed to exist in the current directory.
// The package is registered as a TObjString in fPackages and the kUsePars bit
// of the plugin is switched on. Fatal() is called if the package is not found.
// Any ".par" found in the given name is removed first.
2310 TString pkg(package);
2311 pkg.ReplaceAll(".par", "");
// A true return value of AccessPathName() is treated here as "not found".
2313 if (gSystem->AccessPathName(pkg)) {
2314 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// The information message is printed only the first time par files are enabled.
2317 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2318 Info("EnablePackage", "AliEn plugin will use .par packages");
2319 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// The list of packages owns the names it stores.
2321 fPackages = new TObjArray();
2322 fPackages->SetOwner();
2324 fPackages->Add(new TObjString(pkg));
2327 //______________________________________________________________________________
2328 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2330 // Make a chain from the files whose locations are listed in fFileForTestMode.
2331 // Inspired from JF's CreateESDChain.
//   treeName - name of the tree to be chained; replaced by "TE" when
//              IsUseMCchain() is true
// One friend chain is created for every blank-separated name stored in
// fFriendChainName; the name is kept as title of the friend chain and is later
// used to build the path of the friend file for each data file.
// Empty lines and lines starting with '#' in the list file are ignored.
2332 if (fFileForTestMode.IsNull()) {
2333 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2336 if (gSystem->AccessPathName(fFileForTestMode)) {
2337 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2342 in.open(fFileForTestMode);
2344 // Read the input list of files and add them to the chain
2346 TString streeName(treeName);
2347 if (IsUseMCchain()) streeName = "TE";
2348 TChain *chain = new TChain(streeName);
2349 TList *friends = new TList();
2350 TChain *cfriend = 0;
2351 if (!fFriendChainName.IsNull()) {
2352 TObjArray *list = fFriendChainName.Tokenize(" ");
2355 while((str=(TObjString*)next())) {
// The friend file name is stored as the title of the friend chain.
2356 cfriend = new TChain(streeName, str->GetName());
2357 friends->Add(cfriend);
2358 chain->AddFriend(cfriend);
2363 TIter nextfriend(friends);
2367 if (line.IsNull() || line.BeginsWith("#")) continue;
// NOTE(review): count is incremented for every candidate line, so files that
// cannot be opened also count against the fNtestFiles limit.
2368 if (count++ == fNtestFiles) break;
2369 TString esdFile(line);
// The file is opened only to check that it is usable before chaining it.
2370 TFile *file = TFile::Open(esdFile);
2371 if (file && !file->IsZombie()) {
2372 chain->Add(esdFile);
2374 if (!fFriendChainName.IsNull()) {
// Anything from '#' on (e.g. an archive member) is dropped before taking the
// directory name of the data file.
2375 if (esdFile.Index("#") > -1)
2376 esdFile.Remove(esdFile.Index("#"));
2377 bpath = gSystem->DirName(esdFile);
2381 while ((cfriend=(TChain*)nextfriend())) {
2383 fileFriend += cfriend->GetTitle();
2384 file = TFile::Open(fileFriend);
2385 if (file && !file->IsZombie()) {
2387 cfriend->Add(fileFriend);
// A friend file that cannot be opened is fatal, unlike an unreadable data file.
2389 Fatal("GetChainForTestMode", "Cannot open friend file: %s", fileFriend.Data());
2395 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2399 if (!chain->GetListOfFiles()->GetEntries()) {
2400 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2409 //______________________________________________________________________________
2410 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2412 // Get job status for all jobs with jobid>=jobidstart (jobs with a smaller
// queue id are skipped).
//   jobidstart - smallest job id to be considered
//   lastid     - id of the job whose status string is copied to the returned buffer
//   nrunning, nwaiting, nerror, ndone - output counters (presumably the number
//                of jobs found in each state - the increments are not shown here)
// The returned pointer refers to a static buffer: it is overwritten by the next
// call and must not be freed by the caller.
2413 static char mstatus[20];
2419 TGridJobStatusList *list = gGrid->Ps("");
2420 if (!list) return mstatus;
2421 Int_t nentries = list->GetSize();
2422 TGridJobStatus *status;
2424 for (Int_t ijob=0; ijob<nentries; ijob++) {
2425 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, so that TAlienJobStatus is not
// referenced directly by this code.
2426 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2427 if (pid<jobidstart) continue;
2428 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format and is written to
// the 20-byte mstatus buffer without a length limit; an snprintf with an
// explicit "%s" format and the buffer size would be safer.
2429 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2431 switch (status->GetStatus()) {
2432 case TGridJobStatus::kWAITING:
2434 case TGridJobStatus::kRUNNING:
// Aborted, failed and unknown jobs share the same case body.
2436 case TGridJobStatus::kABORTED:
2437 case TGridJobStatus::kFAIL:
2438 case TGridJobStatus::kUNKNOWN:
2440 case TGridJobStatus::kDONE:
2449 //______________________________________________________________________________
2450 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2452 // Returns true if file is a collection. Functionality duplicated from
2453 // TAlien::Type() because we don't want to directly depend on TAlien.
// The grid command "type -z <lfn>" is issued and the "type" key of the first
// entry of the answer is compared with "collection".
// Returns kFALSE when the command gives no result or the type is missing/empty.
2455 Error("IsCollection", "No connection to grid");
2458 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2459 if (!res) return kFALSE;
2460 const char* typeStr = res->GetKey(0, "type");
// NOTE(review): res is not released on the two early returns shown below.
2461 if (!typeStr || !strlen(typeStr)) return kFALSE;
2462 if (!strcmp(typeStr, "collection")) return kTRUE;
2467 //______________________________________________________________________________
2468 Bool_t AliAnalysisAlien::IsSingleOutput() const
2470 // Check if single-ouput option is on.
2471 return (!fOutputSingle.IsNull());
2474 //______________________________________________________________________________
2475 Long64_t AliAnalysisAlien::RunMacroAndExtractLibs(const char* macro, const char *args, TString &libs)
2477 // Tries to run the specified macro and return the libraries that it loads.
//   macro - path of the macro; it is expanded with gSystem->ExpandPathName()
//   args  - arguments passed to the Exec() call of the macro
//   libs  - output: blank-separated base names of the libraries reported by
//           gSystem->GetLibraries() after the execution and not before it
// The value returned by Exec() is kept in retval. An error is reported when the
// macro is not found or when its interpretation fails.
// NOTE(review): macro is passed to strlen() without a null check. Also, the
// const char* flavour of TSystem::ExpandPathName() presumably returns a string
// owned by the caller, which would be leaked here - TODO confirm.
2479 if (strlen(macro)) expname = gSystem->ExpandPathName(macro);
2480 if (expname.IsNull() || gSystem->AccessPathName(expname)) {
2481 ::Error("RunMacroAndExtractLibs","Cannot find macro %s in current directory", macro);
// Snapshot of the libraries loaded before running the macro.
2484 TString oldlibs = gSystem->GetLibraries();
2487 Long64_t retval = m.Exec(args, &error);
2488 if (error != TInterpreter::kNoError)
2490 ::Error("RunMacroAndExtractLibs", "Macro interpretation %s failed", macro);
// The new libraries are obtained by removing the old list, as one piece of
// text, from the new one.
2493 libs = gSystem->GetLibraries();
2494 libs.ReplaceAll(oldlibs, "");
// NOTE(review): the result of Strip() is discarded, so this statement has no
// effect on libs (TString::Strip presumably returns the stripped sub-string).
// The tokenizing below skips blanks anyway.
2495 libs.Strip(TString::kLeading);
2496 TObjArray *libTokens = libs.Tokenize(" ");
// Rebuild the list keeping only the base name of every library.
2498 for (Int_t i=0; i<libTokens->GetEntries(); i++) {
2499 if (!libs.IsNull()) libs += " ";
2500 libs += gSystem->BaseName(libTokens->At(i)->GetName());
2506 //______________________________________________________________________________
2507 void AliAnalysisAlien::Print(Option_t *) const
2509 // Print current plugin settings.
// The option argument is not used. Everything is written with printf().
// Two different reports exist: one for the case when the analysis manager is in
// PROOF mode and one with the settings used for grid running.
2510 printf("### AliEn analysis plugin current settings ###\n");
2511 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
// --- Settings relevant when the plugin is used in PROOF mode
2512 if (mgr && mgr->IsProofMode()) {
2513 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2514 if (TestBit(AliAnalysisGrid::kTest))
2515 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2516 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2517 if (!fProofDataSet.IsNull())
2518 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2520 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2522 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2523 if (!fROOTVersion.IsNull())
2524 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2526 printf("= ROOT version requested________________________ default\n");
2527 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2528 if (!fAliRootMode.IsNull())
2529 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2531 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2532 if (fNproofWorkersPerSlave)
2533 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2534 if (TestSpecialBit(kClearPackages))
2535 printf("= ClearPackages requested...\n");
// NOTE(review): TString::Data() presumably never returns a null pointer, in
// which case this test is always true; Length() may have been intended.
2536 if (fIncludePath.Data())
2537 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2538 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2539 if (fPackages && fPackages->GetEntries()) {
2540 TIter next(fPackages);
// The package names are concatenated in a single string.
2543 while ((obj=next())) list += obj->GetName();
2544 printf("= Par files to be used: ________________________ %s\n", list.Data());
2546 if (TestSpecialBit(kProofConnectGrid))
2547 printf("= Requested PROOF connection to grid\n");
// --- Settings used for grid running
2550 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2551 if (fOverwriteMode) {
2552 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2553 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2555 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2556 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO:Print");
2557 printf("= Production mode:______________________________ %d\n", fProductionMode);
2558 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2559 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2560 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2562 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2563 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2564 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
// The data pattern may hold two blank-separated parts: the text before the
// blank is shown as part of the base directory (after a "%run%" placeholder)
// and the rest as the search pattern.
2565 TString basedatadir = fGridDataDir;
2566 TString pattern = fDataPattern;
2568 Int_t ind = pattern.Index(" ");
2570 basedatadir += "/%run%/";
2571 basedatadir += pattern(0, ind);
2572 pattern = pattern(ind+1, pattern.Length());
2574 printf("= Data base directory path requested: __________ %s\n", basedatadir.Data());
2575 printf("= Data search pattern: _________________________ %s\n", pattern.Data());
2576 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2577 if (fRunNumbers.Length())
2578 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2580 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
// Input files are listed only when neither run numbers nor a run range are set.
2581 if (!fRunRange[0] && !fRunNumbers.Length()) {
2582 TIter next(fInputFiles);
2585 while ((obj=next())) list += obj->GetName();
2586 printf("= Input files to be processed: _________________ %s\n", list.Data());
2588 if (TestBit(AliAnalysisGrid::kTest))
2589 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2590 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2591 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2592 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2593 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2594 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2595 printf("=====================================================================\n");
2596 printf("= Job price: ___________________________________ %d\n", fPrice);
2597 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2598 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
// The following thresholds are printed only when set to a positive value.
2599 if (fMaxInitFailed>0)
2600 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2601 if (fMasterResubmitThreshold>0)
2602 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2603 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2604 if (fNrunsPerMaster>0)
2605 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2606 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2607 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2608 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2609 if (fArguments.Length())
2610 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2611 if (fExecutableArgs.Length())
2612 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2613 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2614 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2615 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2616 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2618 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2619 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same remark as above - this test is presumably always true.
2620 if (fIncludePath.Data())
2621 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2622 if (fCloseSE.Length())
2623 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2624 if (fFriendChainName.Length())
2625 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2626 if (fPackages && fPackages->GetEntries()) {
2627 TIter next(fPackages);
2630 while ((obj=next())) list += obj->GetName();
2631 printf("= Par files to be used: ________________________ %s\n", list.Data());
2635 //______________________________________________________________________________
2636 void AliAnalysisAlien::SetDefaults()
2638 // Set default values for everything. What cannot be filled will be left empty.
// The two JDL objects are created through the interpreter, so that TAlienJDL is
// not referenced directly by this code.
2639 if (fGridJDL) delete fGridJDL;
2640 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// NOTE(review): unlike fGridJDL, a previously created fMergingJDL is not deleted
// before being replaced; calling SetDefaults() twice would leak it.
2641 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// --- Job splitting and merging
2644 fSplitMaxInputFileNumber = 100;
2646 fMasterResubmitThreshold = 0;
2652 fNrunsPerMaster = 1;
2653 fMaxMergeFiles = 100;
// --- Generated script, macro and user code
2655 fExecutable = "analysis.sh";
2656 fExecutableCommand = "root -b -q -x";
2658 fExecutableArgs = "";
2659 fAnalysisMacro = "myAnalysis.C";
2660 fAnalysisSource = "";
2661 fAdditionalLibs = "";
2665 fAliROOTVersion = "";
// --- Grid user, directories and data selection
2666 fUser = ""; // Your alien user name
2667 fGridWorkingDir = "";
2668 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2669 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2670 fFriendChainName = "";
// --- Outputs and JDL
2671 fGridOutputDir = "output";
2672 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2673 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2674 fInputFormat = "xml-single";
2675 fJDLName = "analysis.jdl";
2676 fJobTag = "Automatically generated analysis JDL";
2677 fMergeExcludes = "";
// --- Options switched on by default
2680 SetCheckCopy(kTRUE);
2681 SetDefaultOutputs(kTRUE);
2685 //______________________________________________________________________________
2686 void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
2688 // Set file name for the chain of friends and optionally additional libs to be loaded.
2689 // Libs should be separated by blancs.
2690 fFriendChainName = name;
2691 fFriendChainName.ReplaceAll(",", " ");
2692 fFriendChainName.Strip();
2693 fFriendChainName.ReplaceAll(" ", " ");
2695 fFriendLibs = libnames;
2696 if (fFriendLibs.Length()) {
2697 if(!fFriendLibs.Contains(".so") &&
2698 !fFriendLibs.Contains(".dylib"))
2699 Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
2700 fFriendLibs.ReplaceAll(",", " ");
2701 fFriendLibs.Strip();
2702 fFriendLibs.ReplaceAll(" ", " ");
2706 //______________________________________________________________________________
2707 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2709 // Obsolete method. Use SetROOTVersion instead
2710 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2711 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2712 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2715 //______________________________________________________________________________
2716 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2718 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
//   filename  - name of the final merged file looked for in aliendir
//   aliendir  - alien output directory holding the results and the Stage_<n>.xml files
//   nperchunk - number of files merged in one chunk; a stage finding fewer
//               files than this is submitted as the final one
//   jdl       - merging JDL; the final stage uses the same name with ".jdl"
//               replaced by "_final.jdl"
// Returns kFALSE when the job submission gives no job id.
2719 // First check if the result is already in the output directory.
2720 if (FileExists(Form("%s/%s",aliendir,filename))) {
2721 printf("Final merged results found. Not merging again.\n");
2724 // Now check the last stage done.
// The search stops at the first stage whose xml collection does not exist yet.
2727 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2730 // Next stage of merging
// The first stage looks for all root_archive.zip files, the following ones only
// for those produced by the previous stage.
2732 TString pattern = "*root_archive.zip";
2733 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2734 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2735 if (res) delete res;
2736 // Write standard output to file
2737 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2738 // Count the number of files inside
2740 ifile.open(Form("Stage_%d.xml",stage));
2741 if (!ifile.good()) {
2742 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Every line of the xml containing "/event" is counted as one file.
2747 while (!ifile.eof()) {
2749 if (line.Contains("/event")) nfiles++;
2753 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2756 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2758 // Copy the file in the output directory
2759 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2760 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2761 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2762 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2763 // Check if this is the last stage to be done.
2764 Bool_t laststage = (nfiles<nperchunk);
// A non-zero fMaxMergeStages forces the final merging once that stage is reached.
2765 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2768 printf("### Submiting final merging stage %d\n", stage);
2769 TString finalJDL = jdl;
2770 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2771 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2772 jobId = SubmitSingleJob(query);
2774 printf("### Submiting merging stage %d\n", stage);
2775 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2776 jobId = SubmitSingleJob(query);
2778 if (!jobId) return kFALSE;
// Keep track of the submitted job id and of the kind of stage it belongs to,
// as blank-separated lists.
2780 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2781 fGridJobIDs.Append(Form("%d", jobId));
2782 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2783 fGridStages.Append(Form("%s_merge_stage%d",
2784 laststage ? "final" : "partial", stage));
2789 //______________________________________________________________________________
2790 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2792 // Load the analysis manager from a file.
//   fname - name of the file, opened with TFile::Open()
// All the keys of the file are scanned; every key of class AliAnalysisManager
// is read, so when several are stored the last one read is the one kept.
// An error is reported when the file cannot be opened or when it holds no
// analysis manager.
2793 TFile *file = TFile::Open(fname);
2795 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2798 TIter nextkey(file->GetListOfKeys());
2799 AliAnalysisManager *mgr = 0;
2801 while ((key=(TKey*)nextkey())) {
2802 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2803 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2806 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2810 //______________________________________________________________________________
2811 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2813 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
//   query - complete grid command, passed unchanged to gGrid->Command()
// The job id is taken from the "jobId" key of the first entry of the answer and
// converted to an integer. The progress is reported on the standard output.
// Returns 0 also when there is no grid connection.
2814 if (!gGrid) return 0;
2815 printf("=> %s ------> ",query);
2816 TGridResult *res = gGrid->Command(query);
2818 TString jobId = res->GetKey(0,"jobId");
// An empty job id means that the submission was not accepted.
2820 if (jobId.IsNull()) {
2821 printf("submission failed. Reason:\n");
2824 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2827 Int_t ijobId = jobId.Atoi();
2828 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2832 //______________________________________________________________________________
2833 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2835 // Merges a collection of output files using concatenation.
//   output     - name of the file to be produced; also used to build the names
//                of the local copies (<index>_<output>)
//   collection - xml collection with the locations of the files to be merged
// Returns kFALSE if the collection name does not contain ".xml".
// The files are copied locally one by one and appended with "cat" to the
// temporary file "lastmerged", which is finally copied to output; the
// temporary files are then removed.
// NOTE(review): the shell commands are built from output without any quoting,
// so names containing blanks or shell metacharacters would break them.
2836 TString scoll(collection);
2837 if (!scoll.Contains(".xml")) return kFALSE;
// The collection is opened through the interpreter, so that TAlienCollection is
// not referenced directly by this code.
2838 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2840 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2843 // Iterate grid collection
2845 Bool_t merged = kFALSE;
2847 while (coll->Next()) {
// Start from the directory part of the TURL of the current collection entry.
2848 TString fname = gSystem->DirName(coll->GetTURL());
2851 outtmp = Form("%d_%s", ifile, output);
2852 if (!TFile::Cp(fname, outtmp)) {
2853 ::Error("MergeInfo", "Could not copy %s", fname.Data());
// A local copy either becomes the merged file as it is...
2858 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
// ...or is appended to what was merged so far.
2861 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2862 gSystem->Exec("cp tempmerged lastmerged");
2865 gSystem->Exec(Form("cp lastmerged %s", output));
2866 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2872 //______________________________________________________________________________
2873 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2875 // Merge given output files from basedir. Basedir can be an alien output directory
2876 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2877 // files in a group (ignored for xml input). Merging can be done in stages:
2878 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2879 // stage=1 : works with an xml of all root_archive.zip in the output directory
2880 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
// Parameters:
//   output    - name of the file to merge; if it contains an @ at a position > 0,
//               the @ and everything after it are removed before use
//   basedir   - input location, selected by substring match: a name containing .xml is
//               opened as a grid collection, a name containing .txt is read as a text
//               list, anything else is searched with a grid find command
//   nmaxmerge - number of input files per merging chunk (overridden for xml input)
//   stage     - merging stage as described above
// The Bool_t result is presumably the success flag; the return statements are not
// visible in this excerpt - TODO confirm.
2881 TString outputFile = output;
2883 TString outputChunk;
2884 TString previousChunk = "";
2885 TObjArray *listoffiles = new TObjArray();
2886 // listoffiles->SetOwner();
2887 Int_t countChunk = 0;
2888 Int_t countZero = nmaxmerge;
2889 Bool_t merged = kTRUE;
2890 Bool_t isGrid = kTRUE;
// Drop the @... suffix from the output name, if any
2891 Int_t index = outputFile.Index("@");
2892 if (index > 0) outputFile.Remove(index);
2893 TString inputFile = outputFile;
2894 TString sbasedir = basedir;
2895 if (sbasedir.Contains(".xml")) {
2896 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2897 nmaxmerge = 9999999;
// The collection is opened through the interpreter (TAlienCollection::Open)
2898 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2900 ::Error("MergeOutput", "Input XML collection empty.");
2903 // Iterate grid collection
2904 while (coll->Next()) {
// Start from the directory part of the entry TURL
2905 TString fname = gSystem->DirName(coll->GetTURL());
2908 listoffiles->Add(new TNamed(fname.Data(),""));
2910 } else if (sbasedir.Contains(".txt")) {
2911 // The file having the .txt extension is expected to contain a list of
2912 // folders where the output files will be looked for. For alien folders,
2913 // the full folder LFN is expected (starting with alien://)
2914 // Assume lfn's on each line
2919 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
// Empty lines and lines starting with # are ignored
2925 if (line.IsNull() || line.BeginsWith("#")) continue;
// A single entry without the alien: marker switches the merger to non-grid mode
2927 if (!line.Contains("alien:")) isGrid = kFALSE;
2931 listoffiles->Add(new TNamed(line.Data(),""));
2935 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
// Default case: search basedir on the grid for files matching the output name
2940 command = Form("find %s/ *%s", basedir, inputFile.Data());
2941 printf("command: %s\n", command.Data());
2942 TGridResult *res = gGrid->Command(command);
2944 ::Error("MergeOutput","No result for the find command\n");
2950 while ((map=(TMap*)nextmap())) {
// Each result map is expected to carry the file location under the key turl
2951 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2952 if (!objs || !objs->GetString().Length()) {
2953 // Nothing found - skip this output
2958 listoffiles->Add(new TNamed(objs->GetName(),""));
2962 if (!listoffiles->GetEntries()) {
2963 ::Error("MergeOutput","No result for the find command\n");
2968 TFileMerger *fm = 0;
2969 TIter next0(listoffiles);
2970 TObjArray *listoffilestmp = new TObjArray();
2971 listoffilestmp->SetOwner();
2974 // Keep only the files at upper level
// First pass: find the smallest number of / characters among all entries
2975 Int_t countChar = 0;
2976 while ((nextfile=next0())) {
2977 snextfile = nextfile->GetName();
2978 Int_t crtCount = snextfile.CountChar('/');
2979 if (nextfile == listoffiles->First()) countChar = crtCount;
2980 if (crtCount < countChar) countChar = crtCount;
// Second pass: entries deeper than that minimum are handled separately (the branch
// body is not visible here, presumably they are dropped); the rest are kept
2983 while ((nextfile=next0())) {
2984 snextfile = nextfile->GetName();
2985 Int_t crtCount = snextfile.CountChar('/');
2986 if (crtCount > countChar) {
2990 listoffilestmp->Add(nextfile);
// NOTE(review): the original listoffiles array is replaced here; no delete of it is
// visible in this excerpt - possible leak, TODO confirm
2993 listoffiles = listoffilestmp; // Now contains 'good' files
2994 listoffiles->Print();
2995 TIter next(listoffiles);
2996 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2997 outputChunk = outputFile;
2998 outputChunk.ReplaceAll(".root", "_*.root");
2999 // Check for existing temporary merge files
3000 // Check overwrite mode and remove previous partial results if needed
3001 // Preserve old merging functionality for stage 0.
// A zero exit status of ls means at least one partial chunk file exists locally
3003 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3005 // Skip as many input files as in a chunk
3006 for (Int_t counter=0; counter<nmaxmerge; counter++) {
3009 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
3013 snextfile = nextfile->GetName();
3015 outputChunk = outputFile;
3016 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName returns true when the path is NOT accessible
3018 if (gSystem->AccessPathName(outputChunk)) continue;
3019 // Merged file with chunks up to <countChunk> found
3020 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
3021 previousChunk = outputChunk;
3025 countZero = nmaxmerge;
// Main chunked merging loop over the remaining input files
3027 while ((nextfile=next())) {
3028 snextfile = nextfile->GetName();
3029 // Loop 'find' results and get next LFN
3030 if (countZero == nmaxmerge) {
3031 // First file in chunk - create file merger and add previous chunk if any.
3032 fm = new TFileMerger(isGrid);
3033 fm->SetFastMethod(kTRUE);
3034 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
3035 outputChunk = outputFile;
3036 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
3038 // If last file found, put merged results in the output file
3039 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
3040 // Add file to be merged and decrement chunk counter.
3041 fm->AddFile(snextfile);
// The chunk is complete when the counter reaches zero or the last input was added
3043 if (countZero==0 || nextfile == listoffiles->Last()) {
3044 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3045 // Nothing found - skip this output
3046 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3050 fm->OutputFile(outputChunk);
3051 // Merge the outputs, then go to next chunk
3053 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message is tagged MergeOutputs while the other messages of this
// function use MergeOutput - TODO confirm whether intended
3057 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial chunk was an input of this merge, so its file is removed
3058 gSystem->Unlink(previousChunk);
3060 if (nextfile == listoffiles->Last()) break;
3062 countZero = nmaxmerge;
3063 previousChunk = outputChunk;
3070 // Merging stage different than 0.
3071 // Move to the beginning of the requested chunk.
// All remaining inputs go into a single merger writing directly to the output file
3072 fm = new TFileMerger(isGrid);
3073 fm->SetFastMethod(kTRUE);
3074 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
3076 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3077 // Nothing found - skip this output
3078 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3082 fm->OutputFile(outputFile);
3083 // Merge the outputs
3085 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3089 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
3095 //______________________________________________________________________________
3096 Bool_t AliAnalysisAlien::MergeOutputs()
3098 // Merge analysis outputs existing in the AliEn space.
// Each name in the comma-separated fOutputFiles list is merged locally through
// MergeOutput(), reading from fGridOutputDir in groups of fMaxMergeFiles.
// Returns kTRUE immediately in test mode and kFALSE in offline mode; the other
// return statements are not visible in this excerpt.
3099 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
3100 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3102 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
// Merging via JDL: merging jobs are submitted instead of merging locally
// (the enclosing condition is not visible here, presumably fMergeViaJDL - TODO confirm)
3106 if (!TestBit(AliAnalysisGrid::kMerge)) {
3107 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
3110 if (fProductionMode) {
3111 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
3114 Info("MergeOutputs", "Submitting merging JDL");
3115 if (!SubmitMerging()) return kFALSE;
3116 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
3117 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
3120 // Get the output path
// A directory name without any / is taken relative to <grid home>/<fGridWorkingDir>
3121 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3122 if (!DirectoryExists(fGridOutputDir)) {
3123 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
3126 if (!fOutputFiles.Length()) {
3127 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3130 // Check if fast read option was requested
3131 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
3132 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
3133 if (fFastReadOption) {
3134 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
3135 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Tighter xrootd client settings applied through the ROOT environment
3136 gEnv->SetValue("XNet.ConnectTimeout",50);
3137 gEnv->SetValue("XNet.RequestTimeout",50);
3138 gEnv->SetValue("XNet.MaxRedirectCount",2);
3139 gEnv->SetValue("XNet.ReconnectTimeout",50);
3140 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
3142 // Make sure we change the temporary directory
3143 gSystem->Setenv("TMPDIR", gSystem->pwd());
3144 // Set temporary compilation directory to current one
3145 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// Loop over the comma-separated output file names
3146 TObjArray *list = fOutputFiles.Tokenize(",");
3150 Bool_t merged = kTRUE;
3151 while((str=(TObjString*)next())) {
3152 outputFile = str->GetString();
// Drop the @... suffix from the file name, if any
3153 Int_t index = outputFile.Index("@");
3154 if (index > 0) outputFile.Remove(index);
// Wildcard pattern matching the partial chunk files of this output
3155 TString outputChunk = outputFile;
3156 outputChunk.ReplaceAll(".root", "_*.root");
3157 // Skip already merged outputs
// AccessPathName returns false when the file exists
3158 if (!gSystem->AccessPathName(outputFile)) {
3159 if (fOverwriteMode) {
3160 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3161 gSystem->Unlink(outputFile);
3162 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
// NOTE(review): message tag is MergeOutput although this is MergeOutputs - TODO confirm
3163 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3164 outputChunk.Data());
3165 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3168 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// Second removal of partial chunk files; the enclosing branch is not visible in
// this excerpt, so the exact condition under which it runs is unknown here
3172 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3173 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3174 outputChunk.Data());
3175 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Files listed in the merge or register exclude lists are not merged
3178 if (fMergeExcludes.Contains(outputFile.Data()) ||
3179 fRegisterExcludes.Contains(outputFile.Data())) continue;
3180 // Perform a 'find' command in the output directory, looking for registered outputs
3181 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3183 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in the list of open files
3187 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3188 if (fileOpened) fileOpened->Close();
3194 //______________________________________________________________________________
3195 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3197 // Use the output files connected to output containers from the analysis manager
3198 // rather than the files defined by SetOutputFiles
// flag - new value of the kDefaultOutputs bit. An info message is printed only
//        when the bit goes from unset to set.
3199 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3200 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3201 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3204 //______________________________________________________________________________
3205 void AliAnalysisAlien::SetOutputFiles(const char *list)
3207 // Manually set the output files list.
3208 // Removes duplicates. Not allowed if default outputs are not disabled.
// list - output file names separated by blanks; a trailing @<SE> part of a name is
//        dropped. The accepted names are appended to fOutputFiles separated by commas.
3209 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3210 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3213 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3215 TString slist = list;
3216 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3217 TObjArray *arr = slist.Tokenize(" ");
3221 while ((os=(TObjString*)next())) {
3222 sout = os->GetString();
// Strip the storage element specification
3223 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): Contains is a substring test, so a name that is a substring of an
// already registered name is also treated as a duplicate - TODO confirm intended
3224 if (fOutputFiles.Contains(sout)) continue;
3225 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3226 fOutputFiles += sout;
3231 //______________________________________________________________________________
3232 void AliAnalysisAlien::SetOutputArchive(const char *list)
3234 // Manually set the output archive list. Free text - you are on your own...
3235 // Not allowed if default outputs are not disabled.
// list - archive specification, stored unchanged in fOutputArchive.
// Fatal() is called when the kDefaultOutputs bit is still set.
3236 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3237 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3240 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3241 fOutputArchive = list;
3244 //______________________________________________________________________________
3245 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3247 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; the visible code only prints a warning pointing to the
// replacement methods.
3248 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3251 //______________________________________________________________________________
3252 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3254 // Set some PROOF special parameter.
// pname - parameter name, used as key in fProofParam
// value - parameter value, stored as a TObjString
// An existing entry with the same name is removed from the map before the new
// key/value pair is added.
3255 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// Existing entry: take it out of the map (what happens to the old key and value
// objects afterwards is on lines not visible in this excerpt)
3257 TObject *old = pair->Key();
3258 TObject *val = pair->Value();
3259 fProofParam.Remove(old);
3263 fProofParam.Add(new TObjString(pname), new TObjString(value));
3266 //______________________________________________________________________________
3267 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3269 // Returns a special PROOF parameter.
// pname - name of the parameter to look up in fProofParam
// Returns 0 when no entry with this name exists, otherwise the name of the stored
// value object (the pointer refers to data held by that object).
3270 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3271 if (!pair) return 0;
3272 return pair->Value()->GetName();
3275 //______________________________________________________________________________
3276 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3278 // Start remote grid analysis.
// Both arguments are unused (their names are commented out in the signature).
// Two main paths are visible below: a PROOF path (cluster connection, package and
// library setup, test dataset registration) and a grid path (input checks, dataset
// and script generation, local test run or job submission).
// Several return statements are not visible in this excerpt; the visible ones
// return kFALSE on failure and in offline mode.
3279 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3280 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3281 if (!mgr || !mgr->IsInitialized()) {
3282 Error("StartAnalysis", "You need an initialized analysis manager for this");
3285 // Are we in PROOF mode ?
3286 if (mgr->IsProofMode()) {
3287 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3288 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3289 if (fProofCluster.IsNull()) {
3290 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
3293 if (fProofDataSet.IsNull() && !testMode) {
3294 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3297 // Set the needed environment
3298 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3299 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3300 if (fProofReset && !testMode) {
// fProofReset==1 requests a soft reset, the other branch a hard reset
3301 if (fProofReset==1) {
3302 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3303 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3305 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3306 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3308 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3313 // Check if there is an old active session
3314 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3316 Error("StartAnalysis","You have to reset your old session first\n");
3320 // Do we need to change the ROOT version ? The success of this cannot be checked.
3321 if (!fROOTVersion.IsNull() && !testMode) {
3322 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3323 fProofCluster.Data(), fROOTVersion.Data()));
3325 // Connect to PROOF and check the status
// Workers option: per-slave setting takes precedence over the total number of workers
3328 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3329 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3331 if (!sworkers.IsNull())
3332 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3334 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
// An empty master name is used for the test mode session
3336 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3338 Error("StartAnalysis", "Could not start PROOF in test mode");
3343 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// SetParallel is only issued when both worker settings are non-zero
3346 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3347 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3348 // Set proof special parameters if any
3349 TIter nextpp(&fProofParam);
3350 TObject *proofparam;
3351 while ((proofparam=nextpp())) {
3352 TString svalue = GetProofParameter(proofparam->GetName());
// The value is pasted unquoted into the interpreter command, so it is evaluated
// as an expression by the interpreter
3353 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3355 // Is dataset existing ?
// Only the part of the dataset name before # is kept
3357 TString dataset = fProofDataSet;
3358 Int_t index = dataset.Index("#");
3359 if (index>=0) dataset.Remove(index);
3360 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3361 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3364 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3366 // Is ClearPackages() needed ?
3367 if (TestSpecialBit(kClearPackages)) {
3368 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3369 gROOT->ProcessLine("gProof->ClearPackages();");
3371 // Is a given aliroot mode requested ?
3374 if (!fAliRootMode.IsNull()) {
// The mode name default is passed on as an empty string
3375 TString alirootMode = fAliRootMode;
3376 if (alirootMode == "default") alirootMode = "";
3377 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
3378 optionsList.SetOwner();
3379 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3380 // Check the additional libs to be loaded
3382 Bool_t parMode = kFALSE;
3383 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3384 // Parse the extra libs for .so or .dylib
3385 if (fAdditionalLibs.Length()) {
3386 TString additionalLibs = fAdditionalLibs;
// NOTE(review): the result of Strip() is discarded here; per the ROOT TString
// documentation Strip returns the stripped substring and leaves the string itself
// unchanged - TODO confirm whether in-place stripping was intended
3387 additionalLibs.Strip();
3388 if (additionalLibs.Length() && fFriendLibs.Length())
3389 additionalLibs += " ";
3390 additionalLibs += fFriendLibs;
3391 TObjArray *list = additionalLibs.Tokenize(" ");
3394 while((str=(TObjString*)next())) {
3395 if (str->GetString().Contains(".so") ||
3396 str->GetString().Contains(".dylib") ) {
3398 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Reduce the library file name to its bare name: no lib prefix, no extension
3401 TString stmp = str->GetName();
3402 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3403 stmp.ReplaceAll(".so","");
3404 stmp.ReplaceAll(".dylib","");
3405 if (!extraLibs.IsNull()) extraLibs += ":";
3409 if (str->GetString().Contains(".par")) {
3410 // The first par file found in the list will not allow any further .so
3412 if (!parLibs.IsNull()) parLibs += ":";
3413 parLibs += str->GetName();
3417 if (list) delete list;
3419 if (!extraLibs.IsNull()) {
3420 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3421 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3423 // Check extra includes
3424 if (!fIncludePath.IsNull()) {
// Convert the compiler-style include path into a colon-separated list of
// directories given relative to ALICE_ROOT
3425 TString includePath = fIncludePath;
3426 includePath.ReplaceAll(" ",":");
3427 includePath.ReplaceAll("$ALICE_ROOT/","");
3428 includePath.ReplaceAll("${ALICE_ROOT}/","");
3429 includePath.ReplaceAll("-I","");
3430 includePath.Remove(TString::kTrailing, ':');
3431 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3432 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3434 // Check if connection to grid is requested
3435 if (TestSpecialBit(kProofConnectGrid))
3436 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3437 // Enable AliRoot par
3439 // Enable proof lite package
// NOTE(review): ROOT documents TSystem::ExpandPathName(const char*) as returning a
// string the caller must delete; assigning it to a TString would leak it - TODO confirm
3440 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
3441 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3442 TNamed *obj = (TNamed*)optionsList.At(i);
3443 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// A zero result of the interpreted calls is treated as success; the options list is
// handed to the interpreter by address
3445 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3446 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3447 Info("StartAnalysis", "AliRootProofLite enabled");
3449 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3453 if ( ! fAliROOTVersion.IsNull() ) {
3454 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3455 fAliROOTVersion.Data(), &optionsList))) {
3456 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3461 // Enable first par files from fAdditionalLibs
3462 if (!parLibs.IsNull()) {
3463 TObjArray *list = parLibs.Tokenize(":");
3465 TObjString *package;
3466 while((package=(TObjString*)next())) {
// Remove a local directory named like the package before uploading it
3467 TString spkg = package->GetName();
3468 spkg.ReplaceAll(".par", "");
3469 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3470 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3471 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3472 if (gROOT->ProcessLine(enablePackage)) {
3473 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3477 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3481 if (list) delete list;
3484 if ((fAdditionalLibs.Contains(".so") || fAdditionalLibs.Contains(".dylib")) &&
3486 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3487 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3491 // Enable par files if requested
3492 if (fPackages && fPackages->GetEntries()) {
3493 TIter next(fPackages);
3495 while ((package=next())) {
3496 // Skip packages already enabled
3497 if (parLibs.Contains(package->GetName())) continue;
3498 TString spkg = package->GetName();
3499 spkg.ReplaceAll(".par", "");
3500 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3501 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3502 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3503 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3507 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3512 // Do we need to load analysis source files ?
3513 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3514 if (fAnalysisSource.Length()) {
3515 TObjArray *list = fAnalysisSource.Tokenize(" ");
3518 while((str=(TObjString*)next())) {
3519 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3521 if (list) delete list;
3524 // Register dataset to proof lite.
// NOTE(review): the two errors below are tagged GetChainForTestMode although they
// are issued from StartAnalysis - TODO confirm whether intended
3525 if (fFileForTestMode.IsNull()) {
3526 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3529 if (gSystem->AccessPathName(fFileForTestMode)) {
3530 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// The collection is handed to the interpreter by address and registered under the
// name test_collection
3533 TFileCollection *coll = new TFileCollection();
3534 coll->AddFromFile(fFileForTestMode);
3535 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3536 gROOT->ProcessLine("gProof->ShowDataSets()");
3541 // Check if output files have to be taken from the analysis manager
3542 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3543 // Add output files and AOD files
3544 fOutputFiles = GetListOfFiles("outaod");
3545 // Add extra files registered to the analysis manager
3546 TString extra = GetListOfFiles("ext");
3547 if (!extra.IsNull()) {
// Extra files are registered with a wildcard before the .root extension
3548 extra.ReplaceAll(".root", "*.root");
3549 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3550 fOutputFiles += extra;
3552 // Compose the output archive.
3553 fOutputArchive = "log_archive.zip:std*@disk=1 ";
// With throughput collection enabled the file info log is added to the archive list
3554 if (mgr->IsCollectThroughput())
3555 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3557 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3559 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Announce the selected run mode
3560 if (TestBit(AliAnalysisGrid::kOffline)) {
3561 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3562 \n there nor any job run. You can revise the JDL and analysis \
3563 \n macro then run the same in \"submit\" mode.");
3564 } else if (TestBit(AliAnalysisGrid::kTest)) {
3565 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3567 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3568 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3569 \n space and job submitted.");
3570 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3571 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3572 if (fMergeViaJDL) CheckInputData();
3575 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3580 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3583 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3584 if (!CheckInputData()) {
3585 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3588 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the configuration item that most likely needs fixing
3590 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3591 if (fRunNumbers.Length()) serror = "run numbers";
3592 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3593 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3594 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Generate the files needed by the jobs (analysis file, macros, scripts)
3597 WriteAnalysisFile();
3598 WriteAnalysisMacro();
3600 WriteValidationScript();
3602 WriteMergingMacro();
3603 WriteMergeExecutable();
3604 WriteValidationScript(kTRUE);
3606 if (!CreateJDL()) return kFALSE;
3607 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3609 // Locally testing the analysis
3610 Info("StartAnalysis", "\n_______________________________________________________________________ \
3611 \n Running analysis script in a daughter shell as on a worker node \
3612 \n_______________________________________________________________________");
3613 TObjArray *list = fOutputFiles.Tokenize(",");
3617 while((str=(TObjString*)next())) {
3618 outputFile = str->GetString();
3619 Int_t index = outputFile.Index("@");
3620 if (index > 0) outputFile.Remove(index);
// Remove output files left over from a previous run
3621 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the generated executable, then the validation script
3624 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3625 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3626 // gSystem->Exec("cat stdout");
3629 // Check if submitting is managed by LPM manager
3630 if (fProductionMode) {
3631 //TString prodfile = fJDLName;
3632 //prodfile.ReplaceAll(".jdl", ".prod");
3633 //WriteProductionFile(prodfile);
3634 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3637 // Submit AliEn job(s)
3638 gGrid->Cd(fGridOutputDir);
3643 if (!fRunNumbers.Length() && !fRunRange[0]) {
3644 // Submit a given xml or a set of runs
3645 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3646 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from the jobId key of the first result entry
3648 const char *cjobId = res->GetKey(0,"jobId");
3652 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3655 Info("StartAnalysis", "\n_______________________________________________________________________ \
3656 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3657 \n_______________________________________________________________________",
3658 fJDLName.Data(), cjobId);
// Record the job id and its stage in the blank-separated bookkeeping strings
3661 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3662 fGridJobIDs.Append(jobID);
3663 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3664 fGridStages.Append("full");
3669 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3673 // Submit for a range of enumeration of runs.
3674 if (!Submit()) return kFALSE;
3675 jobID = fGridJobIDs;
3679 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3680 \n You may exit at any time and terminate the job later using the option <terminate> \
3681 \n ##################################################################################", jobID.Data());
3682 gSystem->Exec("aliensh");
3684 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3685 \n Remember to terminate the job later using the option <terminate> \
3686 \n ##################################################################################", jobID.Data());
3691 //______________________________________________________________________________
3692 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3694 // Get a comma-separated list of output files of the requested type.
3695 // Type can be (case insensitive):
3696 // aod - list of aod files (std, extensions and filters)
3697 // out - list of output files connected to containers (but not aod's or extras)
3698 // ext - list of extra files registered to the manager
3699 // ter - list of files produced in terminate
// The type string is tested with Contains, so several keys can be combined in one
// string (for example outaod).
// The returned pointer refers to the buffer of a function-local static TString;
// presumably it is overwritten by the next call - copy the result if it must be kept.
3700 static TString files;
3702 TString stype = type;
3704 TString aodfiles, extra;
3705 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3707 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3708 return files.Data();
// Collect the AOD file names from the output event handler, if there is one
3710 if (mgr->GetOutputEventHandler()) {
3712 if (mgr->GetOutputEventHandler()->GetFillAOD())
3713 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3714 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs(kTRUE);
3715 if (!extraaod.IsNull() && mgr->GetOutputEventHandler()->GetFillExtension()) {
3716 if (!aodfiles.IsNull()) aodfiles += ",";
3717 aodfiles += extraaod;
3720 if (stype.Contains("aod")) {
3722 if (stype == "aod") return files.Data();
3724 // Add output files that are not in the list of AOD files
3725 TString outputfiles = "";
3726 TIter next(mgr->GetOutputs());
3727 AliAnalysisDataContainer *output;
3728 const char *filename = 0;
3729 while ((output=(AliAnalysisDataContainer*)next())) {
3730 filename = output->GetFileName();
// Containers whose file name is the literal default are skipped, as are names
// already collected (substring test)
3731 if (!(strcmp(filename, "default"))) continue;
3732 if (outputfiles.Contains(filename)) continue;
3733 if (aodfiles.Contains(filename)) continue;
3734 if (!outputfiles.IsNull() && strlen(filename)) outputfiles += ",";
3735 outputfiles += filename;
3737 if (stype.Contains("out")) {
3738 if (!files.IsNull() && !outputfiles.IsNull()) files += ",";
3739 files += outputfiles;
3740 if (stype == "out") return files.Data();
3742 // Add extra files registered to the analysis manager
3744 extra = mgr->GetExtraFiles();
3745 if (!extra.IsNull()) {
// Blanks are turned into commas so that both separators are accepted
3747 extra.ReplaceAll(" ", ",");
3748 TObjArray *fextra = extra.Tokenize(",");
3749 TIter nextx(fextra);
3751 while ((obj=nextx())) {
3752 if (aodfiles.Contains(obj->GetName())) continue;
3753 if (outputfiles.Contains(obj->GetName())) continue;
3754 if (sextra.Contains(obj->GetName())) continue;
3755 if (!sextra.IsNull()) sextra += ",";
3756 sextra += obj->GetName();
3759 if (stype.Contains("ext")) {
3760 if (!files.IsNull() && !sextra.IsNull()) files += ",";
3764 if (stype == "ext") return files.Data();
// Files produced in terminate, excluding anything already listed above
3766 if (!fTerminateFiles.IsNull()) {
// NOTE(review): the result of Strip() is discarded here; per the ROOT TString
// documentation Strip returns the stripped substring and leaves the string itself
// unchanged - TODO confirm whether in-place stripping was intended
3767 fTerminateFiles.Strip();
3768 fTerminateFiles.ReplaceAll(" ",",");
3769 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3770 TIter nextx(fextra);
3772 while ((obj=nextx())) {
3773 if (aodfiles.Contains(obj->GetName())) continue;
3774 if (outputfiles.Contains(obj->GetName())) continue;
3775 if (termfiles.Contains(obj->GetName())) continue;
3776 if (sextra.Contains(obj->GetName())) continue;
3777 if (!termfiles.IsNull()) termfiles += ",";
3778 termfiles += obj->GetName();
3782 if (stype.Contains("ter")) {
3783 if (!files.IsNull() && !termfiles.IsNull()) {
3788 return files.Data();
3791 //______________________________________________________________________________
3792 Bool_t AliAnalysisAlien::Submit()
3794 // Submit all master jobs.
// One master job is expected per entry of fInputFiles. The loop runs until
// fNsubmitted reaches that number and returns kFALSE as soon as SubmitNext() fails.
3795 Int_t nmasterjobs = fInputFiles->GetEntries();
// Reference time for the periodic submission; gSystem->Now() is presumably in
// milliseconds - TODO confirm
3796 Long_t tshoot = gSystem->Now();
// Submit the first job right away if nothing has been submitted yet
3797 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3798 while (fNsubmitted < nmasterjobs) {
3799 Long_t now = gSystem->Now();
// Next submission attempt once more than 30000 time units have elapsed since the
// reference time (the update of the reference time is not visible in this excerpt)
3800 if ((now-tshoot)>30000) {
3802 if (!SubmitNext()) return kFALSE;
3808 //______________________________________________________________________________
3809 Bool_t AliAnalysisAlien::SubmitMerging()
3811 // Submit all merging jobs.
// For every entry of fInputFiles the output directory to merge is worked out,
// the first output file that is neither in fMergeExcludes nor in
// fRegisterExcludes is selected, and CheckMergedFiles() is called for it with
// the merging JDL.
// A relative output directory (no '/') is expanded to
// <alien home>/<fGridWorkingDir>/<fGridOutputDir>.
3812 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3813 gGrid->Cd(fGridOutputDir);
// The merging JDL name is derived from the executable name: ".sh" -> "_merge.jdl".
3814 TString mergeJDLName = fExecutable;
3815 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3817 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3820 Int_t ntosubmit = fInputFiles->GetEntries();
3821 for (Int_t i=0; i<ntosubmit; i++) {
// Start from the base name of the i-th input collection, without ".xml".
3822 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3823 runOutDir.ReplaceAll(".xml", "");
3824 if (fOutputToRunNo) {
3825 // The output directory is the run number
3826 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3827 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3829 if (!fRunNumbers.Length() && !fRunRange[0]) {
3830 // The output directory is the grid outdir
3831 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3832 runOutDir = fGridOutputDir;
3834 // The output directory is the master number in 3 digits format
3835 printf("### Submitting merging job for master <%03d>\n", i);
3836 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3839 // Check now the number of merging stages.
// NOTE(review): Tokenize() hands back a heap-allocated array; confirm that
// 'list' is deleted on every path of this function.
3840 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file usable for the check: anything from '@' onwards
// is cut off (only when '@' is not the first character), then the name must
// appear in neither exclusion list.
3844 while((str=(TObjString*)next())) {
3845 outputFile = str->GetString();
3846 Int_t index = outputFile.Index("@");
3847 if (index > 0) outputFile.Remove(index);
3848 if (!fMergeExcludes.Contains(outputFile) &&
3849 !fRegisterExcludes.Contains(outputFile)) break;
3852 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A negative result is only turned into a failure for the last entry.
3853 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range everything lives in the single grid
// output directory (see above), so one iteration is enough.
3854 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3856 if (!ntosubmit) return kTRUE;
// NOTE(review): both messages below are reported under the location
// "StartAnalysis" although they are issued from SubmitMerging - confirm
// whether that is intended.
3858 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3859 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3860 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3861 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3862 \n ################################################################################################################");
// Runs the external "aliensh" command through the system shell.
3863 gSystem->Exec("aliensh");
3865 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3866 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3867 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3868 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3869 \n ################################################################################################################");
3874 //______________________________________________________________________________
3875 Bool_t AliAnalysisAlien::SubmitNext()
3877 // Submit next bunch of master jobs if the queue is free. The first master job is
3878 // submitted right away, while the next will not be unless the previous was split.
3879 // The plugin will not submit new master jobs if there are more than 500 jobs in
// Function-local statics keep their values between calls: firstmaster and
// lastmaster hold the job ids of the first and the most recently submitted
// master job, npermaster the estimated number of jobs per master.
3881 static Bool_t iscalled = kFALSE;
3882 static Int_t firstmaster = 0;
3883 static Int_t lastmaster = 0;
3884 static Int_t npermaster = 0;
// Return at once while the flag is set; it is cleared again on the early
// return paths further down.
3885 if (iscalled) return kTRUE;
3887 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3888 Int_t ntosubmit = 0;
3891 Int_t nmasterjobs = fInputFiles->GetEntries();
3894 if (!IsUseSubmitPolicy()) {
3896 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3897 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
// No submit policy: all master jobs are submitted in one go.
3898 ntosubmit = nmasterjobs;
// Query the state of the masters submitted so far; the counters are filled
// by GetJobStatus() through its reference arguments.
3901 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3902 printf("=== master %d: %s\n", lastmaster, status.Data());
3903 // If last master not split, just return
3904 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3905 // No more than 500 waiting jobs
3906 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of jobs per master submitted so far.
// NOTE(review): divides by fNsubmitted - confirm it cannot be zero on this path.
3907 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Number of masters that fit into the remaining room below 500 waiting jobs,
// but never less than one.
3908 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3909 if (!ntosubmit) ntosubmit = 1;
3910 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3911 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3913 for (Int_t i=0; i<ntosubmit; i++) {
3914 // Submit for a range of enumeration of runs.
3915 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Base name of the next input collection, without the ".xml" extension.
3917 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3918 runOutDir.ReplaceAll(".xml", "");
// Two forms of the submit command: the last argument is either the collection
// base name or the master index printed with 3 digits.
3920 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3922 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3923 printf("********* %s\n",query.Data());
3924 res = gGrid->Command(query);
// The job id is read from the "jobId" key of the first result entry; an empty
// string is treated as a failed submission.
3926 TString cjobId1 = res->GetKey(0,"jobId");
3927 if (!cjobId1.Length()) {
// NOTE(review): the messages below use the location "StartAnalysis" although
// they are issued from SubmitNext - confirm whether that is intended.
3931 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3934 Info("StartAnalysis", "\n_______________________________________________________________________ \
3935 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3936 \n_______________________________________________________________________",
3937 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Book-keeping: job ids and stage names are kept as space-separated lists.
3938 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3939 fGridJobIDs.Append(cjobId1);
3940 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3941 fGridStages.Append("full");
// Remember the most recent master id, and the very first one only once.
3944 lastmaster = cjobId1.Atoi();
3945 if (!firstmaster) firstmaster = lastmaster;
3950 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3958 //______________________________________________________________________________
3959 void AliAnalysisAlien::WriteAnalysisFile()
3961 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root".
// Unless production, offline or test mode is active, the file is afterwards
// copied to <alien home><fGridWorkingDir> on the grid.
3962 TString analysisFile = fExecutable;
3963 analysisFile.ReplaceAll(".sh", ".root");
// The file is only (re)written when the kSubmit bit is not set.
3964 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3965 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3966 if (!mgr || !mgr->IsInitialized()) {
3967 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3970 // Check analysis type
// The kUseMC / kUseESD / kUseAOD bits of this object are set according to the
// event handlers attached to the manager.
3972 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3973 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the type of its first input handler decides.
3975 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3976 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3977 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3978 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3980 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3981 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// The current directory is remembered here and restored further down,
// presumably because opening the file changes gDirectory.
3984 TDirectory *cdir = gDirectory;
3985 TFile *file = TFile::Open(analysisFile, "RECREATE");
3987 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3988 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3989 // Unless merging makes no sense
3990 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3993 // Enable termination for local jobs
3994 mgr->SetSkipTerminate(kFALSE);
3996 if (cdir) cdir->cd();
3997 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No upload in production, offline or test mode.
3999 Bool_t copy = kTRUE;
4000 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4003 TString workdir = gGrid->GetHomeDirectory();
4004 workdir += fGridWorkingDir;
4005 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous copy first, then upload. copyLocal2Alien() adds the
// "file:" and "alien://" prefixes itself, so plain paths are passed; a failed
// copy ends in Fatal().
4006 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
4007 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
4008 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
4012 //______________________________________________________________________________
4013 void AliAnalysisAlien::WriteAnalysisMacro()
4015 // Write the analysis macro that will steer the analysis in grid mode.
// The macro is written as C++ source text to the file named by fAnalysisMacro
// (only when the kSubmit bit is not set). It loads the required libraries and
// PAR packages, restores the analysis manager from <executable>.root and
// starts the analysis. Unless production, offline or test mode is active, the
// macro file is afterwards copied to the AliEn working directory.
4016 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4018 out.open(fAnalysisMacro.Data(), ios::out);
4020 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework packages are listed in fPackages, i.e. have
// to be set up as PAR archives instead of being loaded as libraries.
4023 Bool_t hasSTEERBase = kFALSE;
4024 Bool_t hasESD = kFALSE;
4025 Bool_t hasAOD = kFALSE;
4026 Bool_t hasANALYSIS = kFALSE;
4027 Bool_t hasOADB = kFALSE;
4028 Bool_t hasANALYSISalice = kFALSE;
4029 Bool_t hasCORRFW = kFALSE;
4030 TString func = fAnalysisMacro;
4031 TString type = "ESD";
4032 TString comment = "// Analysis using ";
4037 if (IsUseMCchain()) {
4041 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
4042 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
// Friend chains are refused for anything but AOD input.
4048 if (type!="AOD" && fFriendChainName!="") {
4049 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
4052 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
4053 else comment += " data";
4054 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
// The generated function carries the macro file name without ".C".
4055 func.ReplaceAll(".C", "");
4056 out << "void " << func.Data() << "()" << endl;
4058 out << comment.Data() << endl;
4059 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
4060 out << " TStopwatch timer;" << endl;
4061 out << " timer.Start();" << endl << endl;
4062 // Change temp directory to current one
// The AliEn connection is only emitted when not running a local test.
4063 if (!IsLocalTest()) {
4064 out << "// connect to AliEn and make the chain" << endl;
4065 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4067 out << "// Set temporary merging directory to current one" << endl;
4068 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4069 out << "// Set temporary compilation directory to current one" << endl;
4070 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4071 // Reset existing include path
4072 out << "// Reset existing include path and add current directory first in the search" << endl;
4073 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// When the executable command does not contain "aliroot", the base ROOT
// libraries are loaded explicitly by the generated macro.
4074 if (!fExecutableCommand.Contains("aliroot")) {
4075 out << "// load base root libraries" << endl;
4076 out << " gSystem->Load(\"libTree\");" << endl;
4077 out << " gSystem->Load(\"libGeom\");" << endl;
4078 out << " gSystem->Load(\"libVMC\");" << endl;
4079 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4080 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4082 if (fAdditionalRootLibs.Length()) {
4083 // in principle libtree /lib geom libvmc etc. can go into this list, too
4084 out << "// Add aditional libraries" << endl;
4085 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
// Only entries containing ".so" or ".dylib" produce a Load() call.
4088 while((str=(TObjString*)next())) {
4089 if (str->GetString().Contains(".so") || str->GetString().Contains(".dylib"))
4090 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4092 if (list) delete list;
4094 out << "// Load analysis framework libraries" << endl;
4095 TString setupPar = "AliAnalysisAlien::SetupPar";
4097 if (!fExecutableCommand.Contains("aliroot")) {
4098 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4099 out << " gSystem->Load(\"libESD\");" << endl;
4100 out << " gSystem->Load(\"libAOD\");" << endl;
4102 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4103 out << " gSystem->Load(\"libOADB\");" << endl;
4104 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4105 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: find out which framework packages come as PARs
// (with or without the ".par" suffix).
4107 TIter next(fPackages);
4110 while ((obj=next())) {
4111 pkgname = obj->GetName();
4112 if (pkgname == "STEERBase" ||
4113 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4114 if (pkgname == "ESD" ||
4115 pkgname == "ESD.par") hasESD = kTRUE;
4116 if (pkgname == "AOD" ||
4117 pkgname == "AOD.par") hasAOD = kTRUE;
4118 if (pkgname == "ANALYSIS" ||
4119 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4120 if (pkgname == "OADB" ||
4121 pkgname == "OADB.par") hasOADB = kTRUE;
4122 if (pkgname == "ANALYSISalice" ||
4123 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4124 if (pkgname == "CORRFW" ||
4125 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// If ANALYSISalice itself is a PAR, the generated macro uses the SetupPar()
// function written out at its end instead of AliAnalysisAlien::SetupPar.
4127 if (hasANALYSISalice) setupPar = "SetupPar";
// Each framework package is either loaded as a library or set up from its PAR.
4128 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4129 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4130 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4131 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4132 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4133 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4134 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4135 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4136 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4137 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4138 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4139 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4140 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4141 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4142 out << "// Compile other par packages" << endl;
// Second pass: every remaining package (not one of the framework ones handled
// above) gets its own setup call.
4144 while ((obj=next())) {
4145 pkgname = obj->GetName();
4146 if (pkgname == "STEERBase" ||
4147 pkgname == "STEERBase.par" ||
4149 pkgname == "ESD.par" ||
4151 pkgname == "AOD.par" ||
4152 pkgname == "ANALYSIS" ||
4153 pkgname == "ANALYSIS.par" ||
4154 pkgname == "OADB" ||
4155 pkgname == "OADB.par" ||
4156 pkgname == "ANALYSISalice" ||
4157 pkgname == "ANALYSISalice.par" ||
4158 pkgname == "CORRFW" ||
4159 pkgname == "CORRFW.par") continue;
4160 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4163 out << "// include path" << endl;
4164 // Get the include path from the interpreter and remove entries pointing to AliRoot
4165 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4166 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4167 out << " TIter nextpath(listpaths);" << endl;
4168 out << " TObjString *pname;" << endl;
4169 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4170 out << " TString current = pname->GetName();" << endl;
4171 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4172 out << " gSystem->AddIncludePath(current);" << endl;
4173 out << " }" << endl;
4174 out << " if (listpaths) delete listpaths;" << endl;
4175 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4176 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4177 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4178 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4179 out << "// MC generator libraries" << endl;
4180 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4183 while((str=(TObjString*)next())) {
4184 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4188 if (fAdditionalLibs.Length()) {
4189 out << "// Add aditional AliRoot libraries" << endl;
4190 TString additionalLibs = fAdditionalLibs;
// TString::Strip() returns the stripped substring and leaves the string itself
// untouched, so the result has to be assigned back to take effect.
4191 additionalLibs = additionalLibs.Strip();
// The friend libraries are appended to the same space-separated list.
4192 if (additionalLibs.Length() && fFriendLibs.Length())
4193 additionalLibs += " ";
4194 additionalLibs += fFriendLibs;
4195 TObjArray *list = additionalLibs.Tokenize(" ");
// Shared libraries are loaded, ".par" entries are set up as packages.
4198 while((str=(TObjString*)next())) {
4199 if (str->GetString().Contains(".so") || str->GetString().Contains(".dylib"))
4200 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4201 if (str->GetString().Contains(".par"))
4202 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4207 out << "// analysis source to be compiled at runtime (if any)" << endl;
4208 if (fAnalysisSource.Length()) {
4209 TObjArray *list = fAnalysisSource.Tokenize(" ");
// Each source file is compiled through ".L <file>+g" in the generated macro.
4212 while((str=(TObjString*)next())) {
4213 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4215 if (list) delete list;
4218 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4219 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
// Fast-read option: warn here and emit XNet settings into the macro.
4220 if (fFastReadOption) {
4221 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4222 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4223 out << "// fast xrootd reading enabled" << endl;
4224 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4225 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4226 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4227 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4228 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4229 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4231 out << "// read the analysis manager from file" << endl;
// Same naming rule as in WriteAnalysisFile(): ".sh" -> ".root".
4232 TString analysisFile = fExecutable;
4233 analysisFile.ReplaceAll(".sh", ".root");
4234 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4235 << analysisFile << "\");" << endl;
4236 out << " if (!mgr) return;" << endl;
// Local test: the generated macro creates its own plugin in "test" mode and
// attaches it to the manager as grid handler.
4237 if (IsLocalTest()) {
4238 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4239 out << " plugin->SetRunMode(\"test\");" << endl;
4240 if (fFileForTestMode.IsNull())
4241 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4243 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4244 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4245 if (!fFriendChainName.IsNull())
4246 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\",\"" << fFriendLibs << "\");" << endl;
4248 out << " plugin->SetUseMCchain();" << endl;
4250 out << " plugin->SetMCLoop(kTRUE);" << endl;
4251 out << " mgr->SetGridHandler(plugin);" << endl;
// Debug level and sysinfo sampling are copied from the current manager when
// there is one.
4252 if (AliAnalysisManager::GetAnalysisManager()) {
4253 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4254 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4256 out << " mgr->SetDebugLevel(10);" << endl;
4257 out << " mgr->SetNSysInfo(100);" << endl;
4260 out << " mgr->PrintStatus();" << endl;
4261 if (AliAnalysisManager::GetAnalysisManager()) {
4262 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4263 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4265 if (TestBit(AliAnalysisGrid::kTest))
4266 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4268 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// Start of the analysis: an event loop over fNMCevents events, or a chain
// built from "wn.xml" (grid), or a plain local start (test).
4271 if (!IsLocalTest()) {
4273 out << " mgr->SetCacheSize(0);" << endl;
4274 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4276 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4277 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4281 out << " mgr->SetCacheSize(0);" << endl;
4282 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4284 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4287 out << " timer.Stop();" << endl;
4288 out << " timer.Print();" << endl;
4289 out << "}" << endl << endl;
// The CreateChain() helper is only written for grid jobs that read input data.
4290 if (!IsLocalTest() && !fMCLoop) {
4291 out <<"//________________________________________________________________________________" << endl;
4292 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4294 out << "// Create a chain using url's from xml file" << endl;
4295 out << " TString filename;" << endl;
4296 out << " Int_t run = 0;" << endl;
// Tree name: "TE" for MC chains, fTreeName when set, otherwise <type>Tree
// with the type in lower case.
4297 if (IsUseMCchain()) {
4298 out << " TString treename = \"TE\";" << endl;
4300 if (!fTreeName.IsNull()) {
4301 out << " TString treename = \"" << fTreeName << "\";" << endl;
4303 out << " TString treename = type;" << endl;
4304 out << " treename.ToLower();" << endl;
4305 out << " treename += \"Tree\";" << endl;
4308 out << " printf(\"***************************************\\n\");" << endl;
4309 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4310 out << " printf(\"***************************************\\n\");" << endl;
4311 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4312 out << " if (!coll) {" << endl;
4313 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4314 out << " return NULL;" << endl;
4315 out << " }" << endl;
4316 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4317 out << " TChain *chain = new TChain(treename);" << endl;
// One friend chain per space-separated name in fFriendChainName; the name is
// stored as the chain title and used below to build the friend file path.
4318 if(!fFriendChainName.IsNull()) {
4319 out << " TList *friends = new TList();" << endl;
4320 out << " TIter nextfriend(friends);" << endl;
4321 out << " TChain *cfriend = 0;" << endl;
4322 TObjArray *list = fFriendChainName.Tokenize(" ");
4325 while((str=(TObjString*)next())) {
4326 out << " cfriend = new TChain(treename, \"" << str->GetName() << "\");" << endl;
4327 out << " friends->Add(cfriend);" << endl;
4328 out << " chain->AddFriend(cfriend);" << endl;
4331 // out << " TChain *chainFriend = new TChain(treename);" << endl;
4333 out << " coll->Reset();" << endl;
4334 out << " while (coll->Next()) {" << endl;
// The quotes of the empty-string argument must be escaped; unescaped they end
// and restart the C++ literal, and the emitted call loses its argument.
4335 out << " filename = coll->GetTURL(\"\");" << endl;
4336 out << " if (mgr) {" << endl;
4337 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4338 out << " if (nrun && nrun != run) {" << endl;
4339 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4340 out << " mgr->SetRunFromPath(nrun);" << endl;
4341 out << " run = nrun;" << endl;
4342 out << " }" << endl;
4343 out << " }" << endl;
4344 out << " chain->Add(filename);" << endl;
4345 if(!fFriendChainName.IsNull()) {
4346 out << " TString bpath=coll->GetTURL(\"\");" << endl;
4347 out << " if (bpath.Index(\"#\") > -1) bpath.Remove(bpath.Index(\"#\"));" << endl;
4348 out << " bpath = gSystem->DirName(bpath);" << endl;
4349 out << " bpath += \"/\";" << endl;
4350 out << " TString fileFriend;" << endl;
4351 out << " nextfriend.Reset();" << endl;
4352 out << " while ((cfriend=(TChain*)nextfriend())) {" << endl;
4353 out << " fileFriend = bpath;" << endl;
4354 out << " fileFriend += cfriend->GetTitle();" << endl;
4355 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4356 out << " if (file) {" << endl;
4357 out << " file->Close();" << endl;
4358 out << " cfriend->Add(fileFriend.Data());" << endl;
4359 out << " } else {" << endl;
4360 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4361 out << " return 0;" << endl;
4362 out << " }" << endl;
4363 out << " }" << endl;
4365 out << " }" << endl;
4366 out << " if (!chain->GetNtrees()) {" << endl;
4367 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4368 out << " return NULL;" << endl;
4369 out << " }" << endl;
4370 out << " return chain;" << endl;
4371 out << "}" << endl << endl;
// A local SetupPar() is written into the macro when ANALYSISalice is a PAR
// (matching the setupPar name chosen above).
4373 if (hasANALYSISalice) {
4374 out <<"//________________________________________________________________________________" << endl;
4375 out << "Bool_t SetupPar(const char *package) {" << endl;
4376 out << "// Compile the package and set it up." << endl;
4377 out << " TString pkgdir = package;" << endl;
4378 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4379 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4380 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4381 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4382 out << " // Check for BUILD.sh and execute" << endl;
4383 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4384 out << " printf(\"*******************************\\n\");" << endl;
4385 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4386 out << " printf(\"*******************************\\n\");" << endl;
4387 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4388 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4389 out << " gSystem->ChangeDirectory(cdir);" << endl;
4390 out << " return kFALSE;" << endl;
4391 out << " }" << endl;
4392 out << " } else {" << endl;
4393 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4394 out << " gSystem->ChangeDirectory(cdir);" << endl;
4395 out << " return kFALSE;" << endl;
4396 out << " }" << endl;
4397 out << " // Check for SETUP.C and execute" << endl;
4398 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4399 out << " printf(\"*******************************\\n\");" << endl;
4400 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4401 out << " printf(\"*******************************\\n\");" << endl;
4402 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4403 out << " } else {" << endl;
4404 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4405 out << " gSystem->ChangeDirectory(cdir);" << endl;
4406 out << " return kFALSE;" << endl;
4407 out << " }" << endl;
4408 out << " // Restore original workdir" << endl;
4409 out << " gSystem->ChangeDirectory(cdir);" << endl;
4410 out << " return kTRUE;" << endl;
4413 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// No upload in production, offline or test mode.
4415 Bool_t copy = kTRUE;
4416 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4419 TString workdir = gGrid->GetHomeDirectory();
4420 workdir += fGridWorkingDir;
4421 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4422 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4423 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// copyLocal2Alien() prepends "alien://" to the remote path itself, so the bare
// path is passed here (as in WriteAnalysisFile()); passing an already prefixed
// URL would produce "alien://alien://...".
4424 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4425 Form("%s/%s", workdir.Data(),
4426 fAnalysisMacro.Data()))) Fatal("","Terminating");
4430 //______________________________________________________________________________
4431 void AliAnalysisAlien::WriteMergingMacro()
4433 // Write a macro to merge the outputs per master job.
4434 if (!fMergeViaJDL) return;
4435 if (!fOutputFiles.Length()) {
4436 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4439 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4440 TString mergingMacro = fExecutable;
4441 mergingMacro.ReplaceAll(".sh","_merge.C");
4442 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4443 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4445 out.open(mergingMacro.Data(), ios::out);
4447 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4450 Bool_t hasSTEERBase = kFALSE;
4451 Bool_t hasESD = kFALSE;
4452 Bool_t hasAOD = kFALSE;
4453 Bool_t hasANALYSIS = kFALSE;
4454 Bool_t hasOADB = kFALSE;
4455 Bool_t hasANALYSISalice = kFALSE;
4456 Bool_t hasCORRFW = kFALSE;
4457 TString func = mergingMacro;
4459 func.ReplaceAll(".C", "");
4460 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4462 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4463 out << " TStopwatch timer;" << endl;
4464 out << " timer.Start();" << endl << endl;
4465 // Reset existing include path
4466 out << "// Reset existing include path and add current directory first in the search" << endl;
4467 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4468 if (!fExecutableCommand.Contains("aliroot")) {
4469 out << "// load base root libraries" << endl;
4470 out << " gSystem->Load(\"libTree\");" << endl;
4471 out << " gSystem->Load(\"libGeom\");" << endl;
4472 out << " gSystem->Load(\"libVMC\");" << endl;
4473 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4474 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4476 if (fAdditionalRootLibs.Length()) {
4477 // in principle libtree /lib geom libvmc etc. can go into this list, too
4478 out << "// Add aditional libraries" << endl;
4479 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4482 while((str=(TObjString*)next())) {
4483 if (str->GetString().Contains(".so") || str->GetString().Contains(".dylib"))
4484 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4486 if (list) delete list;
4488 out << "// Load analysis framework libraries" << endl;
4490 if (!fExecutableCommand.Contains("aliroot")) {
4491 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4492 out << " gSystem->Load(\"libESD\");" << endl;
4493 out << " gSystem->Load(\"libAOD\");" << endl;
4495 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4496 out << " gSystem->Load(\"libOADB\");" << endl;
4497 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4498 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4500 TIter next(fPackages);
4503 TString setupPar = "AliAnalysisAlien::SetupPar";
4504 while ((obj=next())) {
4505 pkgname = obj->GetName();
4506 if (pkgname == "STEERBase" ||
4507 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4508 if (pkgname == "ESD" ||
4509 pkgname == "ESD.par") hasESD = kTRUE;
4510 if (pkgname == "AOD" ||
4511 pkgname == "AOD.par") hasAOD = kTRUE;
4512 if (pkgname == "ANALYSIS" ||
4513 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4514 if (pkgname == "OADB" ||
4515 pkgname == "OADB.par") hasOADB = kTRUE;
4516 if (pkgname == "ANALYSISalice" ||
4517 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4518 if (pkgname == "CORRFW" ||
4519 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4521 if (hasANALYSISalice) setupPar = "SetupPar";
4522 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4523 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4524 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4525 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4526 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4527 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4528 out << " gSystem->Load(\"libOADB\");" << endl;
4529 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4530 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4531 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4532 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4533 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4534 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4535 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4536 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4537 out << "// Compile other par packages" << endl;
4539 while ((obj=next())) {
4540 pkgname = obj->GetName();
4541 if (pkgname == "STEERBase" ||
4542 pkgname == "STEERBase.par" ||
4544 pkgname == "ESD.par" ||
4546 pkgname == "AOD.par" ||
4547 pkgname == "ANALYSIS" ||
4548 pkgname == "ANALYSIS.par" ||
4549 pkgname == "OADB" ||
4550 pkgname == "OADB.par" ||
4551 pkgname == "ANALYSISalice" ||
4552 pkgname == "ANALYSISalice.par" ||
4553 pkgname == "CORRFW" ||
4554 pkgname == "CORRFW.par") continue;
4555 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4558 out << "// include path" << endl;
4559 // Get the include path from the interpreter and remove entries pointing to AliRoot
4560 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4561 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4562 out << " TIter nextpath(listpaths);" << endl;
4563 out << " TObjString *pname;" << endl;
4564 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4565 out << " TString current = pname->GetName();" << endl;
4566 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4567 out << " gSystem->AddIncludePath(current);" << endl;
4568 out << " }" << endl;
4569 out << " if (listpaths) delete listpaths;" << endl;
4570 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4571 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4572 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4573 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4574 out << "// MC generator libraries" << endl;
4575 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4578 while((str=(TObjString*)next())) {
4579 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4583 if (fAdditionalLibs.Length()) {
4584 out << "// Add aditional AliRoot libraries" << endl;
4585 TString additionalLibs = fAdditionalLibs;
4586 additionalLibs.Strip();
4587 if (additionalLibs.Length() && fFriendLibs.Length())
4588 additionalLibs += " ";
4589 additionalLibs += fFriendLibs;
4590 TObjArray *list = additionalLibs.Tokenize(" ");
4593 while((str=(TObjString*)next())) {
4594 if (str->GetString().Contains(".so") || str->GetString().Contains(".dylib"))
4595 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4597 if (list) delete list;
4600 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4601 if (fAnalysisSource.Length()) {
4602 TObjArray *list = fAnalysisSource.Tokenize(" ");
4605 while((str=(TObjString*)next())) {
4606 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4608 if (list) delete list;
4612 if (fFastReadOption) {
4613 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4614 out << "// fast xrootd reading enabled" << endl;
4615 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4616 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4617 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4618 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4619 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4620 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4622 // Change temp directory to current one
4623 out << "// Connect to AliEn" << endl;
4624 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4625 out << "// Set temporary merging directory to current one" << endl;
4626 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4627 out << "// Set temporary compilation directory to current one" << endl;
4628 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4629 out << " TString outputDir = dir;" << endl;
4631 out << " TString outputFiles = \"" << GetListOfFiles("outaod") << "\";" << endl;
4633 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4634 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4635 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4636 out << " TIter *iter = new TIter(list);" << endl;
4637 out << " TObjString *str;" << endl;
4638 out << " TString outputFile;" << endl;
4639 out << " Bool_t merged = kTRUE;" << endl;
4640 TString analysisFile = fExecutable;
4641 analysisFile.ReplaceAll(".sh", ".root");
4642 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4643 << analysisFile << "\");" << endl;
4644 out << " if (!mgr) {" << endl;
4645 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4646 out << " return;" << endl;
4647 out << " }" << endl;
4648 if (IsLocalTest()) {
4649 out << " printf(\"===================================\\n\");" << endl;
4650 out << " printf(\"Testing merging...\\n\");" << endl;
4651 out << " printf(\"===================================\\n\");" << endl;
4653 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4654 out << " outputFile = str->GetString();" << endl;
4655 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4656 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4657 out << " if (index > 0) outputFile.Remove(index);" << endl;
4658 out << " // Skip already merged outputs" << endl;
4659 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4660 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4661 out << " continue;" << endl;
4662 out << " }" << endl;
4663 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4664 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4665 out << " if (!merged) {" << endl;
4666 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4667 out << " return;" << endl;
4668 out << " }" << endl;
4669 out << " }" << endl;
4670 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4671 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4672 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4674 out << " // all outputs merged, validate" << endl;
4675 out << " ofstream out;" << endl;
4676 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4677 out << " out.close();" << endl;
4678 out << " // read the analysis manager from file" << endl;
4679 if (IsLocalTest()) {
4680 out << " printf(\"===================================\\n\");" << endl;
4681 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4682 out << " printf(\"===================================\\n\");" << endl;
4684 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4686 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4687 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4688 out << " mgr->PrintStatus();" << endl;
4690 if (mgr->GetDebugLevel()>3) {
4691 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4693 if (TestBit(AliAnalysisGrid::kTest))
4694 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4696 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4699 out << " TTree *tree = NULL;" << endl;
4700 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4701 out << "}" << endl << endl;
4702 if (hasANALYSISalice) {
4703 out <<"//________________________________________________________________________________" << endl;
4704 out << "Bool_t SetupPar(const char *package) {" << endl;
4705 out << "// Compile the package and set it up." << endl;
4706 out << " TString pkgdir = package;" << endl;
4707 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4708 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4709 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4710 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4711 out << " // Check for BUILD.sh and execute" << endl;
4712 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4713 out << " printf(\"*******************************\\n\");" << endl;
4714 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4715 out << " printf(\"*******************************\\n\");" << endl;
4716 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4717 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4718 out << " gSystem->ChangeDirectory(cdir);" << endl;
4719 out << " return kFALSE;" << endl;
4720 out << " }" << endl;
4721 out << " } else {" << endl;
4722 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4723 out << " gSystem->ChangeDirectory(cdir);" << endl;
4724 out << " return kFALSE;" << endl;
4725 out << " }" << endl;
4726 out << " // Check for SETUP.C and execute" << endl;
4727 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4728 out << " printf(\"*******************************\\n\");" << endl;
4729 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4730 out << " printf(\"*******************************\\n\");" << endl;
4731 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4732 out << " } else {" << endl;
4733 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4734 out << " gSystem->ChangeDirectory(cdir);" << endl;
4735 out << " return kFALSE;" << endl;
4736 out << " }" << endl;
4737 out << " // Restore original workdir" << endl;
4738 out << " gSystem->ChangeDirectory(cdir);" << endl;
4739 out << " return kTRUE;" << endl;
4743 Bool_t copy = kTRUE;
4744 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4747 TString workdir = gGrid->GetHomeDirectory();
4748 workdir += fGridWorkingDir;
4749 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4750 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4751 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4752 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4753 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4757 //______________________________________________________________________________
4758 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4760 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4761 // Note that for loading the compiled library. The current directory should have precedence in
4763 TString pkgdir = package;
4764 pkgdir.ReplaceAll(".par","");
4765 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4766 TString cdir = gSystem->WorkingDirectory();
4767 gSystem->ChangeDirectory(pkgdir);
4768 // Check for BUILD.sh and execute
4769 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4770 printf("**************************************************\n");
4771 printf("*** Building PAR archive %s\n", package);
4772 printf("**************************************************\n");
4773 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4774 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4775 gSystem->ChangeDirectory(cdir);
4779 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4780 gSystem->ChangeDirectory(cdir);
4783 // Check for SETUP.C and execute
4784 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4785 printf("**************************************************\n");
4786 printf("*** Setup PAR archive %s\n", package);
4787 printf("**************************************************\n");
4788 gROOT->Macro("PROOF-INF/SETUP.C");
4789 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4791 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4792 gSystem->ChangeDirectory(cdir);
4795 // Restore original workdir
4796 gSystem->ChangeDirectory(cdir);
4800 //______________________________________________________________________________
4801 void AliAnalysisAlien::WriteExecutable()
4803 // Generate the alien executable script.
4804 // Patch executable with -x to catch error code
4805 if (fExecutableCommand.Contains("root") &&
4806 fExecutableCommand.Contains("-q") &&
4807 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4808 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4810 out.open(fExecutable.Data(), ios::out);
4812 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4815 out << "#!/bin/bash" << endl;
4816 // Make sure we can properly compile par files
4817 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4818 out << "echo \"=========================================\"" << endl;
4819 out << "echo \"############## PATH : ##############\"" << endl;
4820 out << "echo $PATH" << endl;
4821 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4822 out << "echo $LD_LIBRARY_PATH" << endl;
4823 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4824 out << "echo $ROOTSYS" << endl;
4825 out << "echo \"############## which root : ##############\"" << endl;
4826 out << "which root" << endl;
4827 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4828 out << "echo $ALICE_ROOT" << endl;
4829 out << "echo \"############## which aliroot : ##############\"" << endl;
4830 out << "which aliroot" << endl;
4831 out << "echo \"############## system limits : ##############\"" << endl;
4832 out << "ulimit -a" << endl;
4833 out << "echo \"############## memory : ##############\"" << endl;
4834 out << "free -m" << endl;
4835 out << "echo \"=========================================\"" << endl << endl;
4836 out << fExecutableCommand << " ";
4837 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4838 out << "RET=$?" << endl;
4839 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4840 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4841 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4842 out << " let sig=\"$RET - 128\""<<endl;
4843 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4844 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4845 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4846 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4847 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4848 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4850 out << " exit $RET"<< endl;
4851 out << "fi" << endl << endl ;
4852 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4853 out << "echo \"############## memory after: ##############\"" << endl;
4854 out << "free -m" << endl;
4856 Bool_t copy = kTRUE;
4857 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4860 TString workdir = gGrid->GetHomeDirectory();
4861 workdir += fGridWorkingDir;
4862 TString executable = TString::Format("%s/%s", workdir.Data(), fExecutable.Data());
4863 if (FileExists(executable)) gGrid->Rm(executable);
4864 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4865 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4866 executable.Data())) Fatal("","Terminating");
4870 //______________________________________________________________________________
4871 void AliAnalysisAlien::WriteMergeExecutable()
4873 // Generate the alien executable script for the merging job.
4874 if (!fMergeViaJDL) return;
4875 TString mergeExec = fExecutable;
4876 mergeExec.ReplaceAll(".sh", "_merge.sh");
4877 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4879 out.open(mergeExec.Data(), ios::out);
4881 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4884 out << "#!/bin/bash" << endl;
4885 // Make sure we can properly compile par files
4886 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4887 out << "echo \"=========================================\"" << endl;
4888 out << "echo \"############## PATH : ##############\"" << endl;
4889 out << "echo $PATH" << endl;
4890 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4891 out << "echo $LD_LIBRARY_PATH" << endl;
4892 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4893 out << "echo $ROOTSYS" << endl;
4894 out << "echo \"############## which root : ##############\"" << endl;
4895 out << "which root" << endl;
4896 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4897 out << "echo $ALICE_ROOT" << endl;
4898 out << "echo \"############## which aliroot : ##############\"" << endl;
4899 out << "which aliroot" << endl;
4900 out << "echo \"############## system limits : ##############\"" << endl;
4901 out << "ulimit -a" << endl;
4902 out << "echo \"############## memory : ##############\"" << endl;
4903 out << "free -m" << endl;
4904 out << "echo \"=========================================\"" << endl << endl;
4905 TString mergeMacro = fExecutable;
4906 mergeMacro.ReplaceAll(".sh", "_merge.C");
4907 if (IsOneStageMerging())
4908 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4910 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4911 out << fExecutableCommand << " " << "$ARG" << endl;
4912 out << "RET=$?" << endl;
4913 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4914 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4915 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4916 out << " let sig=\"$RET - 128\""<<endl;
4917 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4918 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4919 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4920 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4921 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4922 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4924 out << " exit $RET"<< endl;
4925 out << "fi" << endl << endl ;
4926 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4927 out << "echo \"############## memory after: ##############\"" << endl;
4928 out << "free -m" << endl;
4930 Bool_t copy = kTRUE;
4931 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4934 TString workdir = gGrid->GetHomeDirectory();
4935 workdir += fGridWorkingDir;
4936 TString executable = TString::Format("%s/%s", workdir.Data(), mergeExec.Data());
4937 if (FileExists(executable)) gGrid->Rm(executable);
4938 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4939 if (!copyLocal2Alien("WriteMergeExecutable",mergeExec.Data(),
4940 executable.Data())) Fatal("","Terminating");
4944 //______________________________________________________________________________
4945 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4947 // Write the production file to be submitted by LPM manager. The format is:
4948 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4949 // Next lines: full_path_to_dataset XXX (XXX is a string)
4950 // To submit, one has to: submit jdl XXX for all lines
4952 out.open(filename, ios::out);
4954 Error("WriteProductionFile", "Bad file name: %s", filename);
4958 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4959 workdir = gGrid->GetHomeDirectory();
4960 workdir += fGridWorkingDir;
4961 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4962 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4963 out << locjdl << " " << njobspermaster << endl;
4964 Int_t nmasterjobs = fInputFiles->GetEntries();
4965 for (Int_t i=0; i<nmasterjobs; i++) {
4966 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4967 runOutDir.ReplaceAll(".xml", "");
4969 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4971 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4974 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4975 if (FileExists(filename)) gGrid->Rm(filename);
4976 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4977 if (!copyLocal2Alien("WriteProductionFile", filename,
4978 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4982 //______________________________________________________________________________
4983 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4985 // Generate the alien validation script.
4986 // Generate the validation script
4988 if (fValidationScript.IsNull()) {
4989 fValidationScript = fExecutable;
4990 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4992 TString validationScript = fValidationScript;
4993 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4995 Error("WriteValidationScript", "Alien connection required");
4998 if (!fTerminateFiles.IsNull()) {
4999 fTerminateFiles.Strip();
5000 fTerminateFiles.ReplaceAll(" ",",");
5002 TString outStream = "";
5003 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
5004 if (!TestBit(AliAnalysisGrid::kSubmit)) {
5006 out.open(validationScript, ios::out);
5007 out << "#!/bin/bash" << endl;
5008 out << "##################################################" << endl;
5009 out << "validateout=`dirname $0`" << endl;
5010 out << "validatetime=`date`" << endl;
5011 out << "validated=\"0\";" << endl;
5012 out << "error=0" << endl;
5013 out << "if [ -z $validateout ]" << endl;
5014 out << "then" << endl;
5015 out << " validateout=\".\"" << endl;
5016 out << "fi" << endl << endl;
5017 out << "cd $validateout;" << endl;
5018 out << "validateworkdir=`pwd`;" << endl << endl;
5019 out << "echo \"*******************************************************\"" << outStream << endl;
5020 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
5022 out << "echo \"* Time: $validatetime \"" << outStream << endl;
5023 out << "echo \"* Dir: $validateout\"" << outStream << endl;
5024 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
5025 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5026 out << "ls -la ./" << outStream << endl;
5027 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
5028 out << "##################################################" << endl;
5031 out << "if [ ! -f stderr ] ; then" << endl;
5032 out << " error=1" << endl;
5033 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
5034 out << " echo \"Error = $error\" " << outStream << endl;
5035 out << "fi" << endl;
5037 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
5038 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
5039 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
5040 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
5043 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
5044 out << " error=1" << endl;
5045 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
5046 out << " echo \"$parArch\" " << outStream << endl;
5047 out << " echo \"Error = $error\" " << outStream << endl;
5048 out << "fi" << endl;
5050 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
5051 out << " error=1" << endl;
5052 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
5053 out << " echo \"$segViol\" " << outStream << endl;
5054 out << " echo \"Error = $error\" " << outStream << endl;
5055 out << "fi" << endl;
5057 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
5058 out << " error=1" << endl;
5059 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
5060 out << " echo \"$segFault\" " << outStream << endl;
5061 out << " echo \"Error = $error\" " << outStream << endl;
5062 out << "fi" << endl;
5064 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
5065 out << " error=1" << endl;
5066 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
5067 out << " echo \"$glibcErr\" " << outStream << endl;
5068 out << " echo \"Error = $error\" " << outStream << endl;
5069 out << "fi" << endl;
5071 // Part dedicated to the specific analyses running into the train
5073 TString outputFiles = fOutputFiles;
5074 if (merge && !fTerminateFiles.IsNull()) {
5075 if (!outputFiles.IsNull()) outputFiles += ",";
5076 outputFiles += fTerminateFiles;
5078 TObjArray *arr = outputFiles.Tokenize(",");
5081 while (!merge && (os=(TObjString*)next1())) {
5082 // No need to validate outputs produced by merging since the merging macro does this
5083 outputFile = os->GetString();
5084 Int_t index = outputFile.Index("@");
5085 if (index > 0) outputFile.Remove(index);
5086 if (fTerminateFiles.Contains(outputFile)) continue;
5087 if (outputFile.Contains("*")) continue;
5088 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
5089 out << " error=1" << endl;
5090 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
5091 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
5092 out << "fi" << endl;
5095 out << "if ! [ -f outputs_valid ] ; then" << endl;
5096 out << " error=1" << endl;
5097 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
5098 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
5099 out << "fi" << endl;
5101 out << "if [ $error = 0 ] ; then" << endl;
5102 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
5103 if (!IsKeepLogs()) {
5104 out << " echo \"* === Logs std* will be deleted === \"" << endl;
5106 out << " rm -f std*" << endl;
5108 out << "fi" << endl;
5110 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5111 out << "echo \"*******************************************************\"" << outStream << endl;
5112 out << "cd -" << endl;
5113 out << "exit $error" << endl;
5115 Bool_t copy = kTRUE;
5116 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
5119 TString workdir = gGrid->GetHomeDirectory();
5120 workdir += fGridWorkingDir;
5121 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
5122 if (FileExists(validationScript)) gGrid->Rm(validationScript);
5123 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
5124 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
5125 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");