1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
32 #include "TInterpreter.h"
34 #include "TFileCollection.h"
36 #include "TObjString.h"
37 #include "TObjArray.h"
40 #include "TGridResult.h"
41 #include "TGridCollection.h"
43 #include "TGridJobStatusList.h"
44 #include "TGridJobStatus.h"
45 #include "TFileMerger.h"
46 #include "AliAnalysisManager.h"
47 #include "AliAnalysisTaskCfg.h"
48 #include "AliVEventHandler.h"
49 #include "AliAnalysisDataContainer.h"
50 #include "AliMultiInputEventHandler.h"
56 ClassImp(AliAnalysisAlien)
// Copy a local file to an AliEn location using TFile::Cp.
//   where - location label handed to Warning() for the failure message
//   loc   - local file path; prefixed with "file:" to build the source URL
//   rem   - AliEn path; prefixed with "alien://" to build the destination URL
// NOTE(review): the lines guarding the Warning() call and returning `ret` are
// not visible in this listing - presumably the warning is issued only when the
// copy fails and the TFile::Cp result is returned; confirm.
62 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
64 TString sl(Form("file:%s", loc));
65 TString sr(Form("alien://%s", rem));
66 Bool_t ret = TFile::Cp(sl, sr);
68 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
74 //______________________________________________________________________________
// Default constructor.
// NOTE(review): only a fragment of the member-initializer list is visible in
// this listing. The visible entries initialize the listed counters to 0 and
// default-construct the listed members; the constructor body is not shown.
75 AliAnalysisAlien::AliAnalysisAlien()
81 fSplitMaxInputFileNumber(0),
83 fMasterResubmitThreshold(0),
96 fNproofWorkersPerSlave(0),
102 fExecutableCommand(),
108 fAdditionalRootLibs(),
115 fAliPhysicsVersion(),
157 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// NOTE(review): only a fragment of the member-initializer list is visible in
// this listing. The visible entries mirror those of the default constructor
// (counters set to 0, listed members default-constructed); the body is not shown.
158 AliAnalysisAlien::AliAnalysisAlien(const char *name)
159 :AliAnalysisGrid(name),
164 fSplitMaxInputFileNumber(0),
166 fMasterResubmitThreshold(0),
179 fNproofWorkersPerSlave(0),
185 fExecutableCommand(),
191 fAdditionalRootLibs(),
198 fAliPhysicsVersion(),
240 //______________________________________________________________________________
// Copy constructor.
// The initializer list copies the listed settings member by member from `other`.
// The body then creates fresh JDL objects, copies the run range, and builds
// independent copies of the input-file, package and module lists.
// NOTE(review): some initializers and body lines (e.g. the declaration of `obj`
// and the statement that stores `mod`) are not visible in this listing.
241 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
242 :AliAnalysisGrid(other),
245 fPrice(other.fPrice),
247 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
248 fMaxInitFailed(other.fMaxInitFailed),
249 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
250 fNtestFiles(other.fNtestFiles),
251 fNrunsPerMaster(other.fNrunsPerMaster),
252 fMaxMergeFiles(other.fMaxMergeFiles),
253 fMaxMergeStages(other.fMaxMergeStages),
254 fNsubmitted(other.fNsubmitted),
255 fProductionMode(other.fProductionMode),
256 fOutputToRunNo(other.fOutputToRunNo),
257 fMergeViaJDL(other.fMergeViaJDL),
258 fFastReadOption(other.fFastReadOption),
259 fOverwriteMode(other.fOverwriteMode),
260 fNreplicas(other.fNreplicas),
261 fNproofWorkers(other.fNproofWorkers),
262 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
263 fProofReset(other.fProofReset),
264 fNMCevents(other.fNMCevents),
265 fNMCjobs(other.fNMCjobs),
266 fRunNumbers(other.fRunNumbers),
267 fExecutable(other.fExecutable),
268 fExecutableCommand(other.fExecutableCommand),
269 fArguments(other.fArguments),
270 fExecutableArgs(other.fExecutableArgs),
271 fAnalysisMacro(other.fAnalysisMacro),
272 fAnalysisSource(other.fAnalysisSource),
273 fValidationScript(other.fValidationScript),
274 fAdditionalRootLibs(other.fAdditionalRootLibs),
275 fAdditionalLibs(other.fAdditionalLibs),
276 fGeneratorLibs(other.fGeneratorLibs),
277 fSplitMode(other.fSplitMode),
278 fAPIVersion(other.fAPIVersion),
279 fROOTVersion(other.fROOTVersion),
280 fAliROOTVersion(other.fAliROOTVersion),
281 fAliPhysicsVersion(other.fAliPhysicsVersion),
282 fExternalPackages(other.fExternalPackages),
284 fGridWorkingDir(other.fGridWorkingDir),
285 fGridDataDir(other.fGridDataDir),
286 fDataPattern(other.fDataPattern),
287 fGridOutputDir(other.fGridOutputDir),
288 fOutputArchive(other.fOutputArchive),
289 fOutputFiles(other.fOutputFiles),
290 fInputFormat(other.fInputFormat),
291 fDatasetName(other.fDatasetName),
292 fJDLName(other.fJDLName),
293 fTerminateFiles(other.fTerminateFiles),
294 fMergeExcludes(other.fMergeExcludes),
295 fRegisterExcludes(other.fRegisterExcludes),
296 fIncludePath(other.fIncludePath),
297 fCloseSE(other.fCloseSE),
298 fFriendChainName(other.fFriendChainName),
299 fJobTag(other.fJobTag),
300 fOutputSingle(other.fOutputSingle),
301 fRunPrefix(other.fRunPrefix),
302 fProofCluster(other.fProofCluster),
303 fProofDataSet(other.fProofDataSet),
304 fFileForTestMode(other.fFileForTestMode),
305 fAliRootMode(other.fAliRootMode),
306 fProofProcessOpt(other.fProofProcessOpt),
307 fMergeDirName(other.fMergeDirName),
312 fDropToShell(other.fDropToShell),
313 fMCLoop(other.fMCLoop),
314 fGridJobIDs(other.fGridJobIDs),
315 fGridStages(other.fGridStages),
316 fFriendLibs(other.fFriendLibs),
317 fTreeName(other.fTreeName)
// The JDL objects are not copied from `other`: new TAlienJDL instances are
// created through the interpreter, so their content starts out fresh.
320 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
321 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
322 fRunRange[0] = other.fRunRange[0];
323 fRunRange[1] = other.fRunRange[1];
// Input files: each entry is re-created as a TObjString holding the entry's
// name; the new array owns its elements.
324 if (other.fInputFiles) {
325 fInputFiles = new TObjArray();
326 TIter next(other.fInputFiles);
328 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
329 fInputFiles->SetOwner();
// Packages: same name-based copy as for the input files.
331 if (other.fPackages) {
332 fPackages = new TObjArray();
333 TIter next(other.fPackages);
335 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
336 fPackages->SetOwner();
// Modules: each AliAnalysisTaskCfg is copy-constructed into a new owning array.
338 if (other.fModules) {
339 fModules = new TObjArray();
340 fModules->SetOwner();
341 TIter next(other.fModules);
342 AliAnalysisTaskCfg *mod, *crt;
343 while ((crt=(AliAnalysisTaskCfg*)next())) {
344 mod = new AliAnalysisTaskCfg(*crt);
350 //______________________________________________________________________________
// Destructor.
// NOTE(review): only the fProofParam.DeleteAll() call is visible in this
// listing; the release of the other owned objects (JDLs, input file, package
// and module lists) presumably happens on the lines not shown - confirm.
351 AliAnalysisAlien::~AliAnalysisAlien()
359 fProofParam.DeleteAll();
362 //______________________________________________________________________________
// Assignment operator.
// Skips self-assignment, delegates to AliAnalysisGrid::operator=, then copies
// the listed settings from `other` and rebuilds the JDL objects and the
// input-file, package and module lists.
// NOTE(review): in the visible lines fGridJDL, fMergingJDL, fInputFiles,
// fPackages and fModules are overwritten with newly allocated objects without
// the previously held ones being deleted - this looks like a leak when
// assigning to an already populated object; confirm.
// NOTE(review): when one of other's lists is null, the matching member of
// this object is left untouched in the visible lines.
// NOTE(review): the copy constructor copies fRunRange, but no assignment of
// fRunRange is visible here; confirm whether that is intended.
363 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
366 if (this != &other) {
367 AliAnalysisGrid::operator=(other);
// Fresh JDL objects are created through the interpreter rather than copied.
368 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
369 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
370 fPrice = other.fPrice;
372 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
373 fMaxInitFailed = other.fMaxInitFailed;
374 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
375 fNtestFiles = other.fNtestFiles;
376 fNrunsPerMaster = other.fNrunsPerMaster;
377 fMaxMergeFiles = other.fMaxMergeFiles;
378 fMaxMergeStages = other.fMaxMergeStages;
379 fNsubmitted = other.fNsubmitted;
380 fProductionMode = other.fProductionMode;
381 fOutputToRunNo = other.fOutputToRunNo;
382 fMergeViaJDL = other.fMergeViaJDL;
383 fFastReadOption = other.fFastReadOption;
384 fOverwriteMode = other.fOverwriteMode;
385 fNreplicas = other.fNreplicas;
386 fNproofWorkers = other.fNproofWorkers;
387 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
388 fProofReset = other.fProofReset;
389 fNMCevents = other.fNMCevents;
390 fNMCjobs = other.fNMCjobs;
391 fRunNumbers = other.fRunNumbers;
392 fExecutable = other.fExecutable;
393 fExecutableCommand = other.fExecutableCommand;
394 fArguments = other.fArguments;
395 fExecutableArgs = other.fExecutableArgs;
396 fAnalysisMacro = other.fAnalysisMacro;
397 fAnalysisSource = other.fAnalysisSource;
398 fValidationScript = other.fValidationScript;
399 fAdditionalRootLibs = other.fAdditionalRootLibs;
400 fAdditionalLibs = other.fAdditionalLibs;
401 fGeneratorLibs = other.fGeneratorLibs;
402 fSplitMode = other.fSplitMode;
403 fAPIVersion = other.fAPIVersion;
404 fROOTVersion = other.fROOTVersion;
405 fAliROOTVersion = other.fAliROOTVersion;
406 fAliPhysicsVersion = other.fAliPhysicsVersion;
407 fExternalPackages = other.fExternalPackages;
409 fGridWorkingDir = other.fGridWorkingDir;
410 fGridDataDir = other.fGridDataDir;
411 fDataPattern = other.fDataPattern;
412 fGridOutputDir = other.fGridOutputDir;
413 fOutputArchive = other.fOutputArchive;
414 fOutputFiles = other.fOutputFiles;
415 fInputFormat = other.fInputFormat;
416 fDatasetName = other.fDatasetName;
417 fJDLName = other.fJDLName;
418 fTerminateFiles = other.fTerminateFiles;
419 fMergeExcludes = other.fMergeExcludes;
420 fRegisterExcludes = other.fRegisterExcludes;
421 fIncludePath = other.fIncludePath;
422 fCloseSE = other.fCloseSE;
423 fFriendChainName = other.fFriendChainName;
424 fJobTag = other.fJobTag;
425 fOutputSingle = other.fOutputSingle;
426 fRunPrefix = other.fRunPrefix;
427 fProofCluster = other.fProofCluster;
428 fProofDataSet = other.fProofDataSet;
429 fFileForTestMode = other.fFileForTestMode;
430 fAliRootMode = other.fAliRootMode;
431 fProofProcessOpt = other.fProofProcessOpt;
432 fMergeDirName = other.fMergeDirName;
433 fDropToShell = other.fDropToShell;
434 fMCLoop = other.fMCLoop;
435 fGridJobIDs = other.fGridJobIDs;
436 fGridStages = other.fGridStages;
437 fFriendLibs = other.fFriendLibs;
438 fTreeName = other.fTreeName;
// Input files: entries are re-created as TObjString by name in a new owning array.
439 if (other.fInputFiles) {
440 fInputFiles = new TObjArray();
441 TIter next(other.fInputFiles);
443 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
444 fInputFiles->SetOwner();
// Packages: same name-based copy as for the input files.
446 if (other.fPackages) {
447 fPackages = new TObjArray();
448 TIter next(other.fPackages);
450 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
451 fPackages->SetOwner();
// Modules: each AliAnalysisTaskCfg is copy-constructed into a new owning array.
453 if (other.fModules) {
454 fModules = new TObjArray();
455 fModules->SetOwner();
456 TIter next(other.fModules);
457 AliAnalysisTaskCfg *mod, *crt;
458 while ((crt=(AliAnalysisTaskCfg*)next())) {
459 mod = new AliAnalysisTaskCfg(*crt);
467 //______________________________________________________________________________
// Append one library to the blank-separated list kept in fAdditionalLibs.
//   name - library file name; it must contain a '.' (i.e. carry an extension)
// A name without extension is reported with Error(); a name already found in
// the list is reported with Warning().
// NOTE(review): `lib` is declared on a line not visible here - presumably a
// TString built from `name`; the early returns after the messages are not
// visible either.
// NOTE(review): the duplicate test uses TString::Contains, i.e. a substring
// match, so a name that is a substring of an already listed library is
// treated as already added.
468 void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
470 // Add a single additional library to be loaded. Extension must be present.
472 if (!lib.Contains(".")) {
473 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
476 if (fAdditionalLibs.Contains(name)) {
477 Warning("AddAdditionalLibrary", "Library %s already added.", name);
480 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
481 fAdditionalLibs += lib;
484 //______________________________________________________________________________
// Register a module configuration in fModules.
//   module - module to add; dereferenced without a null check
// A module whose name is already registered is rejected with Error().
// fModules is created as an owning TObjArray (presumably only when it does not
// exist yet - the guarding line is not visible here).
// NOTE(review): the existing comment says the module becomes owned by this
// object; in the rejected-duplicate case the module is not stored, so its
// ownership presumably stays with the caller - confirm.
485 void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
487 // Adding a module. Checks if already existing. Becomes owned by this.
489 if (GetModule(module->GetName())) {
490 Error("AddModule", "A module having the same name %s already added", module->GetName());
494 fModules = new TObjArray();
495 fModules->SetOwner();
497 fModules->Add(module);
500 //______________________________________________________________________________
// Register every module of a list by calling AddModule() on each element.
//   list - collection whose elements are cast to AliAnalysisTaskCfg*
// NOTE(review): the iterator `next` is declared on a line not visible here -
// presumably a TIter over `list`; confirm. Duplicate names are handled by
// AddModule().
501 void AliAnalysisAlien::AddModules(TObjArray *list)
503 // Adding a list of modules. Checks if already existing. Becomes owned by this.
505 AliAnalysisTaskCfg *module;
506 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
509 //______________________________________________________________________________
// Verify that every dependency named by a registered module is itself
// registered, and reorder fModules so that a dependency is placed before the
// module that needs it.
// Errors are reported when a dependency is missing and when two modules need
// each other.
// NOTE(review): several lines are not visible in this listing (declarations of
// i, j, k, istart and depname, the guards around the messages and the return
// statements), so the exact return values cannot be stated from here.
// NOTE(review): the visible cycle test only asks the dependency whether it
// needs the current module; whether NeedsDependency() also follows indirect
// dependencies is not shown here.
510 Bool_t AliAnalysisAlien::CheckDependencies()
512 // Check if all dependencies are satisfied. Reorder modules if needed.
513 Int_t nmodules = GetNmodules();
515 Warning("CheckDependencies", "No modules added yet to check their dependencies");
518 AliAnalysisTaskCfg *mod = 0;
519 AliAnalysisTaskCfg *dep = 0;
522 for (i=0; i<nmodules; i++) {
523 mod = (AliAnalysisTaskCfg*) fModules->At(i);
524 Int_t ndeps = mod->GetNdeps();
526 for (j=0; j<ndeps; j++) {
527 depname = mod->GetDependency(j);
528 dep = GetModule(depname);
530 Error("CheckDependencies","Dependency %s not added for module %s",
531 depname.Data(), mod->GetName());
534 if (dep->NeedsDependency(mod->GetName())) {
535 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
536 mod->GetName(), dep->GetName());
539 Int_t idep = fModules->IndexOf(dep);
540 // The dependency task must come first
542 // Remove at idep and move all objects below up one slot
543 // down to index i included.
544 fModules->RemoveAt(idep);
// Shift the entries in [i, idep-1] one slot towards the end, then put the
// dependency at slot i; i is advanced so it keeps pointing at `mod`.
545 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
546 fModules->AddAt(dep, i++);
548 //Redo from istart if dependencies were inserted
549 if (i>istart) i=istart-1;
555 //______________________________________________________________________________
// Obtain or create the analysis manager and optionally run a macro.
//   name     - name given to a newly created AliAnalysisManager
//   filename - macro to execute; tested with strlen(), so it must not be null
// The MC-loop flag fMCLoop is propagated to the manager, and for a newly
// created manager this object is installed as its grid handler.
// NOTE(review): the branch that separates the "manager already exists" path
// from the "create a new one" path, the line that completes the interpreter
// command and the return statement are not visible in this listing.
// NOTE(review): TSystem::ExpandPathName(const char*) is assigned straight into
// a TString; if that overload returns a heap-allocated string the caller must
// free, the buffer is leaked here - confirm against the ROOT documentation.
556 AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
558 // Create the analysis manager and optionally execute the macro in filename.
559 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
561 mgr->SetMCLoop(fMCLoop);
564 mgr = new AliAnalysisManager(name);
565 mgr->SetGridHandler((AliAnalysisGrid*)this);
566 mgr->SetMCLoop(fMCLoop);
567 if (strlen(filename)) {
568 TString line = gSystem->ExpandPathName(filename);
570 gROOT->ProcessLine(line.Data());
575 //______________________________________________________________________________
576 Int_t AliAnalysisAlien::GetNmodules() const
578 // Get number of modules.
579 if (!fModules) return 0;
580 return fModules->GetEntries();
583 //______________________________________________________________________________
584 AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
586 // Get a module by name.
587 if (!fModules) return 0;
588 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
591 //______________________________________________________________________________
// Load one module, after recursively loading the modules it depends on.
//   mod - module configuration to load; dereferenced without a null check
// Returns kTRUE immediately when the module reports itself as loaded.
// Visible steps, in order: look up the analysis manager, load each dependency,
// load the module's libraries, apply a custom output / terminate file name if
// one is set, execute the module macro and the optional configuration macro,
// and finally append the module's libraries (wrapped as lib<name>.so) to
// fAdditionalLibs.
// NOTE(review): the guards in front of several Error() calls, the declarations
// of depname and lib, and the return statements are not visible in this
// listing - presumably each reported error ends with `return kFALSE`; confirm.
592 Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
594 // Load a given module.
595 if (mod->IsLoaded()) return kTRUE;
596 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
598 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
601 Int_t ndeps = mod->GetNdeps();
// Dependencies are loaded first, through a recursive call.
603 for (Int_t j=0; j<ndeps; j++) {
604 depname = mod->GetDependency(j);
605 AliAnalysisTaskCfg *dep = GetModule(depname);
607 Error("LoadModule","Dependency %s not existing for module %s",
608 depname.Data(), mod->GetName());
611 if (!LoadModule(dep)) {
612 Error("LoadModule","Dependency %s for module %s could not be loaded",
613 depname.Data(), mod->GetName());
617 // Load libraries for the module
618 if (!mod->CheckLoadLibraries()) {
619 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
622 // Check if a custom file name was requested
623 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
625 // Check if a custom terminate file name was requested
// Terminate file names are accumulated as a comma-separated list.
626 if (strlen(mod->GetTerminateFileName())) {
627 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
628 fTerminateFiles += mod->GetTerminateFileName();
632 if (mod->ExecuteMacro()<0) {
633 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
634 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
637 // Configure dependencies
638 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
639 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
640 mod->GetConfigMacro()->GetTitle(), mod->GetName());
643 // Adjust extra libraries
644 Int_t nlibs = mod->GetNlibs();
// Each library name is wrapped as lib<name>.so and appended to the
// blank-separated fAdditionalLibs unless Contains() already finds it
// (a substring match).
646 for (Int_t i=0; i<nlibs; i++) {
647 lib = mod->GetLibrary(i);
648 lib = Form("lib%s.so", lib.Data());
649 if (fAdditionalLibs.Contains(lib)) continue;
650 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
651 fAdditionalLibs += lib;
656 //______________________________________________________________________________
// Generate the files for the full train under names derived from `name`.
//   name - base name; the macro, executable and validation script are
//          temporarily named <name>.C, <name>.sh and <name>_validation.sh
// The list of additional libraries is reset, all modules are loaded and the
// analysis manager is initialised; kFALSE is returned if either step fails.
// The production mode and the macro / executable / command / validation
// settings are saved beforehand and restored at the end.
// NOTE(review): the statements between SetValidationScript() and the restore
// (presumably the calls that write the files) and the final return are not
// visible in this listing.
// NOTE(review): `mgr` is dereferenced without a null check; LoadModules() does
// report a missing manager, but whether every path through it guarantees a
// non-null manager cannot be confirmed from the visible lines.
657 Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
659 // Generate the full train.
660 fAdditionalLibs = "";
661 if (!LoadModules()) return kFALSE;
662 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
663 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so they can be restored once generation is done.
666 Int_t productionMode = fProductionMode;
668 TString macro = fAnalysisMacro;
669 TString executable = fExecutable;
670 TString validation = fValidationScript;
671 TString execCommand = fExecutableCommand;
672 SetAnalysisMacro(Form("%s.C", name));
673 SetExecutable(Form("%s.sh", name));
674 // SetExecutableCommand("aliroot -b -q ");
675 SetValidationScript(Form("%s_validation.sh", name));
// Restore the settings saved above.
677 SetProductionMode(productionMode);
678 fAnalysisMacro = macro;
679 fExecutable = executable;
680 fExecutableCommand = execCommand;
681 fValidationScript = validation;
685 //______________________________________________________________________________
// Generate local test files either for one module or for the whole train.
//   name    - base name of the generated macro, executable and validation script
//   modname - module to test; an empty string selects all modules.
//             Tested with strlen(), so it must not be null.
// With a module name: dependencies are checked, the module is looked up and
// loaded, then the friend libraries are loaded. Without one: LoadModules() is
// used. Any failure in these steps, or in InitAnalysis(), returns kFALSE.
// The output file list is rebuilt from GetListOfFiles("outaod") plus the
// "ext" files, whose ".root" suffix is turned into the wildcard "*.root".
// Production mode and the macro / executable / command / validation settings
// are saved first and restored at the end; local-test mode is switched off
// again before returning.
// NOTE(review): several statements (the missing-module guard, the switch into
// local-test mode, some writer calls and the final return) are not visible in
// this listing.
686 Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
688 // Generate test macros for a single module or for the full train.
689 fAdditionalLibs = "";
690 if (strlen(modname)) {
691 if (!CheckDependencies()) return kFALSE;
692 AliAnalysisTaskCfg *mod = GetModule(modname);
694 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
697 if (!LoadModule(mod)) return kFALSE;
698 if (!LoadFriendLibs()) return kFALSE;
699 } else if (!LoadModules()) return kFALSE;
700 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
701 if (!mgr->InitAnalysis()) return kFALSE;
// Save the current settings so they can be restored once generation is done.
705 Int_t productionMode = fProductionMode;
707 TString macro = fAnalysisMacro;
708 TString executable = fExecutable;
709 TString validation = fValidationScript;
710 TString execCommand = fExecutableCommand;
711 SetAnalysisMacro(Form("%s.C", name));
712 SetExecutable(Form("%s.sh", name));
713 fOutputFiles = GetListOfFiles("outaod");
714 // Add extra files registered to the analysis manager
715 TString extra = GetListOfFiles("ext");
716 if (!extra.IsNull()) {
717 extra.ReplaceAll(".root", "*.root");
718 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
719 fOutputFiles += extra;
721 // SetExecutableCommand("aliroot -b -q ");
722 SetValidationScript(Form("%s_validation.sh", name));
724 WriteAnalysisMacro();
726 WriteValidationScript();
728 WriteMergeExecutable();
729 WriteValidationScript(kTRUE);
// Leave local-test mode and restore the settings saved above.
730 SetLocalTest(kFALSE);
731 SetProductionMode(productionMode);
732 fAnalysisMacro = macro;
733 fExecutable = executable;
734 fExecutableCommand = execCommand;
735 fValidationScript = validation;
739 //______________________________________________________________________________
// Load every registered module, then the friend libraries.
// The list of additional libraries is reset first. Dependencies are checked
// (which may reorder fModules) before the modules are loaded in list order;
// the first module that fails to load makes the function return kFALSE.
// The final result is that of LoadFriendLibs().
// NOTE(review): the guards and return values attached to the "No module to be
// loaded" warning and the "No analysis manager" error are not visible in this
// listing.
740 Bool_t AliAnalysisAlien::LoadModules()
742 // Load all modules by executing the AddTask macros. Checks first the dependencies.
743 fAdditionalLibs = "";
744 Int_t nmodules = GetNmodules();
746 Warning("LoadModules", "No module to be loaded");
749 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
751 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
754 if (!CheckDependencies()) return kFALSE;
// Re-read the count after the dependency check.
755 nmodules = GetNmodules();
756 AliAnalysisTaskCfg *mod;
757 for (Int_t imod=0; imod<nmodules; imod++) {
758 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
759 if (!LoadModule(mod)) return kFALSE;
761 // Load additional friend libraries
762 return LoadFriendLibs();
765 //______________________________________________________________________________
// Load the libraries listed in fFriendLibs.
// The list is split on ',' when it contains a comma, otherwise on blanks.
// Each entry is normalised by removing ".so", ".dylib", blanks and a leading
// "lib"; the library is loaded through gSystem->Load() unless
// gSystem->GetLibraries() already reports a match for it.
// NOTE(review): the declarations of `list` and `lib`, the test on `loaded`,
// the clean-up of the tokenized list and the return statements are not visible
// in this listing.
// NOTE(review): the Error() location label reads "LoadModules" although the
// message is issued from LoadFriendLibs.
766 Bool_t AliAnalysisAlien::LoadFriendLibs() const
768 // Load libraries required for reading friends.
769 if (fFriendLibs.Length()) {
772 if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
773 else list = fFriendLibs.Tokenize(" ");
774 for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
775 lib = list->At(ilib)->GetName();
776 lib.ReplaceAll(".so","");
777 lib.ReplaceAll(".dylib","");
778 lib.ReplaceAll(" ","");
779 if (lib.BeginsWith("lib")) lib.Remove(0, 3);
// A non-empty GetLibraries() answer counts as "already loaded".
781 Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
782 if (!loaded) loaded = gSystem->Load(lib);
784 Error("LoadModules", "Cannot load library for friends %s", lib.Data());
793 //______________________________________________________________________________
// Define how run numbers are formatted.
//   prefix - either a plain prefix or a printf-style format such as "%09d"
// When fRunPrefix contains no '%', "%d" is appended so that it can be used as
// a format for the run number.
// NOTE(review): the line that stores `prefix` into fRunPrefix is not visible
// in this listing - presumably a plain assignment; confirm.
794 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
796 // Set the run number format. Can be a prefix or a format like "%09d"
798 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
801 //______________________________________________________________________________
// Append an include path to fIncludePath, followed by a blank.
//   path - the path to add; "-I" is prepended unless `p` already contains "-I"
// NOTE(review): `p` is declared on a line not visible here - presumably a
// TString built from `path`; confirm. The test looks for "-I" anywhere in the
// string, not only at its start.
802 void AliAnalysisAlien::AddIncludePath(const char *path)
804 // Add include path in the remote analysis macro.
806 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
807 else fIncludePath += Form("-I%s ", path);
810 //______________________________________________________________________________
811 void AliAnalysisAlien::AddRunNumber(Int_t run)
813 // Add a run number to the list of runs to be processed.
814 if (fRunNumbers.Length()) fRunNumbers += " ";
815 fRunNumbers += Form(fRunPrefix.Data(), run);
818 //______________________________________________________________________________
// Add several runs given as one blank-separated string.
//   runList - run numbers separated by blanks
// Each token is converted with Atoi() and passed to AddRunNumber(Int_t), so
// the stored entries are formatted with fRunPrefix.
// NOTE(review): the clean-up of the array returned by Tokenize() is not
// visible in this listing - confirm it is deleted.
819 void AliAnalysisAlien::AddRunList(const char* runList)
821 // Add several runs into the list of runs; they are expected to be separated by a blank character.
822 TString sList = runList;
823 TObjArray *list = sList.Tokenize(" ");
824 Int_t n = list->GetEntries();
825 for (Int_t i = 0; i < n; i++) {
826 TObjString *os = (TObjString*)list->At(i);
827 AddRunNumber(os->GetString().Atoi());
832 //______________________________________________________________________________
// Add one or more runs given as text.
//   run - blank-separated run identifiers (tokenized on " ")
// Each token is appended verbatim to fRunNumbers, preceded by the part of
// fRunPrefix that comes before "%d"; unlike the Int_t overload, the token is
// not passed through the format itself.
// NOTE(review): `runs`, `prefix`, `os` and the iterator `next` are declared on
// lines not visible here, and the clean-up of `arr` is not visible either.
// NOTE(review): when fRunPrefix holds a format without a literal "%d" (e.g.
// "%09d"), Index("%d") finds nothing; what Append() does with that result
// should be confirmed.
833 void AliAnalysisAlien::AddRunNumber(const char* run)
835 // Add a run number to the list of runs to be processed.
838 TObjArray *arr = runs.Tokenize(" ");
841 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
842 while ((os=(TObjString*)next())){
843 if (fRunNumbers.Length()) fRunNumbers += " ";
844 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
849 //______________________________________________________________________________
850 void AliAnalysisAlien::AddDataFile(const char *lfn)
852 // Adds a data file to the input to be analysed. The file should be a valid LFN
853 // or point to an existing file in the alien workdir.
854 if (!fInputFiles) fInputFiles = new TObjArray();
855 fInputFiles->Add(new TObjString(lfn));
858 //______________________________________________________________________________
859 void AliAnalysisAlien::AddExternalPackage(const char *package)
861 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
862 if (fExternalPackages) fExternalPackages += " ";
863 fExternalPackages += package;
866 //______________________________________________________________________________
// Make sure a connection to AliEn exists.
// Returns kTRUE right away when gGrid is already connected, and also in
// production mode (no connection is attempted in that case). Otherwise
// TGrid::Connect("alien://") is called; on success fUser is set from the grid
// user name.
// NOTE(review): the return statements after the Error() and after the final
// Info() are not visible in this listing - presumably kFALSE and kTRUE; confirm.
867 Bool_t AliAnalysisAlien::Connect()
869 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
870 if (gGrid && gGrid->IsConnected()) return kTRUE;
871 if (fProductionMode) return kTRUE;
873 Info("Connect", "Trying to connect to AliEn ...");
874 TGrid::Connect("alien://");
876 if (!gGrid || !gGrid->IsConnected()) {
877 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
880 fUser = gGrid->GetUser();
881 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
885 //______________________________________________________________________________
// Enter the AliEn working directory, creating it when necessary.
// The working directory is the grid home directory followed by
// fGridWorkingDir. When it cannot be created, a warning is issued and
// fGridWorkingDir is cleared so that the home directory is used instead.
// NOTE(review): the connection guard in front of the Error(), the statements
// executed when the directory already exists, and the else-branch structure
// around Mkdir() are not visible in this listing.
886 void AliAnalysisAlien::CdWork()
888 // Check validity of alien workspace. Create directory if possible.
890 Error("CdWork", "Alien connection required");
893 TString homedir = gGrid->GetHomeDirectory();
894 TString workdir = homedir + fGridWorkingDir;
895 if (DirectoryExists(workdir)) {
899 // Work directory not existing - create it
// Mkdir receives the full path, while Cd receives fGridWorkingDir as stored.
901 if (gGrid->Mkdir(workdir, "-p")) {
902 gGrid->Cd(fGridWorkingDir);
903 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
905 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
906 workdir.Data(), homedir.Data());
907 fGridWorkingDir = "";
911 //______________________________________________________________________________
// Test whether files can be copied to an AliEn directory.
//   alienpath - AliEn directory to test; must not contain blanks and must exist
// Returns kTRUE without testing in production mode. Otherwise a small local
// file "plugin_test_copy" is created, copied to <alienpath> with TFile::Cp and
// removed again both locally and on the grid.
// The alien_CLOSE_SE environment variable is only reported, not changed.
// NOTE(review): the connection guard, the check that the local test file could
// be opened, and all return statements after the messages are not visible in
// this listing.
// NOTE(review): the grid copy is removed with an "alien://" prefixed path
// before the test but with a bare path afterwards; confirm both forms are
// accepted by gGrid->Rm().
912 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
914 // Check if file copying is possible.
915 if (fProductionMode) return kTRUE;
916 TString salienpath(alienpath);
917 if (salienpath.Contains(" ")) {
918 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
922 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
925 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
926 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
927 // Check if alien_CLOSE_SE is defined
928 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
929 if (!closeSE.IsNull()) {
930 Info("CheckFileCopy", "Your current close storage is pointing to: \
931 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
933 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
935 // Check if grid directory exists.
936 if (!DirectoryExists(alienpath)) {
937 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Create the local test file in the current working directory.
940 TString stest = "plugin_test_copy";
941 TFile f(stest, "RECREATE");
942 // User may not have write permissions to current directory
944 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
945 gSystem->WorkingDirectory());
// Remove a stale copy on the grid before attempting the transfer.
949 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
950 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
951 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
952 \n# 1. Make sure you have write permissions there. If this is the case: \
953 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
954 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
955 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
956 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
957 gSystem->Unlink(stest.Data());
// Success path: clean up the local file and the grid copy.
960 gSystem->Unlink(stest.Data());
961 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
962 Info("CheckFileCopy", "### ...SUCCESS ###");
966 //______________________________________________________________________________
// Validate the configured input data and register the xml collections that
// will be used as input.
// Returns kTRUE immediately in production mode. Three kinds of input are
// handled in the visible code:
//   - nothing declared: a single xml is made for the base data directory
//     fGridDataDir, which must then be set;
//   - declared input files (fInputFiles): each must exist, and all must agree
//     on their type (collection / xml / tag usage);
//   - run numbers (fRunNumbers) or a run range (fRunRange): the data directory
//     must exist, and one xml name per run, or per chunk of fNrunsPerMaster
//     runs, is registered through AddDataFile().
// NOTE(review): many lines are not visible in this listing (guards in front of
// several Error() calls, declarations such as objstr, file, arr, os, path,
// format and nruns, counters and all `return kFALSE` statements), so the
// control flow below cannot be fully confirmed from here.
// NOTE(review): two Error() calls use the location label "CkeckInputData" and
// several Info() calls use "CheckDataType" although they are issued from
// CheckInputData.
// NOTE(review): the run directory paths are built with a trailing blank
// ("%s/%s " and "%%s/%s ") before being passed to DirectoryExists(); confirm
// this is intended.
967 Bool_t AliAnalysisAlien::CheckInputData()
969 // Check validity of input data. If necessary, create xml files.
970 if (fProductionMode) return kTRUE;
971 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
972 if (!fGridDataDir.Length()) {
973 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
977 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
980 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
981 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
982 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
985 // Process declared files
986 Bool_t isCollection = kFALSE;
987 Bool_t isXml = kFALSE;
988 Bool_t useTags = kFALSE;
989 Bool_t checked = kFALSE;
990 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
992 TString workdir = gGrid->GetHomeDirectory();
993 workdir += fGridWorkingDir;
996 TIter next(fInputFiles);
997 while ((objstr=(TObjString*)next())) {
1000 file += objstr->GetString();
1001 // Store full lfn path
1002 if (FileExists(file)) objstr->SetString(file);
1004 file = objstr->GetName();
1005 if (!FileExists(objstr->GetName())) {
1006 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
1007 objstr->GetName(), workdir.Data());
// Determine the type of this file; the types found for the files are
// compared below and a mismatch is reported as a conflict.
1011 Bool_t iscoll, isxml, usetags;
1012 CheckDataType(file, iscoll, isxml, usetags);
1015 isCollection = iscoll;
1018 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1020 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
1021 Error("CheckInputData", "Some conflict was found in the types of inputs");
1027 // Process requested run numbers
1028 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
1029 // Check validity of alien data directory
1030 if (!fGridDataDir.Length()) {
1031 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
1034 if (!DirectoryExists(fGridDataDir)) {
1035 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
1039 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
1043 if (checked && !isXml) {
1044 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
1047 // Check validity of run number(s)
1052 TString schunk, schunk2;
// Tag usage for run-based input is derived from the data pattern.
1056 useTags = fDataPattern.Contains("tag");
1057 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
1059 if (useTags != fDataPattern.Contains("tag")) {
1060 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over the run range.
1063 if (fRunNumbers.Length()) {
1064 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1065 arr = fRunNumbers.Tokenize(" ");
1067 while ((os=(TObjString*)next())) {
1068 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1069 if (!DirectoryExists(path)) {
1070 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1073 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1074 TString msg = "\n##### file: ";
1076 msg += " type: xml_collection;";
1077 if (useTags) msg += " using_tags: Yes";
1078 else msg += " using_tags: No";
1079 Info("CheckDataType", "%s", msg.Data());
// One xml per run, unless several runs are grouped per master job.
1080 if (fNrunsPerMaster<2) {
1081 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Grouped mode: a chunk name is started on the first run of each group and
// completed as <first>_<last>.xml on the last run of the group or of the list.
1084 if (((nruns-1)%fNrunsPerMaster) == 0) {
1085 schunk = os->GetString();
1087 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1088 schunk += Form("_%s.xml", os->GetString().Data());
1089 AddDataFile(schunk);
// Run-range mode: every run in [fRunRange[0], fRunRange[1]] is visited and
// formatted with fRunPrefix.
1094 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1095 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1096 format = Form("%%s/%s ", fRunPrefix.Data());
1097 path = Form(format.Data(), fGridDataDir.Data(), irun);
1098 if (!DirectoryExists(path)) {
1101 format = Form("%%s/%s.xml", fRunPrefix.Data());
1102 path = Form(format.Data(), workdir.Data(),irun);
1103 TString msg = "\n##### file: ";
1105 msg += " type: xml_collection;";
1106 if (useTags) msg += " using_tags: Yes";
1107 else msg += " using_tags: No";
1108 Info("CheckDataType", "%s", msg.Data());
1109 if (fNrunsPerMaster<2) {
1110 format = Form("%s.xml", fRunPrefix.Data());
1111 AddDataFile(Form(format.Data(),irun));
1114 if (((nruns-1)%fNrunsPerMaster) == 0) {
1115 schunk = Form(fRunPrefix.Data(),irun);
1117 format = Form("_%s.xml", fRunPrefix.Data());
1118 schunk2 = Form(format.Data(), irun);
1119 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1121 AddDataFile(schunk);
1126 AddDataFile(schunk);
1132 //______________________________________________________________________________
1133 Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1135 // Copy data from the given grid directory according a pattern and make a local
1137 // archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
1139 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1142 if (!DirectoryExists(griddir)) {
1143 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
1146 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1147 printf("Running command: %s\n", command.Data());
1148 TGridResult *res = gGrid->Command(command);
1149 Int_t nfound = res->GetEntries();
1151 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1154 printf("... found %d files. Copying locally ...\n", nfound);
1157 TObjArray* additionalArchives = 0;
1158 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1159 additionalArchives = TString(archivefile).Tokenize(";");
1160 archivefile = additionalArchives->At(0)->GetName();
1161 additionalArchives->RemoveAt(0);
1162 additionalArchives->Compress();
1165 // Copy files locally
1167 out.open(output, ios::out);
1169 TString turl, dirname, filename, temp;
1170 TString cdir = gSystem->WorkingDirectory();
1171 gSystem->MakeDirectory(outputdir);
1172 gSystem->ChangeDirectory(outputdir);
1174 for (Int_t i=0; i<nfound; i++) {
1175 map = (TMap*)res->At(i);
1176 turl = map->GetValue("turl")->GetName();
1177 filename = gSystem->BaseName(turl.Data());
1178 dirname = gSystem->DirName(turl.Data());
1179 dirname = gSystem->BaseName(dirname.Data());
1180 gSystem->MakeDirectory(dirname);
1182 TString source(turl);
1183 TString targetFileName(filename);
1185 if (strlen(archivefile) > 0) {
1186 // TODO here the archive in which the file resides should be determined
1187 // however whereis returns only a guid, and guid2lfn does not work
1188 // Therefore we use the one provided as argument for now
1189 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1190 targetFileName = archivefile;
1192 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1193 Bool_t success = kTRUE;
1194 if (additionalArchives) {
1195 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1197 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1198 gSystem->MakeDirectory(gSystem->DirName(target));
1199 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
1204 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1205 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1210 gSystem->ChangeDirectory(cdir);
1212 delete additionalArchives;
1216 //______________________________________________________________________________
1217 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1219 // Create dataset for the grid data directory + run number.
// The datasets are XML collections obtained with the grid 'find' command
// (option "-x collection"). Outside test mode they are copied to the AliEn
// working directory (grid home directory + fGridWorkingDir).
// Three input configurations are handled in turn:
//  - neither fRunNumbers nor fRunRange[0] is set: one collection for fGridDataDir;
//  - fRunNumbers is set: one collection per blank-separated run number;
//  - otherwise: one collection per run in [fRunRange[0], fRunRange[1]].
// With fNrunsPerMaster > 1 the per-run collections are merged into chunks
// which are exported and copied as one XML file per chunk.
// pattern - file pattern for the 'find' command; it also selects the delimiter
//           added to the command (empty if the pattern contains a blank).
// Returns kTRUE right away in production mode or when kOffline is set.
// Cap handed to 'find -l' outside test mode. A result holding exactly this
// many entries triggers the merging code further down.
1220 const Int_t gMaxEntries = 15000;
1221 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1223 Error("CreateDataset", "Cannot create dataset with no grid connection");
1228 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// Absolute AliEn path of the working directory
1229 TString workdir = gGrid->GetHomeDirectory();
1230 workdir += fGridWorkingDir;
1232 // Compose the 'find' command arguments
1235 TString delimiter = pattern;
1237 if (delimiter.Contains(" ")) delimiter = "";
1238 else delimiter = " ";
// Ask for an XML collection and limit the number of returned entries
1239 TString options = "-x collection ";
1240 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1241 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1242 TString conditions = "";
1249 TString schunk, schunk2;
// cbase accumulates merged collections, cadd is the collection being added
1250 TGridCollection *cbase=0, *cadd=0;
// ---- Case 1: no run numbers and no run range given
1251 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Input files already defined: nothing to create
1252 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1253 // Make a single data collection from data directory.
1254 path = fGridDataDir;
1255 if (!DirectoryExists(path)) {
1256 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// The collection is called wn.xml in test mode, <data directory name>.xml otherwise
1260 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1261 else file = Form("%s.xml", gSystem->BaseName(path));
1262 // cholm - Identical loop - should be put in common function for code simplification
// (Re)create the collection if it is missing locally, in test mode or in overwrite mode
1266 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
// "-o nstart" makes 'find' start at offset nstart
1268 command += Form("%s -o %d ",options.Data(), nstart);
1270 command += delimiter;
1272 command += conditions;
1273 printf("command: %s\n", command.Data());
1274 TGridResult *res = gGrid->Command(command);
1275 if (res) delete res;
1276 // Write standard output to file
1277 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
// Probe whether 'grep' can be executed on this system
1278 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1279 Bool_t nullFile = kFALSE;
1281 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// grep -c writes the number of lines containing "/event" to __tmp__;
// a non-zero exit status marks the 'find' result as empty
1283 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1285 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1286 gSystem->Exec("rm -f __tmp*");
1294 gSystem->Exec("rm -f __tmp__");
// NOTE(review): 'line' presumably holds the count read back from __tmp__ - confirm
1295 ncount = line.Atoi();
// The cap was reached: load this stage as a collection and append it to cbase
1298 if (ncount == gMaxEntries) {
1299 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1300 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1301 if (!cbase) cbase = cadd;
1303 // cholm - Avoid using very slow TAlienCollection
1304 // cbase->Add(cadd);
1305 // cholm - Use AddFast (via interpreter)
1306 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
// Append the current stage to cbase and export the merged collection to 'file'
1312 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1313 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1314 // cholm - Avoid using very slow TAlienCollection
1315 // cbase->Add(cadd);
1316 // cholm - Use AddFast (via interpreter)
1317 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
1319 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1320 delete cbase; cbase = 0;
// Copy the temporary 'find' output of this stage to the final collection name
1322 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1324 gSystem->Exec("rm -f __tmp*");
1325 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// Upload only outside test mode, and only if the file is new or may be overwritten
1329 Bool_t fileExists = FileExists(file);
1330 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1331 // Copy xml file to alien space
1332 if (fileExists) gGrid->Rm(file);
1333 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
// The copy is verified by checking that the file now exists
1334 if (!FileExists(file)) {
1335 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1338 // Update list of files to be processed.
1340 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
// ---- Case 2: explicit, blank-separated list of run numbers
// nullResult is set to kFALSE once an entry count has been obtained for a run
1344 Bool_t nullResult = kTRUE;
1345 if (fRunNumbers.Length()) {
1346 TObjArray *arr = fRunNumbers.Tokenize(" ");
1349 while ((os=(TObjString*)next())) {
// Runs without a directory below fGridDataDir are skipped
1352 path = Form("%s/%s", fGridDataDir.Data(), os->GetString().Data());
1353 if (!DirectoryExists(path)) continue;
1355 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1356 else file = Form("%s.xml", os->GetString().Data());
1357 // cholm - Identical loop - should be put in common function for code simplification
1358 // If local collection file does not exist, create it via 'find' command.
1362 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1364 command += Form("%s -o %d ",options.Data(), nstart);
1366 command += delimiter;
1368 command += conditions;
1369 TGridResult *res = gGrid->Command(command);
1370 if (res) delete res;
1371 // Write standard output to file
1372 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1373 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1374 Bool_t nullFile = kFALSE;
1376 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1378 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1380 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1381 gSystem->Exec("rm -f __tmp*");
// Drop the run from the run list.
// NOTE(review): ReplaceAll removes every occurrence of the text, so a run
// number that is a substring of another one would alter that one too - confirm
1382 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1390 gSystem->Exec("rm -f __tmp__");
1391 ncount = line.Atoi();
1393 nullResult = kFALSE;
1395 if (ncount == gMaxEntries) {
1396 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
// A run that reaches the cap is reported as an error when several runs share a master job
1397 if (fNrunsPerMaster > 1) {
1398 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1399 file.Data(),gMaxEntries);
1402 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1403 if (!cbase) cbase = cadd;
1405 // cholm - Avoid using very slow TAlienCollection
1406 // cbase->Add(cadd);
1407 // cholm - Use AddFast (via interpreter)
1408 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
// Stages were accumulated for this run: add the current one and export the merged result
1414 if (cbase && fNrunsPerMaster<2) {
1415 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1416 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1417 // cholm - Avoid using very slow TAlienCollection
1418 // cbase->Add(cadd);
1419 // cholm - Use AddFast (via interpreter)
1420 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
1422 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1423 delete cbase; cbase = 0;
1425 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1427 gSystem->Exec("rm -f __tmp*");
1428 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the loop over runs ends after the first run that gets here
1432 if (TestBit(AliAnalysisGrid::kTest)) break;
1433 // Check if there is one run per master job.
1434 if (fNrunsPerMaster<2) {
1435 if (FileExists(file)) {
1436 if (fOverwriteMode) gGrid->Rm(file);
1438 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1442 // Copy xml file to alien space
1443 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1444 if (!FileExists(file)) {
1445 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk opens a new base
// collection and gives the chunk name its prefix; other runs are added to it
1451 if (((nruns-1)%fNrunsPerMaster) == 0) {
1452 schunk = os->GetString();
1453 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1455 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1456 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1457 // cholm - Avoid using very slow TAlienCollection
1458 // cbase->Add(cadd);
1459 // cholm - Use AddFast (via interpreter)
1460 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
// Chunk not complete yet and more runs to come
1463 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// Chunk file name: <first run>_<last run>.xml
1466 schunk += Form("_%s.xml", os->GetString().Data());
1467 if (FileExists(schunk)) {
// NOTE(review): the existence check is on schunk but 'file' is removed here;
// the run-range branch below removes schunk in the same situation - confirm
1468 if (fOverwriteMode) gGrid->Rm(file);
1470 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1474 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1475 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1476 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1477 if (!FileExists(schunk)) {
1478 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1486 Error("CreateDataset", "No valid dataset corresponding to the query!");
// ---- Case 3: run range
1490 // Process a full run range.
1491 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is used as a printf-style format for the run number
1492 format = Form("%%s/%s", fRunPrefix.Data());
1495 path = Form(format.Data(), fGridDataDir.Data(), irun);
1496 if (!DirectoryExists(path)) continue;
1498 format = Form("%s.xml", fRunPrefix.Data());
1499 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1500 else file = Form(format.Data(), irun);
// One run per master job, not in test mode, and the collection already exists
1501 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1502 if (fOverwriteMode) gGrid->Rm(file);
1504 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1508 // cholm - Identical loop - should be put in common function for code simplification
1509 // If local collection file does not exist, create it via 'find' command.
1513 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1515 command += Form("%s -o %d ",options.Data(), nstart);
1517 command += delimiter;
1519 command += conditions;
1520 TGridResult *res = gGrid->Command(command);
1521 if (res) delete res;
1522 // Write standard output to file
1523 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1524 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1525 Bool_t nullFile = kFALSE;
1527 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1529 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1531 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1532 gSystem->Exec("rm -f __tmp*");
1540 gSystem->Exec("rm -f __tmp__");
1541 ncount = line.Atoi();
1543 nullResult = kFALSE;
1545 if (ncount == gMaxEntries) {
1546 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1547 if (fNrunsPerMaster > 1) {
1548 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1549 file.Data(),gMaxEntries);
1552 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1553 if (!cbase) cbase = cadd;
1555 // cholm - Avoid using very slow TAlienCollection
1556 // cbase->Add(cadd);
1557 // cholm - Use AddFast (via interpreter)
1558 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
1563 if (cbase && fNrunsPerMaster<2) {
1564 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1565 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1566 // cholm - Avoid using very slow TAlienCollection
1567 // cbase->Add(cadd);
1568 // cholm - Use AddFast (via interpreter)
1569 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
1571 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1572 delete cbase; cbase = 0;
1574 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1576 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode the loop over runs ends after the first run that gets here
1580 if (TestBit(AliAnalysisGrid::kTest)) break;
1581 // Check if there is one run per master job.
1582 if (fNrunsPerMaster<2) {
1583 if (FileExists(file)) {
1584 if (fOverwriteMode) gGrid->Rm(file);
1586 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1590 // Copy xml file to alien space
1591 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1592 if (!FileExists(file)) {
1593 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1598 // Check if the collection for the chunk exist locally.
// Index of the chunk the current run belongs to.
// NOTE(review): assumes fInputFiles already holds an entry at nchunk - confirm
1599 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1600 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1601 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1604 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
// The first run of a chunk opens a new base collection; other runs are added to it
1605 if (((nruns-1)%fNrunsPerMaster) == 0) {
1606 schunk = Form(fRunPrefix.Data(), irun);
1607 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1609 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1610 // cholm - Avoid using very slow TAlienCollection
1611 // cbase->Add(cadd);
1612 // cholm - Use AddFast (via interpreter)
1613 gROOT->ProcessLine(Form("((TAlienCollection*)%p)->AddFast((TGridCollection*)%p)",cbase,cadd));
// Candidate chunk file name: <first run>_<current run>.xml
1616 format = Form("%%s_%s.xml", fRunPrefix.Data());
1617 schunk2 = Form(format.Data(), schunk.Data(), irun);
// Chunk not complete, not the last run of the range and not the last registered input file
1618 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1622 if (FileExists(schunk)) {
1623 if (fOverwriteMode) gGrid->Rm(schunk);
1625 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1629 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1630 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1631 if (FileExists(schunk)) {
1632 if (fOverwriteMode) gGrid->Rm(schunk);
1634 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1638 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1639 if (!FileExists(schunk)) {
1640 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1646 Error("CreateDataset", "No valid dataset corresponding to the query!");
1653 //______________________________________________________________________________
1654 Bool_t AliAnalysisAlien::CreateJDL()
1656 // Generate a JDL file according to current settings. The name of the file is
1657 // specified by fJDLName.
// Two JDL objects are filled side by side: fGridJDL and fMergingJDL. The
// settings cover user, executable, arguments, TTL, splitting, packages,
// input sandbox, output files/archives, price and validation command.
// Afterwards the JDL files (when kSubmit is set), the non-library entries of
// fAdditionalLibs and the entries of fPackages are copied to AliEn with
// copyLocal2Alien(); a failed copy ends in Fatal().
// Returns kFALSE if one of the preliminary checks set the 'error' flag.
1658 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1659 Bool_t error = kFALSE;
// 'copy' is switched off in production, offline and test mode
1661 Bool_t copy = kTRUE;
1662 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// 'generate' is switched off in test and submit mode
1663 Bool_t generate = kTRUE;
1664 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1666 Error("CreateJDL", "Alien connection required");
1669 // Check validity of alien workspace
// Outside production mode a working directory not starting with "/alice" is
// taken relative to the grid home directory
1671 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1672 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1673 workdir += fGridWorkingDir;
// Input files are mandatory unless fMCLoop is set
1676 if (!fInputFiles && !fMCLoop) {
1677 Error("CreateJDL()", "Define some input files for your analysis.");
1680 // Compose list of input files
1681 // Check if output files were defined
1682 if (!fOutputFiles.Length()) {
1683 Error("CreateJDL", "You must define at least one output file");
1686 // Check if an output directory was defined and valid
1687 if (!fGridOutputDir.Length()) {
1688 Error("CreateJDL", "You must define AliEn output directory");
1691 if (!fProductionMode) {
// An output directory without any "/" is taken relative to the working directory
1692 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1693 if (!DirectoryExists(fGridOutputDir)) {
1694 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1695 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1697 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1701 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1706 // Exit if any error up to now
1707 if (error) return kFALSE;
// User name, quoted, for both JDLs
1709 if (!fUser.IsNull()) {
1710 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1711 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
// A relative executable name is looked up in the working directory
1713 TString executable = fExecutable;
1714 if (!executable.BeginsWith("/"))
1715 executable.Prepend(Form("%s/", workdir.Data()));
1716 fGridJDL->SetExecutable(executable, "This is the startup script");
// The merging script has the same name with ".sh" replaced by "_merge.sh";
// the matching "_merge.C" macro is added to the merging input sandbox
1717 TString mergeExec = executable;
1718 mergeExec.ReplaceAll(".sh", "_merge.sh");
1719 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1720 mergeExec.ReplaceAll(".sh", ".C");
1721 fMergingJDL->AddToInputSandbox(Form("LF:%s", mergeExec.Data()), "List of input files to be uploaded to workers");
1722 if (!fArguments.IsNull())
1723 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// One-stage merging gets the output directory as argument
1724 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
// The stage is the 4th JDL parameter in production mode, the 2nd otherwise
1726 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1727 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// Same time-to-live for both JDLs; the description prints fTTL/60 as minutes
1730 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1731 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1732 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1733 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1735 if (fMaxInitFailed > 0) {
1736 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1737 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
// No input-file based splitting when fMCLoop is set
1739 if (fSplitMaxInputFileNumber > 0 && !fMCLoop) {
1740 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1741 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1743 if (!IsOneStageMerging()) {
1744 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1745 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1747 if (fSplitMode.Length()) {
1748 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1749 fGridJDL->SetDescription("Split", "We split per SE or file");
1751 fMergingJDL->SetValue("Split", "\"se\"");
1752 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested software packages, identical for both JDLs
1753 if (!fAliROOTVersion.IsNull()) {
1754 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1755 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1757 if (!fAliPhysicsVersion.IsNull()) {
1758 fGridJDL->AddToPackages("AliPhysics", fAliPhysicsVersion,"VO_ALICE", "List of requested packages");
1759 fMergingJDL->AddToPackages("AliPhysics", fAliPhysicsVersion, "VO_ALICE", "List of requested packages");
1761 if (!fROOTVersion.IsNull()) {
1762 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1763 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1765 if (!fAPIVersion.IsNull()) {
1766 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1767 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: blank-separated entries, each split at "::" into name and version
1769 if (!fExternalPackages.IsNull()) {
1770 arr = fExternalPackages.Tokenize(" ");
1772 while ((os=(TObjString*)next())) {
1773 TString pkgname = os->GetString();
// NOTE(review): the result of Index("::") is used unchecked, i.e. every entry
// is assumed to have the form <name>::<version> - confirm
1774 Int_t index = pkgname.Index("::");
1775 TString pkgversion = pkgname(index+2, pkgname.Length());
1776 pkgname.Remove(index);
1777 fGridJDL->AddToPackages(pkgname, pkgversion);
1778 fMergingJDL->AddToPackages(pkgname, pkgversion);
// Both job types read their input from the collection wn.xml
1783 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1784 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1786 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1787 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
// Input sandbox: the analysis macro (analysis JDL only) and the file named
// like the executable with ".sh" replaced by ".root" (both JDLs)
1788 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1789 TString analysisFile = fExecutable;
1790 analysisFile.ReplaceAll(".sh", ".root");
1791 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1792 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// Additional files: entries containing ".so" or ".dylib" are not added to the sandbox
1793 if (fAdditionalLibs.Length()) {
1794 arr = fAdditionalLibs.Tokenize(" ");
1796 while ((os=(TObjString*)next())) {
1797 if (os->GetString().Contains(".so") ||
1798 os->GetString().Contains(".dylib")) continue;
1799 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1800 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages goes to the input sandbox of both JDLs
1805 TIter next(fPackages);
1807 while ((obj=next())) {
1808 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1809 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
// Description text for the "Output" set
1812 const char *comment = "List of output files and archives";
// Output archives of the analysis JDL
1813 if (fOutputArchive.Length()) {
1814 TString outputArchive = fOutputArchive;
// Strip the files listed in fRegisterExcludes (with and without a trailing comma)
1815 if (!fRegisterExcludes.IsNull()) {
1816 arr = fRegisterExcludes.Tokenize(" ");
1818 while ((os=(TObjString*)next1())) {
1819 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1820 outputArchive.ReplaceAll(os->GetString(),"");
// One "Output" entry per blank-separated archive; fCloseSE is appended to
// entries that carry no "@" specification of their own
1824 arr = outputArchive.Tokenize(" ");
1826 Bool_t first = kTRUE;
1827 while ((os=(TObjString*)next())) {
1828 if (!os->GetString().Contains("@") && fCloseSE.Length())
1829 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1831 fGridJDL->AddToSet("Output", os->GetString());
1832 if (first) fGridJDL->AddToSetDescription("Output", comment);
1836 // Output archive for the merging jdl
1837 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1838 outputArchive = "log_archive.zip:std*@disk=1 ";
1839 // Add normal output files, extra files + terminate files
1841 if (IsMergeAOD()) files = GetListOfFiles("outaodextter");
1842 else files = GetListOfFiles("outextter");
1843 // Do not register files in fRegisterExcludes
1844 if (!fRegisterExcludes.IsNull()) {
1845 arr = fRegisterExcludes.Tokenize(" ");
1847 while ((os=(TObjString*)next1())) {
1848 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1849 files.ReplaceAll(os->GetString(),"");
// Turn each "<name>.root" into the wildcard "<name>*.root"
1853 files.ReplaceAll(".root", "*.root");
// With throughput collection enabled the manager's file info log is stored as well
1855 if (mgr->IsCollectThroughput())
1856 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1858 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1860 TString files = fOutputArchive;
1861 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already attached by user
1862 outputArchive = files;
// One "Output" entry of the merging JDL per blank-separated archive
1864 arr = outputArchive.Tokenize(" ");
1867 while ((os=(TObjString*)next2())) {
1868 TString currentfile = os->GetString();
1869 if (!currentfile.Contains("@") && fCloseSE.Length())
1870 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1872 fMergingJDL->AddToSet("Output", currentfile);
1873 if (first) fMergingJDL->AddToSetDescription("Output", comment);
// Plain output files, comma-separated in fOutputFiles
1878 arr = fOutputFiles.Tokenize(",");
1880 Bool_t first = kTRUE;
1881 while ((os=(TObjString*)next())) {
1882 // Ignore outputs in jdl that are also in outputarchive
// Reduce the entry to its bare name: no wildcard, no ".root", no "@..." suffix
1883 TString sout = os->GetString();
1884 sout.ReplaceAll("*", "");
1885 sout.ReplaceAll(".root", "");
1886 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1887 if (fOutputArchive.Contains(sout)) continue;
1888 // Ignore fRegisterExcludes
1889 if (fRegisterExcludes.Contains(sout)) continue;
1890 if (!first) comment = NULL;
1891 if (!os->GetString().Contains("@") && fCloseSE.Length())
1892 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1894 fGridJDL->AddToSet("Output", os->GetString());
1895 if (first) fGridJDL->AddToSetDescription("Output", comment);
// Files listed in fMergeExcludes are left out of the merging JDL
1896 if (fMergeExcludes.Contains(sout)) continue;
1897 if (!os->GetString().Contains("@") && fCloseSE.Length())
1898 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1900 fMergingJDL->AddToSet("Output", os->GetString());
1901 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1905 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1906 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// The merging validation script has ".sh" replaced by "_merge.sh"
1907 TString validationScript = fValidationScript;
1908 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1909 validationScript.ReplaceAll(".sh", "_merge.sh");
1910 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
// The threshold is written as a quoted percentage
1911 if (fMasterResubmitThreshold) {
1912 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1913 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1915 // Write a jdl with 2 input parameters: collection name and output dir name.
1918 // Copy jdl to grid workspace
1920 // Check if an output directory was defined and valid
// NOTE(review): this repeats the output directory check made near the top of
// the function - confirm both are needed
1921 if (!fGridOutputDir.Length()) {
1922 Error("CreateJDL", "You must define AliEn output directory");
1925 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1926 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1927 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1928 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1930 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
// Submit mode: upload the analysis JDL and the merging JDL
1936 if (TestBit(AliAnalysisGrid::kSubmit)) {
1937 TString mergeJDLName = fExecutable;
1938 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
// Destination is the output directory, or the working directory in production mode
1939 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1940 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1941 if (fProductionMode) {
1942 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1943 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove previous copies before uploading
1945 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1946 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1947 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1948 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1949 Fatal("","Terminating");
1950 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1952 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1953 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1954 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1955 Fatal("","Terminating");
// Upload the additional files (entries containing ".so" or ".dylib" are skipped)
1958 if (fAdditionalLibs.Length()) {
1959 arr = fAdditionalLibs.Tokenize(" ");
1962 while ((os=(TObjString*)next())) {
1963 if (os->GetString().Contains(".so") ||
1964 os->GetString().Contains(".dylib")) continue;
1965 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1966 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1967 // TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1968 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1969 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1970 Fatal("","Terminating");
// Upload the entries of fPackages to the working directory
1975 TIter next(fPackages);
1977 while ((obj=next())) {
1978 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1979 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1980 // TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1981 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1982 Form("%s/%s", workdir.Data(), obj->GetName())))
1983 Fatal("","Terminating");
1990 //______________________________________________________________________________
1991 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1993 // Generates the JDL text of the analysis job (from fGridJDL) and of the
1994 // intermediate and final merging jobs (both from fMergingJDL), writes them to the
1995 // local files fJDLName, <executable>_merge.jdl and <executable>_merge_final.jdl
// and uploads them with copyLocal2Alien(), aborting through Fatal() when an
// upload fails. Returns kFALSE right away when neither input files nor the MC
// loop are configured.
// NOTE(review): the previous version of this comment described a `findex`
// argument that is not part of this signature.
1996 if (!fInputFiles && !fMCLoop) return kFALSE;
1999 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
2000 workdir += fGridWorkingDir;
// Placeholder for the merging stage: JDL argument $2, or $4 in production mode.
2001 TString stageName = "$2";
2002 if (fProductionMode) stageName = "$4";
2003 if (!fMergeDirName.IsNull()) {
2004 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
2005 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
2007 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
2008 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
2010 if (fProductionMode) {
2011 TIter next(fInputFiles);
2012 while ((os=next())) {
2013 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
2015 if (!fOutputToRunNo)
2016 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
2018 fGridJDL->SetOutputDirectory(fGridOutputDir);
2020 if (!fRunNumbers.Length() && !fRunRange[0]) {
2021 // One jdl with no parameters in case input data is specified by name.
2022 TIter next(fInputFiles);
2024 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
2025 if (!fOutputSingle.IsNull())
2026 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
2028 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
2029 // fMergingJDL->SetOutputDirectory(fGridOutputDir);
2032 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
2033 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
2034 if (!fOutputSingle.IsNull()) {
2035 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
2036 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
2038 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
2043 // Generate the JDL as a string
2044 TString sjdl = fGridJDL->Generate();
2045 TString sjdl1 = fMergingJDL->Generate();
// The final merging JDL (sjdl2) is generated from the same fMergingJDL object
// after redirecting its output directory and adding the stage xml to the sandbox.
2047 if (!fMergeDirName.IsNull()) {
2048 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
2049 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
2051 fMergingJDL->SetOutputDirectory("$1", "Output directory");
2052 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
2054 TString sjdl2 = fMergingJDL->Generate();
2055 Int_t index, index1;
// Re-format the three generated strings (line breaks after list separators and
// braces) and rename the OutputDirectory key to OutputDir.
2056 sjdl.ReplaceAll("\",\"", "\",\n \"");
2057 sjdl.ReplaceAll("(member", "\n (member");
2058 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2059 sjdl.ReplaceAll("{", "{\n ");
2060 sjdl.ReplaceAll("};", "\n};");
2061 sjdl.ReplaceAll("{\n \n", "{\n");
2062 sjdl.ReplaceAll("\n\n", "\n");
2063 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
2064 sjdl1.ReplaceAll("\",\"", "\",\n \"");
2065 sjdl1.ReplaceAll("(member", "\n (member");
2066 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2067 sjdl1.ReplaceAll("{", "{\n ");
2068 sjdl1.ReplaceAll("};", "\n};");
2069 sjdl1.ReplaceAll("{\n \n", "{\n");
2070 sjdl1.ReplaceAll("\n\n", "\n");
2071 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
2072 sjdl2.ReplaceAll("\",\"", "\",\n \"");
2073 sjdl2.ReplaceAll("(member", "\n (member");
2074 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
2075 sjdl2.ReplaceAll("{", "{\n ");
2076 sjdl2.ReplaceAll("};", "\n};");
2077 sjdl2.ReplaceAll("{\n \n", "{\n");
2078 sjdl2.ReplaceAll("\n\n", "\n");
2079 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
2080 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
2081 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
2082 index = sjdl.Index("JDLVariables");
2083 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
2084 sjdl += "Workdirectorysize = {\"5000MB\"};";
// NOTE(review): sjdl1 receives the Workdirectorysize entry here and a second
// time further down - check whether the duplicate is intended.
2085 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2086 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
// In production mode the stage placeholder is inserted into the job tag, in
// front of the first ':' or at the end of the tag when there is none.
2087 index = fJobTag.Index(":");
2088 if (index < 0) index = fJobTag.Length();
2089 TString jobTag = fJobTag;
2090 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
2091 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
2092 if (fProductionMode) {
2093 sjdl1.Prepend("# Generated merging jdl (production mode) \
2094 \n# $1 = full alien path to output directory to be merged \
2095 \n# $2 = train number \
2096 \n# $3 = production (like LHC10b) \
2097 \n# $4 = merging stage \
2098 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2099 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2100 sjdl2.Prepend("# Generated merging jdl \
2101 \n# $1 = full alien path to output directory to be merged \
2102 \n# $2 = train number \
2103 \n# $3 = production (like LHC10b) \
2104 \n# $4 = merging stage \
2105 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2107 sjdl1.Prepend("# Generated merging jdl \
2108 \n# $1 = full alien path to output directory to be merged \
2109 \n# $2 = merging stage \
2110 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2111 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2112 sjdl2.Prepend("# Generated merging jdl \
2113 \n# $1 = full alien path to output directory to be merged \
2114 \n# $2 = merging stage \
2115 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2117 index = sjdl1.Index("JDLVariables");
2118 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2119 index = sjdl2.Index("JDLVariables");
2120 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2121 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2122 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
// Remove the Split, SplitMaxInputFileNumber, InputDataCollection,
// InputDataListFormat and InputDataList entries from the final merging JDL.
2123 index = sjdl2.Index("Split =");
2125 index1 = sjdl2.Index("\n", index);
2126 sjdl2.Remove(index, index1-index+1);
2128 index = sjdl2.Index("SplitMaxInputFileNumber");
2130 index1 = sjdl2.Index("\n", index);
2131 sjdl2.Remove(index, index1-index+1);
2133 index = sjdl2.Index("InputDataCollection");
2135 index1 = sjdl2.Index(";", index);
2136 sjdl2.Remove(index, index1-index+1);
2138 index = sjdl2.Index("InputDataListFormat");
2140 index1 = sjdl2.Index("\n", index);
2141 sjdl2.Remove(index, index1-index+1);
2143 index = sjdl2.Index("InputDataList");
2145 index1 = sjdl2.Index("\n", index);
2146 sjdl2.Remove(index, index1-index+1);
// The final merging job reads the stage collection where the template names wn.xml.
2148 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2149 // Write jdl to file
2151 out.open(fJDLName.Data(), ios::out);
2153 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2156 out << sjdl << endl;
// The merging JDL file names are derived from the executable script name.
2158 TString mergeJDLName = fExecutable;
2159 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2162 out1.open(mergeJDLName.Data(), ios::out);
2164 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2167 out1 << sjdl1 << endl;
2170 TString finalJDL = mergeJDLName;
2171 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2172 out2.open(finalJDL.Data(), ios::out);
2174 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2177 out2 << sjdl2 << endl;
2181 // Copy jdl to grid workspace
2183 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Remote locations: below the grid output directory, or below the working
// directory in production mode.
2185 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2186 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2187 TString finalJDL = mergeJDLName;
2188 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2189 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2190 if (fProductionMode) {
2191 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2192 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2193 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
// Existing remote copies are removed before uploading the new files.
2195 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2196 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2197 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2198 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2199 // TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2200 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2201 Fatal("","Terminating");
2203 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2204 // TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2205 // TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2206 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2207 Fatal("","Terminating");
2208 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2209 Fatal("","Terminating");
2215 //______________________________________________________________________________
2216 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2218 // Returns true if the file exists on the grid.
// Any "alien://" prefix is removed from the name, which is then listed with
// gGrid->Ls(); the "name" entry of the first result map is inspected.
// Returns kFALSE when there is no grid connection or the listing gives no result.
2219 if (!gGrid) return kFALSE;
2221 slfn.ReplaceAll("alien://","");
2222 TGridResult *res = gGrid->Ls(slfn);
2223 if (!res) return kFALSE;
2224 TMap *map = dynamic_cast<TMap*>(res->At(0));
2229 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2230 if (!objs || !objs->GetString().Length()) {
2238 //______________________________________________________________________________
2239 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2241 // Returns true if the directory exists. The argument can also be a path.
2242 // Since there is no API for this in TAlien, we use the cd trick:
// a "cd" command is issued through gGrid->Command() and its "__result__"
// entry is converted to the boolean return value.
// Returns kFALSE when there is no grid connection.
2243 if (!gGrid) return kFALSE;
2244 // Backup current path
2245 TString cpath = gGrid->Pwd();
2246 TString command = "cd ";
2247 TString sdir(dirname);
2248 sdir.ReplaceAll("alien://", "");
2250 TGridResult *res = gGrid->Command(command);
2255 TMap *map = (TMap*)res->At(0);
2261 TString sval = map->GetValue("__result__")->GetName();
2262 Bool_t retval = (Bool_t)sval.Atoi();
2268 //______________________________________________________________________________
2269 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2271 // Check input data type.
// Reports through the output arguments whether lfn is a raw grid collection
// (isCollection, as told by IsCollection()), whether its name contains ".xml"
// (isXml) and whether ".tag" files are involved (useTags). A one-line summary
// of the findings is printed with Info(). Requires a grid connection.
2272 isCollection = kFALSE;
2276 Error("CheckDataType", "No connection to grid");
2279 isCollection = IsCollection(lfn);
2280 TString msg = "\n##### file: ";
2283 msg += " type: raw_collection;";
2284 // special treatment for collections
2286 // check for tag files in the collection
2287 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2289 msg += " using_tags: No (unknown)";
2290 Info("CheckDataType", "%s", msg.Data());
// Only the first file of the collection is looked at to decide on tags.
2293 const char* typeStr = res->GetKey(0, "origLFN");
2294 if (!typeStr || !strlen(typeStr)) {
2295 msg += " using_tags: No (unknown)";
2296 Info("CheckDataType", "%s", msg.Data());
2299 TString file = typeStr;
2300 useTags = file.Contains(".tag");
2301 if (useTags) msg += " using_tags: Yes";
2302 else msg += " using_tags: No";
2303 Info("CheckDataType", "%s", msg.Data());
2308 isXml = slfn.Contains(".xml");
2310 // Open xml collection and check if there are tag files inside
2311 msg += " type: xml_collection;";
// The collection is opened through the interpreter
// (presumably to avoid a compile-time dependency on TAlien - TODO confirm).
2312 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2314 msg += " using_tags: No (unknown)";
2315 Info("CheckDataType", "%s", msg.Data());
2318 TMap *map = coll->Next();
2320 msg += " using_tags: No (unknown)";
2321 Info("CheckDataType", "%s", msg.Data());
2324 map = (TMap*)map->GetValue("");
2326 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2327 useTags = file.Contains(".tag");
2329 if (useTags) msg += " using_tags: Yes";
2330 else msg += " using_tags: No";
2331 Info("CheckDataType", "%s", msg.Data());
// Plain file: the decision is taken from the file name alone.
2334 useTags = slfn.Contains(".tag");
2335 if (slfn.Contains(".root")) msg += " type: root file;";
2336 else msg += " type: unknown file;";
2337 if (useTags) msg += " using_tags: Yes";
2338 else msg += " using_tags: No";
2339 Info("CheckDataType", "%s", msg.Data());
2342 //______________________________________________________________________________
2343 void AliAnalysisAlien::EnablePackage(const char *package)
2345 // Enables a par file supposed to exist in the current directory.
// Aborts through Fatal() when the package cannot be accessed. Otherwise the
// kUsePars bit is set and the package name is stored as a TObjString in
// fPackages, an array that owns its entries.
2346 TString pkg(package);
2347 pkg.ReplaceAll(".par", "");
2349 if (gSystem->AccessPathName(pkg)) {
2350 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the switch to .par packages only the first time the bit gets set.
2353 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2354 Info("EnablePackage", "AliEn plugin will use .par packages");
2355 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
2357 fPackages = new TObjArray();
2358 fPackages->SetOwner();
2360 fPackages->Add(new TObjString(pkg));
2363 //______________________________________________________________________________
2364 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2366 // Make a chain from files having the location specified in fFileForTestMode.
2367 // Inspired from JF's CreateESDChain.
// fFileForTestMode is a text file listing data file locations; empty lines and
// lines starting with '#' are skipped and the fNtestFiles limit ends the loop.
// The tree name is treeName, or "TE" when IsUseMCchain() is true. For every
// blank-separated entry of fFriendChainName a friend chain is created (with the
// entry as title) and attached to the main chain.
2368 if (fFileForTestMode.IsNull()) {
2369 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2372 if (gSystem->AccessPathName(fFileForTestMode)) {
2373 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2378 in.open(fFileForTestMode);
2380 // Read the input list of files and add them to the chain
2382 TString streeName(treeName);
2383 if (IsUseMCchain()) streeName = "TE";
2384 TChain *chain = new TChain(streeName);
2385 TList *friends = new TList();
2386 TChain *cfriend = 0;
2387 if (!fFriendChainName.IsNull()) {
2388 TObjArray *list = fFriendChainName.Tokenize(" ");
2391 while((str=(TObjString*)next())) {
2392 cfriend = new TChain(streeName, str->GetName());
2393 friends->Add(cfriend);
2394 chain->AddFriend(cfriend);
2399 TIter nextfriend(friends);
2403 if (line.IsNull() || line.BeginsWith("#")) continue;
2404 if (count++ == fNtestFiles) break;
2405 TString esdFile(line);
// Each file is opened first; only files that open properly go into the chain.
2406 TFile *file = TFile::Open(esdFile);
2407 if (file && !file->IsZombie()) {
2408 chain->Add(esdFile);
2410 if (!fFriendChainName.IsNull()) {
// Drop an archive member suffix ("#...") before taking the directory name.
2411 if (esdFile.Index("#") > -1)
2412 esdFile.Remove(esdFile.Index("#"));
2413 bpath = gSystem->DirName(esdFile);
2417 while ((cfriend=(TChain*)nextfriend())) {
2419 fileFriend += cfriend->GetTitle();
2420 file = TFile::Open(fileFriend);
2421 if (file && !file->IsZombie()) {
2423 cfriend->Add(fileFriend);
// A friend file that cannot be opened is fatal, unlike a missing data file.
2425 Fatal("GetChainForTestMode", "Cannot open friend file: %s", fileFriend.Data());
2431 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2435 if (!chain->GetListOfFiles()->GetEntries()) {
2436 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2445 //______________________________________________________________________________
2446 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2448 // Get job status for all jobs with jobid>=jobidstart.
// Walks the job list returned by gGrid->Ps("") and skips jobs whose queue id is
// below jobidstart. The status text of the job with id lastid is written into
// the function-static buffer mstatus (20 characters), which is what gets
// returned, so the text is overwritten by the next call.
// NOTE(review): nrunning/nwaiting/nerror/ndone are presumably filled from the
// switch on the job status below - confirm against the case bodies.
2449 static char mstatus[20];
2455 TGridJobStatusList *list = gGrid->Ps("");
2456 if (!list) return mstatus;
2457 Int_t nentries = list->GetSize();
2458 TGridJobStatus *status;
2460 for (Int_t ijob=0; ijob<nentries; ijob++) {
2461 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter
// (presumably to avoid a compile-time dependency on TAlien - TODO confirm).
2462 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2463 if (pid<jobidstart) continue;
2464 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format and is copied
// without a length check - confirm it always fits into mstatus.
2465 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2467 switch (status->GetStatus()) {
2468 case TGridJobStatus::kWAITING:
2470 case TGridJobStatus::kRUNNING:
2472 case TGridJobStatus::kABORTED:
2473 case TGridJobStatus::kFAIL:
2474 case TGridJobStatus::kUNKNOWN:
2476 case TGridJobStatus::kDONE:
2485 //______________________________________________________________________________
2486 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2488 // Returns true if file is a collection. Functionality duplicated from
2489 // TAlien::Type() because we don't want to directly depend on TAlien.
// The grid command "type -z <lfn>" is issued and the "type" key of its first
// result is compared with "collection". A missing result or an empty type
// string yields kFALSE.
2491 Error("IsCollection", "No connection to grid");
2494 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2495 if (!res) return kFALSE;
2496 const char* typeStr = res->GetKey(0, "type");
2497 if (!typeStr || !strlen(typeStr)) return kFALSE;
2498 if (!strcmp(typeStr, "collection")) return kTRUE;
2503 //______________________________________________________________________________
2504 Bool_t AliAnalysisAlien::IsSingleOutput() const
2506 // Check if single-ouput option is on.
2507 return (!fOutputSingle.IsNull());
2510 //______________________________________________________________________________
2511 Long64_t AliAnalysisAlien::RunMacroAndExtractLibs(const char* macro, const char *args, TString &libs)
2513 // Tries to run the specified macro and return the libraries that it loads.
// macro : path of the macro; it is expanded with gSystem->ExpandPathName() and
//         must be accessible, otherwise an error is printed
// args  : arguments handed to the macro execution
// libs  : filled with the base names, separated by blanks, of the libraries that
//         gSystem->GetLibraries() reports after the execution but not before it
// The value of retval is what the macro execution gave back.
2515 if (strlen(macro)) expname = gSystem->ExpandPathName(macro);
2516 if (expname.IsNull() || gSystem->AccessPathName(expname)) {
2517 ::Error("RunMacroAndExtractLibs","Cannot find macro %s in current directory", macro);
// Remember what is loaded already so that only the new libraries are reported.
2520 TString oldlibs = gSystem->GetLibraries();
2523 Long64_t retval = m.Exec(args, &error);
2524 if (error != TInterpreter::kNoError)
2526 ::Error("RunMacroAndExtractLibs", "Macro interpretation %s failed", macro);
2529 libs = gSystem->GetLibraries();
2530 libs.ReplaceAll(oldlibs, "");
// NOTE(review): TString::Strip() returns the stripped sub-string and leaves the
// string itself untouched, so the result is discarded here - confirm intent.
2531 libs.Strip(TString::kLeading);
2532 TObjArray *libTokens = libs.Tokenize(" ");
// Rebuild the list from the tokens, keeping only the file name of each library.
2534 for (Int_t i=0; i<libTokens->GetEntries(); i++) {
2535 if (!libs.IsNull()) libs += " ";
2536 libs += gSystem->BaseName(libTokens->At(i)->GetName());
2542 //______________________________________________________________________________
2543 void AliAnalysisAlien::Print(Option_t *) const
2545 // Print current plugin settings.
// Everything goes to standard output through printf. When an analysis manager
// exists and is in PROOF mode, the PROOF related settings are printed first;
// the grid related settings make up the remainder of the listing.
// The Option_t argument is not named and therefore not used.
2546 printf("### AliEn analysis plugin current settings ###\n");
2547 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2548 if (mgr && mgr->IsProofMode()) {
2549 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2550 if (TestBit(AliAnalysisGrid::kTest))
2551 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2552 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2553 if (!fProofDataSet.IsNull())
2554 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2556 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2558 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2559 if (!fROOTVersion.IsNull())
2560 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2562 printf("= ROOT version requested________________________ default\n");
2563 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2564 printf("= AliPhysics version requested__________________ %s\n", fAliPhysicsVersion.Data());
2565 if (!fAliRootMode.IsNull())
2566 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2568 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2569 if (fNproofWorkersPerSlave)
2570 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2571 if (TestSpecialBit(kClearPackages))
2572 printf("= ClearPackages requested...\n");
// NOTE(review): this tests the character pointer returned by Data(), not the
// length of the string - confirm an empty include path should still be printed.
2573 if (fIncludePath.Data())
2574 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2575 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2576 if (fPackages && fPackages->GetEntries()) {
2577 TIter next(fPackages);
// The package names are appended one after the other without a separator.
2580 while ((obj=next())) list += obj->GetName();
2581 printf("= Par files to be used: ________________________ %s\n", list.Data());
2583 if (TestSpecialBit(kProofConnectGrid))
2584 printf("= Requested PROOF connection to grid\n");
2587 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2588 if (fOverwriteMode) {
2589 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2590 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2592 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2593 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO:Print");
2594 printf("= Production mode:______________________________ %d\n", fProductionMode);
2595 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2596 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2597 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2598 printf("= Version of AliPhysics requested: _____________ %s\n", fAliPhysicsVersion.Data());
2600 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2601 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2602 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2603 TString basedatadir = fGridDataDir;
2604 TString pattern = fDataPattern;
// A blank inside the data pattern splits it: the part in front of the blank is
// shown as belonging to the data directory path, the remainder as the pattern.
2606 Int_t ind = pattern.Index(" ");
2608 basedatadir += "/%run%/";
2609 basedatadir += pattern(0, ind);
2610 pattern = pattern(ind+1, pattern.Length());
2612 printf("= Data base directory path requested: __________ %s\n", basedatadir.Data());
2613 printf("= Data search pattern: _________________________ %s\n", pattern.Data());
2614 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2615 if (fRunNumbers.Length())
2616 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2618 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
// Without run numbers and run range the input files are listed by name.
2619 if (!fRunRange[0] && !fRunNumbers.Length()) {
2620 TIter next(fInputFiles);
2623 while ((obj=next())) list += obj->GetName();
2624 printf("= Input files to be processed: _________________ %s\n", list.Data());
2626 if (TestBit(AliAnalysisGrid::kTest))
2627 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2628 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2629 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2630 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2631 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2632 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2633 printf("=====================================================================\n");
2634 printf("= Job price: ___________________________________ %d\n", fPrice);
2635 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2636 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2637 if (fMaxInitFailed>0)
2638 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2639 if (fMasterResubmitThreshold>0)
2640 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2641 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2642 if (fNrunsPerMaster>0)
2643 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2644 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2645 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2646 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2647 if (fArguments.Length())
2648 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2649 if (fExecutableArgs.Length())
2650 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2651 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2652 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2653 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2654 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2656 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2657 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same pointer test as in the PROOF section above.
2658 if (fIncludePath.Data())
2659 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2660 if (fCloseSE.Length())
2661 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2662 if (fFriendChainName.Length())
2663 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2664 if (fPackages && fPackages->GetEntries()) {
2665 TIter next(fPackages);
2668 while ((obj=next())) list += obj->GetName();
2669 printf("= Par files to be used: ________________________ %s\n", list.Data());
2673 //______________________________________________________________________________
2674 void AliAnalysisAlien::SetDefaults()
2676 // Set default values for everything. What cannot be filled will be left empty.
// The two JDL objects are created through the interpreter
// (presumably to avoid a compile-time dependency on TAlien - TODO confirm).
// NOTE(review): an existing fGridJDL is deleted before it is replaced, while
// fMergingJDL is overwritten here without a matching delete - confirm that a
// previous object is released elsewhere.
2677 if (fGridJDL) delete fGridJDL;
2678 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2679 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2682 fSplitMaxInputFileNumber = 100;
2684 fMasterResubmitThreshold = 0;
2690 fNrunsPerMaster = 1;
2691 fMaxMergeFiles = 100;
2693 fExecutable = "analysis.sh";
2694 fExecutableCommand = "root -b -q -x";
2696 fExecutableArgs = "";
2697 fAnalysisMacro = "myAnalysis.C";
2698 fAnalysisSource = "";
2699 fAdditionalLibs = "";
2703 fAliROOTVersion = "";
2704 fAliPhysicsVersion = "";
2705 fUser = ""; // Your alien user name
2706 fGridWorkingDir = "";
2707 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2708 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2709 fFriendChainName = "";
2710 fGridOutputDir = "output";
2711 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2712 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2713 fInputFormat = "xml-single";
2714 fJDLName = "analysis.jdl";
2715 fJobTag = "Automatically generated analysis JDL";
2716 fMergeExcludes = "";
2719 SetCheckCopy(kTRUE);
2720 SetDefaultOutputs(kTRUE);
2724 //______________________________________________________________________________
2725 void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
2727 // Set file name for the chain of friends and optionally additional libs to be loaded.
2728 // Libs should be separated by blancs.
2729 fFriendChainName = name;
2730 fFriendChainName.ReplaceAll(",", " ");
2731 fFriendChainName.Strip();
2732 fFriendChainName.ReplaceAll(" ", " ");
2734 fFriendLibs = libnames;
2735 if (fFriendLibs.Length()) {
2736 if(!fFriendLibs.Contains(".so") &&
2737 !fFriendLibs.Contains(".dylib"))
2738 Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
2739 fFriendLibs.ReplaceAll(",", " ");
2740 fFriendLibs.Strip();
2741 fFriendLibs.ReplaceAll(" ", " ");
2745 //______________________________________________________________________________
2746 void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2748 // Obsolete method. Use SetROOTVersion instead
2749 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2750 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2751 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2754 //______________________________________________________________________________
2755 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2757 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
//   filename  : name of the final merged file looked for in aliendir
//   aliendir  : grid directory holding the outputs and the Stage_<n>.xml collections
//   nperchunk : a stage with fewer input files than this is submitted as the final one
//   jdl       : merging JDL to submit; the final stage uses the same name with
//               ".jdl" replaced by "_final.jdl"
// Returns kFALSE when the job submission does not give a job id.
2758 // First check if the result is already in the output directory.
2759 if (FileExists(Form("%s/%s",aliendir,filename))) {
2760 printf("Final merged results found. Not merging again.\n");
2763 // Now check the last stage done.
2766 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2769 // Next stage of merging
2771 TString pattern = "*root_archive.zip";
2772 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
// The find command produces the xml collection for this stage on the grid
// standard output, which is then redirected into a local file.
2773 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2774 if (res) delete res;
2775 // Write standard output to file
2776 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2777 // Count the number of files inside
2779 ifile.open(Form("Stage_%d.xml",stage));
2780 if (!ifile.good()) {
2781 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2786 while (!ifile.eof()) {
// Every line containing "/event" counts as one file.
2788 if (line.Contains("/event")) nfiles++;
2792 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2795 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2797 // Copy the file in the output directory
2798 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2799 // TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2800 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2801 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2802 // Check if this is the last stage to be done.
2803 Bool_t laststage = (nfiles<nperchunk);
2804 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2807 printf("### Submiting final merging stage %d\n", stage);
2808 TString finalJDL = jdl;
2809 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2810 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2811 jobId = SubmitSingleJob(query);
2813 printf("### Submiting merging stage %d\n", stage);
2814 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2815 jobId = SubmitSingleJob(query);
2817 if (!jobId) return kFALSE;
// Keep track of the submitted job: its id and a label for the stage are
// appended to the blank separated lists fGridJobIDs and fGridStages.
2819 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2820 fGridJobIDs.Append(Form("%d", jobId));
2821 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2822 fGridStages.Append(Form("%s_merge_stage%d",
2823 laststage ? "final" : "partial", stage));
2828 //______________________________________________________________________________
2829 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2831 // Load the analysis manager from a file.
// The file is opened with TFile::Open() and its keys are scanned for objects of
// class "AliAnalysisManager"; an error is printed when the file cannot be
// opened or when it holds no analysis manager.
2832 TFile *file = TFile::Open(fname);
2834 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2837 TIter nextkey(file->GetListOfKeys());
2838 AliAnalysisManager *mgr = 0;
2840 while ((key=(TKey*)nextkey())) {
2841 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2842 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2845 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2849 //______________________________________________________________________________
2850 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2852 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2853 if (!gGrid) return 0;
2854 printf("=> %s ------> ",query);
2855 TGridResult *res = gGrid->Command(query);
2857 TString jobId = res->GetKey(0,"jobId");
2859 if (jobId.IsNull()) {
2860 printf("submission failed. Reason:\n");
2863 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2866 Int_t ijobId = jobId.Atoi();
2867 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2871 //______________________________________________________________________________
// Builds a single text file <output> by concatenating files referenced by
// an XML collection.
// Parameters:
//   output     - name of the resulting file; also used as suffix of the
//                temporary per-input copies ("<ifile>_<output>").
//   collection - name of the XML collection; must contain ".xml", otherwise
//                kFALSE is returned right away.
// The collection is opened through the interpreter
// (TAlienCollection::Open via gROOT->ProcessLine). Each input is copied
// locally with TFile::Cp and appended with shell commands (cp/cat) using
// the scratch files "lastmerged" and "tempmerged" in the current directory.
// NOTE(review): <output> and the temporary names are interpolated unquoted
// into shell command lines, so names with spaces or shell metacharacters
// would not be handled safely.
// NOTE(review): some lines are missing from this listing (declarations,
// braces, return statements); the return values are not visible here.
2872 Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2874 // Merges a collection of output files using concatenation.
2875 TString scoll(collection);
2876 if (!scoll.Contains(".xml")) return kFALSE;
2877 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2879 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2882 // Iterate grid collection
2884 Bool_t merged = kFALSE;
2886 while (coll->Next()) {
// Start from the directory part of the current entry's TURL.
2887 TString fname = gSystem->DirName(coll->GetTURL());
// Local temporary copy, prefixed with the running file index.
2890 outtmp = Form("%d_%s", ifile, output);
2891 if (!TFile::Cp(fname, outtmp)) {
2892 ::Error("MergeInfo", "Could not copy %s", fname.Data());
// Seed the accumulator file with a copy of an input.
2897 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
// Append an input to the accumulator via a temporary file.
2900 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2901 gSystem->Exec("cp tempmerged lastmerged");
// Publish the result and remove the scratch files and per-input copies.
2904 gSystem->Exec(Form("cp lastmerged %s", output));
2905 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2911 //______________________________________________________________________________
// Merges all files named <output> found under <basedir> using TFileMerger.
// Parameters:
//   output    - output file name; anything from an '@' onwards is removed
//               before use.
//   basedir   - input location. Three forms are distinguished by substring:
//               ".xml" (grid collection), ".txt" (text file with one
//               location per line, '#' lines and empty lines skipped),
//               otherwise a directory searched with the grid "find" command.
//   nmaxmerge - number of inputs merged per chunk; overridden with a very
//               large value for xml input.
//   stage     - merging stage, see the description below.
// Intermediate chunks are written to "<name>_NNNN.root" files (the ".root"
// suffix of the output name is replaced), and the previous chunk is fed into
// the next merger and then unlinked.
// NOTE(review): many lines are missing from this listing (declarations,
// braces, else branches, return statements, the Merge() calls), so the
// control flow between the visible statements must be confirmed against the
// complete source.
2912 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2914 // Merge given output files from basedir. Basedir can be an alien output directory
2915 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2916 // files in a group (ignored for xml input). Merging can be done in stages:
2917 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2918 // stage=1 : works with an xml of all root_archive.zip in the output directory
2919 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2920 TString outputFile = output;
2922 TString outputChunk;
2923 TString previousChunk = "";
2924 TObjArray *listoffiles = new TObjArray();
2925 // listoffiles->SetOwner();
2926 Int_t countChunk = 0;
2927 Int_t countZero = nmaxmerge;
2928 Bool_t merged = kTRUE;
2929 Bool_t isGrid = kTRUE;
// Drop a trailing "@..." part from the output name.
2930 Int_t index = outputFile.Index("@");
2931 if (index > 0) outputFile.Remove(index);
2932 TString inputFile = outputFile;
2933 TString sbasedir = basedir;
// --- Input form 1: XML grid collection ---
2934 if (sbasedir.Contains(".xml")) {
2935 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2936 nmaxmerge = 9999999;
2937 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2939 ::Error("MergeOutput", "Input XML collection empty.");
2942 // Iterate grid collection
2943 while (coll->Next()) {
2944 TString fname = gSystem->DirName(coll->GetTURL());
2947 listoffiles->Add(new TNamed(fname.Data(),""));
// --- Input form 2: text file listing the locations ---
2949 } else if (sbasedir.Contains(".txt")) {
2950 // The file having the .txt extension is expected to contain a list of
2951 // folders where the output files will be looked. For alien folders,
2952 // the full folder LFN is expected (starting with alien://)
2953 // Assume lfn's on each line
2958 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
2964 if (line.IsNull() || line.BeginsWith("#")) continue;
// A single entry without "alien:" switches the merger to non-grid mode.
2966 if (!line.Contains("alien:")) isGrid = kFALSE;
2970 listoffiles->Add(new TNamed(line.Data(),""));
2974 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
// --- Input form 3: search the directory with the grid "find" command ---
2979 command = Form("find %s/ *%s", basedir, inputFile.Data());
2980 printf("command: %s\n", command.Data());
2981 TGridResult *res = gGrid->Command(command);
2983 ::Error("MergeOutput","No result for the find command\n");
2989 while ((map=(TMap*)nextmap())) {
2990 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2991 if (!objs || !objs->GetString().Length()) {
2992 // Nothing found - skip this output
2997 listoffiles->Add(new TNamed(objs->GetName(),""));
3001 if (!listoffiles->GetEntries()) {
3002 ::Error("MergeOutput","No result for the find command\n");
3007 TFileMerger *fm = 0;
3008 TIter next0(listoffiles);
3009 TObjArray *listoffilestmp = new TObjArray();
3010 listoffilestmp->SetOwner();
3013 // Keep only the files at upper level
// First pass: find the smallest number of '/' characters among the entries.
3014 Int_t countChar = 0;
3015 while ((nextfile=next0())) {
3016 snextfile = nextfile->GetName();
3017 Int_t crtCount = snextfile.CountChar('/');
3018 if (nextfile == listoffiles->First()) countChar = crtCount;
3019 if (crtCount < countChar) countChar = crtCount;
// Second pass: entries with more '/' than the minimum are handled
// separately; the remaining ones go to the filtered list.
3022 while ((nextfile=next0())) {
3023 snextfile = nextfile->GetName();
3024 Int_t crtCount = snextfile.CountChar('/');
3025 if (crtCount > countChar) {
3029 listoffilestmp->Add(nextfile);
3032 listoffiles = listoffilestmp; // Now contains 'good' files
3033 listoffiles->Print();
3034 TIter next(listoffiles);
3035 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
3036 outputChunk = outputFile;
3037 outputChunk.ReplaceAll(".root", "_*.root");
3038 // Check for existent temporary merge files
3039 // Check overwrite mode and remove previous partial results if needed
3040 // Preserve old merging functionality for stage 0.
// "ls" returning 0 means at least one partial chunk file matches the pattern.
3042 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3044 // Skip as many input files as in a chunk
3045 for (Int_t counter=0; counter<nmaxmerge; counter++) {
3048 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
3052 snextfile = nextfile->GetName();
3054 outputChunk = outputFile;
3055 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName returns true when the path is NOT accessible.
3057 if (gSystem->AccessPathName(outputChunk)) continue;
3058 // Merged file with chunks up to <countChunk> found
3059 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
3060 previousChunk = outputChunk;
3064 countZero = nmaxmerge;
// Main chunked merging loop over the remaining inputs.
3066 while ((nextfile=next())) {
3067 snextfile = nextfile->GetName();
3068 // Loop 'find' results and get next LFN
3069 if (countZero == nmaxmerge) {
3070 // First file in chunk - create file merger and add previous chunk if any.
3071 fm = new TFileMerger(isGrid);
3072 fm->SetFastMethod(kTRUE);
3073 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
3074 outputChunk = outputFile;
3075 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
3077 // If last file found, put merged results in the output file
3078 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
3079 // Add file to be merged and decrement chunk counter.
3080 fm->AddFile(snextfile);
// A chunk is complete when the counter reaches 0 or the last input is seen.
3082 if (countZero==0 || nextfile == listoffiles->Last()) {
3083 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3084 // Nothing found - skip this output
3085 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3089 fm->OutputFile(outputChunk);
3090 // Merge the outputs, then go to next chunk
3092 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3096 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous chunk has been folded into the new one and is removed.
3097 gSystem->Unlink(previousChunk);
3099 if (nextfile == listoffiles->Last()) break;
3101 countZero = nmaxmerge;
3102 previousChunk = outputChunk;
3109 // Merging stage different than 0.
3110 // Move to the beginning of the requested chunk.
3111 fm = new TFileMerger(isGrid);
3112 fm->SetFastMethod(kTRUE);
// All inputs are added to one merger; no chunking in this branch.
3113 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
3115 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
3116 // Nothing found - skip this output
3117 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
3121 fm->OutputFile(outputFile);
3122 // Merge the outputs
3124 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
3128 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
3134 //______________________________________________________________________________
// Drives the merging of every file listed in fOutputFiles (comma separated).
// Returns kTRUE at once in test mode and kFALSE at once in offline mode.
// In "merge via JDL" operation the method submits merging jobs
// (SubmitMerging) or, in production mode, leaves this to the LPM manager.
// Otherwise it merges locally: for each output name it honours
// fOverwriteMode (removing an existing output and partial "_*.root" chunks),
// skips names contained in fMergeExcludes / fRegisterExcludes, and calls
// MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles).
// Side effects visible here: fGridOutputDir may be expanded to a full path,
// XNet.* values of gEnv are changed when fFastReadOption is set, and TMPDIR
// and the ROOT build directory are set to the current working directory.
// NOTE(review): lines are missing from this listing (conditions, braces,
// else branches, return statements); confirm the exact branch structure
// against the complete source.
3135 Bool_t AliAnalysisAlien::MergeOutputs()
3137 // Merge analysis outputs existing in the AliEn space.
3138 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
3139 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3141 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
3145 if (!TestBit(AliAnalysisGrid::kMerge)) {
3146 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
3149 if (fProductionMode) {
3150 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
3153 Info("MergeOutputs", "Submitting merging JDL");
3154 if (!SubmitMerging()) return kFALSE;
3155 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
3156 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
3159 // Get the output path
// A name without '/' is taken relative to <grid home>/<fGridWorkingDir>.
3160 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3161 if (!DirectoryExists(fGridOutputDir)) {
3162 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
3165 if (!fOutputFiles.Length()) {
3166 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3169 // Check if fast read option was requested
3170 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
3171 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
3172 if (fFastReadOption) {
3173 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
3174 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3175 gEnv->SetValue("XNet.ConnectTimeout",50);
3176 gEnv->SetValue("XNet.RequestTimeout",50);
3177 gEnv->SetValue("XNet.MaxRedirectCount",2);
3178 gEnv->SetValue("XNet.ReconnectTimeout",50);
3179 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
3181 // Make sure we change the temporary directory
3182 gSystem->Setenv("TMPDIR", gSystem->pwd());
3183 // Set temporary compilation directory to current one
3184 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
3185 TObjArray *list = fOutputFiles.Tokenize(",");
3189 Bool_t merged = kTRUE;
3190 while((str=(TObjString*)next())) {
3191 outputFile = str->GetString();
// Drop a trailing "@..." part from the file name.
3192 Int_t index = outputFile.Index("@");
3193 if (index > 0) outputFile.Remove(index);
// Glob pattern matching the partial chunk files of this output.
3194 TString outputChunk = outputFile;
3195 outputChunk.ReplaceAll(".root", "_*.root");
3196 // Skip already merged outputs
// AccessPathName returns false when the file exists.
3197 if (!gSystem->AccessPathName(outputFile)) {
3198 if (fOverwriteMode) {
3199 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3200 gSystem->Unlink(outputFile);
3201 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3202 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3203 outputChunk.Data());
3204 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3207 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
3211 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3212 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3213 outputChunk.Data());
3214 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Exclusion is a substring test on the exclude strings (TString::Contains).
3217 if (fMergeExcludes.Contains(outputFile.Data()) ||
3218 fRegisterExcludes.Contains(outputFile.Data())) continue;
3219 // Perform a 'find' command in the output directory, looking for registered outputs
3220 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3222 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in ROOT's list of files.
3226 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3227 if (fileOpened) fileOpened->Close();
3233 //______________________________________________________________________________
// Sets or clears the kDefaultOutputs bit according to <flag>.
// An informational message is printed only when the bit is being switched
// on while it was previously off.
3234 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3236 // Use the output files connected to output containers from the analysis manager
3237 // rather than the files defined by SetOutputFiles
3238 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3239 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3240 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3243 //______________________________________________________________________________
// Appends the space-separated file names in <list> to fOutputFiles, which is
// kept as a comma-separated string.
// Calling this while the kDefaultOutputs bit is set triggers Fatal().
// For each token anything from an '@' onwards is removed (explicit storage
// elements are not supported; a warning is printed if <list> contains '@').
// NOTE(review): the duplicate check uses TString::Contains, i.e. a substring
// test on the whole list, so a name that is a substring of an already listed
// name is skipped as well.
// NOTE(review): some lines are missing from this listing (declarations,
// braces, cleanup of the token array).
3244 void AliAnalysisAlien::SetOutputFiles(const char *list)
3246 // Manually set the output files list.
3247 // Removes duplicates. Not allowed if default outputs are not disabled.
3248 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3249 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3252 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3254 TString slist = list;
3255 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3256 TObjArray *arr = slist.Tokenize(" ");
3260 while ((os=(TObjString*)next())) {
3261 sout = os->GetString();
// Strip the "@..." part, if any, before storing the name.
3262 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
3263 if (fOutputFiles.Contains(sout)) continue;
3264 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3265 fOutputFiles += sout;
3270 //______________________________________________________________________________
// Stores <list> verbatim in fOutputArchive; no parsing or validation of the
// text is done here.
// Calling this while the kDefaultOutputs bit is set triggers Fatal().
3271 void AliAnalysisAlien::SetOutputArchive(const char *list)
3273 // Manually set the output archive list. Free text - you are on your own...
3274 // Not allowed if default outputs are not disabled.
3275 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3276 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3279 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3280 fOutputArchive = list;
3283 //______________________________________________________________________________
// Stores the pair (pname, value) in fProofParam as two new TObjString
// objects. If an entry for <pname> already exists, its key is first removed
// from the map.
// NOTE(review): lines are missing from this listing after the Remove() call;
// presumably the old key and value objects are deleted there - confirm.
3284 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3286 // Set some PROOF special parameter.
3287 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// Existing entry: fetch its key and value objects and unregister the key.
3289 TObject *old = pair->Key();
3290 TObject *val = pair->Value();
3291 fProofParam.Remove(old);
3295 fProofParam.Add(new TObjString(pname), new TObjString(value));
3298 //______________________________________________________________________________
// Looks up <pname> in fProofParam.
// Returns 0 when no entry is found; otherwise the name (GetName) of the
// value object stored for that entry. The returned pointer refers to data
// held by that stored object.
3299 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3301 // Returns a special PROOF parameter.
3302 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3303 if (!pair) return 0;
3304 return pair->Value()->GetName();
3307 //______________________________________________________________________________
// Entry point used to launch the analysis through the plugin. Both
// parameters are unnamed in the signature and therefore unused.
// Requires an initialized AliAnalysisManager.
// Two main paths are visible:
//  * PROOF mode (mgr->IsProofMode()): checks the cluster / dataset settings,
//    optionally resets the cluster, opens the PROOF session through the
//    interpreter, forwards special parameters, enables AliRoot / AliPhysics
//    or par packages, loads the analysis sources and, for test mode,
//    registers a local test dataset.
//  * Grid mode: composes fOutputFiles / fOutputArchive when default outputs
//    are requested, prints a banner for the selected run mode, prepares the
//    input data, writes the analysis files, macros, validation scripts and
//    JDL, then either runs the executable locally, stops (offline or
//    production mode) or submits the JDL to AliEn and records the job id.
// NOTE(review): a large number of lines are missing from this listing
// (declarations, conditions, braces, else branches, return statements), so
// the branch structure and the return values must be confirmed against the
// complete source.
3308 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3310 // Start remote grid analysis.
3311 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3312 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3313 if (!mgr || !mgr->IsInitialized()) {
3314 Error("StartAnalysis", "You need an initialized analysis manager for this");
3317 // Are we in PROOF mode ?
3318 if (mgr->IsProofMode()) {
3319 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3320 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3321 if (fProofCluster.IsNull()) {
3322 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
3325 if (fProofDataSet.IsNull() && !testMode) {
3326 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3329 // Set the needed environment
3330 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3331 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3332 if (fProofReset && !testMode) {
// fProofReset == 1 requests a soft reset; the other visible call is the hard reset.
3333 if (fProofReset==1) {
3334 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3335 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3337 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3338 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3340 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3345 // Check if there is an old active session
3346 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3348 Error("StartAnalysis","You have to reset your old session first\n");
3352 // Do we need to change the ROOT version ? The success of this cannot be checked.
3353 if (!fROOTVersion.IsNull() && !testMode) {
3354 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3355 fProofCluster.Data(), fROOTVersion.Data()));
3357 // Connect to PROOF and check the status
// Worker option string: "workers=<n>x" (per slave) takes precedence over "workers=<n>".
3360 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3361 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3363 if (!sworkers.IsNull())
3364 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3366 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
3368 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3370 Error("StartAnalysis", "Could not start PROOF in test mode");
3375 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// Applied only when both worker settings are non-zero (product > 0).
3378 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3379 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3380 // Set proof special parameters if any
3381 TIter nextpp(&fProofParam);
3382 TObject *proofparam;
3383 while ((proofparam=nextpp())) {
3384 TString svalue = GetProofParameter(proofparam->GetName());
// The value is inserted unquoted into the interpreted SetParameter call.
3385 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3387 // Is dataset existing ?
3389 TString dataset = fProofDataSet;
3390 Int_t index = dataset.Index("#");
3391 if (index>=0) dataset.Remove(index);
3392 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3393 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3396 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
3398 // Is ClearPackages() needed ?
3399 if (TestSpecialBit(kClearPackages)) {
3400 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3401 gROOT->ProcessLine("gProof->ClearPackages();");
3403 // Is a given aliroot mode requested ?
3406 if (!fAliRootMode.IsNull()) {
3407 TString alirootMode = fAliRootMode;
// The mode name "default" is passed on as an empty string.
3408 if (alirootMode == "default") alirootMode = "";
3409 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
3410 optionsList.SetOwner();
3411 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3412 // Check the additional libs to be loaded
3414 Bool_t parMode = kFALSE;
3415 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice:OADB";
3416 // Parse the extra libs for .so or .dylib
3417 if (fAdditionalLibs.Length()) {
3418 TString additionalLibs = fAdditionalLibs;
3419 additionalLibs.Strip();
3420 if (additionalLibs.Length() && fFriendLibs.Length())
3421 additionalLibs += " ";
3422 additionalLibs += fFriendLibs;
3423 TObjArray *list = additionalLibs.Tokenize(" ");
3426 while((str=(TObjString*)next())) {
3427 if (str->GetString().Contains(".so") ||
3428 str->GetString().Contains(".dylib") ) {
3430 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// Reduce "libXYZ.so" / "libXYZ.dylib" to the bare name "XYZ".
3433 TString stmp = str->GetName();
3434 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3435 stmp.ReplaceAll(".so","");
3436 stmp.ReplaceAll(".dylib","");
3437 if (!extraLibs.IsNull()) extraLibs += ":";
3441 if (str->GetString().Contains(".par")) {
3442 // The first par file found in the list will not allow any further .so
3444 if (!parLibs.IsNull()) parLibs += ":";
3445 parLibs += str->GetName();
3449 if (list) delete list;
3451 if (!extraLibs.IsNull()) {
3452 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3453 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3455 // Check extra includes
3456 if (!fIncludePath.IsNull()) {
// Convert the "-I<dir> -I<dir>" form into a ':' separated list without
// the ALICE_ROOT prefix.
3457 TString includePath = fIncludePath;
3458 includePath.ReplaceAll(" ",":");
3459 includePath.ReplaceAll("$ALICE_ROOT/","");
3460 includePath.ReplaceAll("${ALICE_ROOT}/","");
3461 includePath.ReplaceAll("-I","");
3462 includePath.Remove(TString::kTrailing, ':');
3463 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3464 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3466 // Check if connection to grid is requested
3467 if (TestSpecialBit(kProofConnectGrid))
3468 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3469 // Enable AliRoot par
3471 // Enable proof lite package
3472 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
3473 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3474 TNamed *obj = (TNamed*)optionsList.At(i);
3475 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// Both interpreted calls are treated as successful when they evaluate to 0.
3477 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3478 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3479 Info("StartAnalysis", "AliRootProofLite enabled");
3481 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3485 if ( ! fAliROOTVersion.IsNull() ) {
3486 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3487 fAliROOTVersion.Data(), &optionsList))) {
3488 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3492 if ( ! fAliPhysicsVersion.IsNull() ) {
3493 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliPhysics::%s\", (TList*)%p, kTRUE);",
3494 fAliPhysicsVersion.Data(), &optionsList))) {
3495 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliPhysics::%s", fAliPhysicsVersion.Data());
3500 // Enable first par files from fAdditionalLibs
3501 if (!parLibs.IsNull()) {
3502 TObjArray *list = parLibs.Tokenize(":");
3504 TObjString *package;
3505 while((package=(TObjString*)next())) {
3506 TString spkg = package->GetName();
3507 spkg.ReplaceAll(".par", "");
// Remove a local directory named after the package before uploading it.
3508 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3509 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3510 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3511 if (gROOT->ProcessLine(enablePackage)) {
3512 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3516 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3520 if (list) delete list;
3523 if ((fAdditionalLibs.Contains(".so") || fAdditionalLibs.Contains(".dylib")) &&
3525 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3526 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3530 // Enable par files if requested
3531 if (fPackages && fPackages->GetEntries()) {
3532 TIter next(fPackages);
3534 while ((package=next())) {
3535 // Skip packages already enabled
3536 if (parLibs.Contains(package->GetName())) continue;
3537 TString spkg = package->GetName();
3538 spkg.ReplaceAll(".par", "");
3539 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3540 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3541 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3542 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3546 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3551 // Do we need to load analysis source files ?
3552 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3553 if (fAnalysisSource.Length()) {
3554 TObjArray *list = fAnalysisSource.Tokenize(" ");
3557 while((str=(TObjString*)next())) {
3558 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3560 if (list) delete list;
3563 // Register dataset to proof lite.
3564 if (fFileForTestMode.IsNull()) {
3565 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3568 if (gSystem->AccessPathName(fFileForTestMode)) {
3569 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// Build a file collection from the list file and register it under the
// dataset name "test_collection".
3572 TFileCollection *coll = new TFileCollection();
3573 coll->AddFromFile(fFileForTestMode);
3574 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3575 gROOT->ProcessLine("gProof->ShowDataSets()");
3580 // Check if output files have to be taken from the analysis manager
3581 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3582 // Add output files and AOD files
3583 fOutputFiles = GetListOfFiles("outaod");
3584 // Add extra files registered to the analysis manager
3585 TString extra = GetListOfFiles("ext");
3586 if (!extra.IsNull()) {
3587 extra.ReplaceAll(".root", "*.root");
3588 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3589 fOutputFiles += extra;
3591 // Compose the output archive.
3592 fOutputArchive = "log_archive.zip:std*@disk=1 ";
// With throughput collection enabled the manager's file info log is added
// as an extra entry of the archive specification.
3593 if (mgr->IsCollectThroughput())
3594 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3596 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3598 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
3599 if (TestBit(AliAnalysisGrid::kOffline)) {
3600 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3601 \n there nor any job run. You can revise the JDL and analysis \
3602 \n macro then run the same in \"submit\" mode.");
3603 } else if (TestBit(AliAnalysisGrid::kTest)) {
3604 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3606 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3607 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3608 \n space and job submitted.");
3609 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3610 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3611 if (fMergeViaJDL) CheckInputData();
3614 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3619 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3622 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3623 if (!CheckInputData()) {
3624 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3627 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the configuration item that selected no data.
3629 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3630 if (fRunNumbers.Length()) serror = "run numbers";
3631 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3632 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3633 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Generate the files needed by the job.
3636 WriteAnalysisFile();
3637 WriteAnalysisMacro();
3639 WriteValidationScript();
3641 WriteMergingMacro();
3642 WriteMergeExecutable();
3643 WriteValidationScript(kTRUE);
3645 if (!CreateJDL()) return kFALSE;
3646 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3648 // Locally testing the analysis
3649 Info("StartAnalysis", "\n_______________________________________________________________________ \
3650 \n Running analysis script in a daughter shell as on a worker node \
3651 \n_______________________________________________________________________");
3652 TObjArray *list = fOutputFiles.Tokenize(",");
3656 while((str=(TObjString*)next())) {
3657 outputFile = str->GetString();
3658 Int_t index = outputFile.Index("@");
3659 if (index > 0) outputFile.Remove(index);
// Remove an existing local copy of the output file.
3660 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the generated executable (stderr redirected to the file "stderr"),
// then the validation script.
3663 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3664 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3665 // gSystem->Exec("cat stdout");
3668 // Check if submitting is managed by LPM manager
3669 if (fProductionMode) {
3670 //TString prodfile = fJDLName;
3671 //prodfile.ReplaceAll(".jdl", ".prod");
3672 //WriteProductionFile(prodfile);
3673 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3676 // Submit AliEn job(s)
3677 gGrid->Cd(fGridOutputDir);
// No run numbers and no run range: a single JDL submission is done here.
3682 if (!fRunNumbers.Length() && !fRunRange[0]) {
3683 // Submit a given xml or a set of runs
3684 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3685 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
3687 const char *cjobId = res->GetKey(0,"jobId");
3691 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3694 Info("StartAnalysis", "\n_______________________________________________________________________ \
3695 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3696 \n_______________________________________________________________________",
3697 fJDLName.Data(), cjobId);
// Record the job id and tag it as a "full" stage (space-separated lists).
3700 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3701 fGridJobIDs.Append(jobID);
3702 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3703 fGridStages.Append("full");
3708 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3712 // Submit for a range of enumeration of runs.
3713 if (!Submit()) return kFALSE;
3714 jobID = fGridJobIDs;
3718 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3719 \n You may exit at any time and terminate the job later using the option <terminate> \
3720 \n ##################################################################################", jobID.Data());
3721 gSystem->Exec("aliensh");
3723 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3724 \n Remember to terminate the job later using the option <terminate> \
3725 \n ##################################################################################", jobID.Data());
3730 //______________________________________________________________________________
3731 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3733 // Get a comma-separated list of output files of the requested type.
3734 // Type can be (case-insensitive):
3735 // aod - list of aod files (std, extensions and filters)
3736 // out - list of output files connected to containers (but not aod's or extras)
3737 // ext - list of extra files registered to the manager
3738 // ter - list of files produced in terminate
// The keywords are searched with TString::Contains(), so one request may
// combine several of them; a request equal to a single keyword returns right
// after that category was handled.
// The returned pointer refers to the buffer of the function-local static
// string declared below: it is overwritten by the next call, so the caller
// has to copy the result if it must be kept.
3739 static TString files;
3741 TString stype = type;
3743 TString aodfiles, extra;
3744 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3746 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3747 return files.Data();
// AOD files: the standard AOD output (only if the handler fills the AOD) plus
// the extra AOD outputs (only if the handler fills extensions).
3749 if (mgr->GetOutputEventHandler()) {
3751 if (mgr->GetOutputEventHandler()->GetFillAOD())
3752 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3753 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs(kTRUE);
3754 if (!extraaod.IsNull() && mgr->GetOutputEventHandler()->GetFillExtension()) {
3755 if (!aodfiles.IsNull()) aodfiles += ",";
3756 aodfiles += extraaod;
3759 if (stype.Contains("aod")) {
3761 if (stype == "aod") return files.Data();
3763 // Add output files that are not in the list of AOD files
3764 TString outputfiles = "";
3765 TIter next(mgr->GetOutputs());
3766 AliAnalysisDataContainer *output;
3767 const char *filename = 0;
// Containers whose file name is "default" are skipped.
// NOTE(review): duplicates are detected with TString::Contains(), i.e. a
// substring match, so a name that is part of an already listed name is also
// treated as a duplicate - confirm this is acceptable.
3768 while ((output=(AliAnalysisDataContainer*)next())) {
3769 filename = output->GetFileName();
3770 if (!(strcmp(filename, "default"))) continue;
3771 if (outputfiles.Contains(filename)) continue;
3772 if (aodfiles.Contains(filename)) continue;
3773 if (!outputfiles.IsNull() && strlen(filename)) outputfiles += ",";
3774 outputfiles += filename;
3776 if (stype.Contains("out")) {
3777 if (!files.IsNull() && !outputfiles.IsNull()) files += ",";
3778 files += outputfiles;
3779 if (stype == "out") return files.Data();
3781 // Add extra files registered to the analysis manager
3783 extra = mgr->GetExtraFiles();
3784 if (!extra.IsNull()) {
// Blanks are turned into commas so that both separators are accepted, then
// the list is split and filtered against the AOD and container outputs.
3786 extra.ReplaceAll(" ", ",");
3787 TObjArray *fextra = extra.Tokenize(",");
3788 TIter nextx(fextra);
3790 while ((obj=nextx())) {
3791 if (aodfiles.Contains(obj->GetName())) continue;
3792 if (outputfiles.Contains(obj->GetName())) continue;
3793 if (sextra.Contains(obj->GetName())) continue;
3794 if (!sextra.IsNull()) sextra += ",";
3795 sextra += obj->GetName();
3798 if (stype.Contains("ext")) {
3799 if (!files.IsNull() && !sextra.IsNull()) files += ",";
3803 if (stype == "ext") return files.Data();
// Files produced in Terminate (fTerminateFiles); names already present in any
// of the previous categories are skipped.
3805 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() returns the stripped substring and does not
// modify the string itself; the result is discarded here, so this call looks
// like a no-op - confirm.
3806 fTerminateFiles.Strip();
3807 fTerminateFiles.ReplaceAll(" ",",");
3808 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3809 TIter nextx(fextra);
3811 while ((obj=nextx())) {
3812 if (aodfiles.Contains(obj->GetName())) continue;
3813 if (outputfiles.Contains(obj->GetName())) continue;
3814 if (termfiles.Contains(obj->GetName())) continue;
3815 if (sextra.Contains(obj->GetName())) continue;
3816 if (!termfiles.IsNull()) termfiles += ",";
3817 termfiles += obj->GetName();
3821 if (stype.Contains("ter")) {
3822 if (!files.IsNull() && !termfiles.IsNull()) {
3827 return files.Data();
3830 //______________________________________________________________________________
3831 Bool_t AliAnalysisAlien::Submit()
3833 // Submit all master jobs.
// There is one master job per entry of fInputFiles. Returns kFALSE as soon as
// a call to SubmitNext() fails.
3834 Int_t nmasterjobs = fInputFiles->GetEntries();
3835 Long_t tshoot = gSystem->Now();
// Nothing submitted yet: trigger the first submission immediately.
3836 if (!fNsubmitted && !SubmitNext()) return kFALSE;
// Keep polling until every master job was submitted. SubmitNext() is only
// retried once the elapsed time since tshoot exceeds 30000.
// NOTE(review): presumably TSystem::Now() is in milliseconds, i.e. a 30 s
// interval - confirm.
3837 while (fNsubmitted < nmasterjobs) {
3838 Long_t now = gSystem->Now();
3839 if ((now-tshoot)>30000) {
3841 if (!SubmitNext()) return kFALSE;
3847 //______________________________________________________________________________
3848 Bool_t AliAnalysisAlien::SubmitMerging()
3850 // Submit all merging jobs.
// For each entry of fInputFiles the output directory to be merged is worked
// out and handed to CheckMergedFiles() together with the merging JDL name.
// An output directory without any "/" is taken as relative and expanded to
// <alien home>/<fGridWorkingDir>/<fGridOutputDir>.
3851 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3852 gGrid->Cd(fGridOutputDir);
// The merging JDL is named after the executable: <name>.sh -> <name>_merge.jdl
3853 TString mergeJDLName = fExecutable;
3854 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3856 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3859 Int_t ntosubmit = fInputFiles->GetEntries();
3860 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input collection without its ".xml" extension.
3861 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3862 runOutDir.ReplaceAll(".xml", "");
3863 if (fOutputToRunNo) {
3864 // The output directory is the run number
3865 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3866 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3868 if (!fRunNumbers.Length() && !fRunRange[0]) {
3869 // The output directory is the grid outdir
3870 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3871 runOutDir = fGridOutputDir;
3873 // The output directory is the master number in 3 digits format
3874 printf("### Submitting merging job for master <%03d>\n", i);
3875 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3878 // Check now the number of merging stages.
// The first output file that is listed neither in fMergeExcludes nor in
// fRegisterExcludes is used for the check; anything from an "@" onwards is
// removed from the name first.
3879 TObjArray *list = fOutputFiles.Tokenize(",");
3883 while((str=(TObjString*)next())) {
3884 outputFile = str->GetString();
3885 Int_t index = outputFile.Index("@");
3886 if (index > 0) outputFile.Remove(index);
3887 if (!fMergeExcludes.Contains(outputFile) &&
3888 !fRegisterExcludes.Contains(outputFile)) break;
3891 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A negative result is only reported to the caller for the last entry.
3892 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range the full output directory was handled
// above, so a single pass is enough.
3893 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3895 if (!ntosubmit) return kTRUE;
// NOTE(review): the two messages below are tagged "StartAnalysis" although
// they are emitted from SubmitMerging.
3897 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3898 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3899 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3900 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3901 \n ################################################################################################################");
3902 gSystem->Exec("aliensh");
3904 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3905 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3906 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3907 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3908 \n ################################################################################################################");
3913 //______________________________________________________________________________
3914 Bool_t AliAnalysisAlien::SubmitNext()
3916 // Submit next bunch of master jobs if the queue is free. The first master job is
3917 // submitted right away, while the next will not be unless the previous was split.
3918 // The plugin will not submit new master jobs if there are more than 500 jobs in
// The function keeps its state in the static variables below, so the state is
// shared by all calls. While a previous call is flagged as in progress
// (iscalled) the function returns kTRUE immediately; the visible early-return
// paths reset the flag.
3920 static Bool_t iscalled = kFALSE;
3921 static Int_t firstmaster = 0;
3922 static Int_t lastmaster = 0;
3923 static Int_t npermaster = 0;
3924 if (iscalled) return kTRUE;
3926 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3927 Int_t ntosubmit = 0;
3930 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without submit policy all master jobs are submitted in one go.
3933 if (!IsUseSubmitPolicy()) {
3935 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3936 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3937 ntosubmit = nmasterjobs;
// Query the status of the master jobs submitted so far (job id range
// firstmaster..lastmaster); the counters are filled by GetJobStatus().
3940 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3941 printf("=== master %d: %s\n", lastmaster, status.Data());
3942 // If last master not split, just return
3943 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3944 // No more than 500 waiting jobs
3945 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master, used to estimate how many
// more masters fit below the limit of 500 waiting jobs (at least one).
// NOTE(review): divides by fNsubmitted; the guard for fNsubmitted == 0 is not
// visible in this excerpt - verify.
3946 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3947 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3948 if (!ntosubmit) ntosubmit = 1;
3949 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3950 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3952 for (Int_t i=0; i<ntosubmit; i++) {
3953 // Submit for a range of enumeration of runs.
3954 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// The third argument of the submit command is either the collection base name
// without ".xml" or the master index in 3 digits format.
3956 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3957 runOutDir.ReplaceAll(".xml", "");
3959 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3961 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3962 printf("********* %s\n",query.Data());
3963 res = gGrid->Command(query);
// The job id is read from the "jobId" key of the grid result; an empty id is
// treated as a failed submission.
// NOTE(review): the messages below are tagged "StartAnalysis" although they
// are emitted from SubmitNext.
3965 TString cjobId1 = res->GetKey(0,"jobId");
3966 if (!cjobId1.Length()) {
3970 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3973 Info("StartAnalysis", "\n_______________________________________________________________________ \
3974 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3975 \n_______________________________________________________________________",
3976 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Record the job id and its stage ("full") in the blank-separated lists.
3977 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3978 fGridJobIDs.Append(cjobId1);
3979 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3980 fGridStages.Append("full");
// Remember the id range of the submitted masters for the next status query.
3983 lastmaster = cjobId1.Atoi();
3984 if (!firstmaster) firstmaster = lastmaster;
3989 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3997 //______________________________________________________________________________
3998 void AliAnalysisAlien::WriteAnalysisFile()
4000 // Write current analysis manager into the file <analysisFile>
// The file name is derived from the executable: <name>.sh -> <name>.root.
// Unless the kSubmit bit is set, the analysis type bits (kUseMC, kUseESD,
// kUseAOD) are set from the event handlers of the manager and the file is
// (re)created locally. Afterwards the file is copied to the AliEn working
// directory, except in production mode, offline mode or test mode.
4001 TString analysisFile = fExecutable;
4002 analysisFile.ReplaceAll(".sh", ".root");
4003 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4004 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4005 if (!mgr || !mgr->IsInitialized()) {
4006 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
4009 // Check analysis type
4011 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
4012 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the type is taken from its first input handler.
// NOTE(review): the result of GetFirstInputEventHandler() is dereferenced
// without a visible null check - verify it cannot be null here.
4014 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
4015 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
4016 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
4017 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
4019 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
4020 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory: it is restored further down, after the
// output file has been opened.
4023 TDirectory *cdir = gDirectory;
4024 TFile *file = TFile::Open(analysisFile, "RECREATE");
4026 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
4027 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
4028 // Unless merging makes no sense
4029 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
4032 // Enable termination for local jobs
4033 mgr->SetSkipTerminate(kFALSE);
4035 if (cdir) cdir->cd();
4036 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No copy to AliEn in production, offline or test mode.
4038 Bool_t copy = kTRUE;
4039 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// NOTE(review): the working directory is appended to the home directory
// without a separator; presumably GetHomeDirectory() ends with "/" - confirm.
4042 TString workdir = gGrid->GetHomeDirectory();
4043 workdir += fGridWorkingDir;
4044 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous copy first, then upload; a failed upload is fatal.
4045 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
4046 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
4047 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
4051 //______________________________________________________________________________
4052 void AliAnalysisAlien::WriteAnalysisMacro()
4054 // Write the analysis macro that will steer the analysis in grid mode.
// Unless the kSubmit bit is set, a ROOT macro named fAnalysisMacro is written
// locally. The generated macro loads the libraries / par packages, sets up
// the include path, reads the analysis manager back from file and starts the
// analysis; helper functions (CreateChain, SetupPar) are appended to it when
// needed. Afterwards the macro is copied to the AliEn working directory,
// except in production mode, offline mode or test mode.
4055 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4057 out.open(fAnalysisMacro.Data(), ios::out);
4059 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling which framework packages are provided as par files.
4062 Bool_t hasSTEERBase = kFALSE;
4063 Bool_t hasESD = kFALSE;
4064 Bool_t hasAOD = kFALSE;
4065 Bool_t hasANALYSIS = kFALSE;
4066 Bool_t hasOADB = kFALSE;
4067 Bool_t hasANALYSISalice = kFALSE;
4068 Bool_t hasCORRFW = kFALSE;
4069 TString func = fAnalysisMacro;
4070 TString type = "ESD";
4071 TString comment = "// Analysis using ";
4076 if (IsUseMCchain()) {
4080 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
4081 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
4087 if (type!="AOD" && fFriendChainName!="") {
4088 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
4091 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
4092 else comment += " data";
// Header of the generated macro: the analysis type constant and the entry
// function, named after the macro file without its ".C" extension.
4093 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
4094 func.ReplaceAll(".C", "");
4095 out << "void " << func.Data() << "()" << endl;
4097 out << comment.Data() << endl;
4098 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
4099 out << " TStopwatch timer;" << endl;
4100 out << " timer.Start();" << endl << endl;
4101 // Change temp directory to current one
4102 if (!IsLocalTest()) {
4103 out << "// connect to AliEn and make the chain" << endl;
4104 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4106 out << "// Set temporary merging directory to current one" << endl;
4107 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4108 out << "// Set temporary compilation directory to current one" << endl;
4109 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4110 // Reset existing include path
4111 out << "// Reset existing include path and add current directory first in the search" << endl;
4112 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// The base ROOT libraries are only loaded explicitly when the executable
// command does not contain "aliroot".
4113 if (!fExecutableCommand.Contains("aliroot")) {
4114 out << "// load base root libraries" << endl;
4115 out << " gSystem->Load(\"libTree\");" << endl;
4116 out << " gSystem->Load(\"libGeom\");" << endl;
4117 out << " gSystem->Load(\"libVMC\");" << endl;
4118 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4119 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4121 if (fAdditionalRootLibs.Length()) {
4122 // in principle libtree /lib geom libvmc etc. can go into this list, too
4123 out << "// Add aditional libraries" << endl;
4124 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
// Only entries containing ".so" or ".dylib" are loaded; the extension is
// removed from the name passed to gSystem->Load().
4128 while((str=(TObjString*)next())) {
4129 buf = str->GetString();
4130 if (buf.Contains(".so") || buf.Contains(".dylib")) {
4131 buf.ReplaceAll(".so", "");
4132 buf.ReplaceAll(".dylib", "");
4133 out << " gSystem->Load(\"" << buf.Data() << "\");" << endl;
4136 if (list) delete list;
4138 out << "// Load analysis framework libraries" << endl;
// Name of the function the generated macro calls to set up a par package.
4139 TString setupPar = "AliAnalysisAlien::SetupPar";
4141 if (!fExecutableCommand.Contains("aliroot")) {
4142 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4143 out << " gSystem->Load(\"libESD\");" << endl;
4144 out << " gSystem->Load(\"libAOD\");" << endl;
4146 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4147 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4148 out << " gSystem->Load(\"libOADB\");" << endl;
4149 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over the par packages: find out which framework packages are
// provided as par files (with or without the ".par" extension).
4151 TIter next(fPackages);
4154 while ((obj=next())) {
4155 pkgname = obj->GetName();
4156 if (pkgname == "STEERBase" ||
4157 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4158 if (pkgname == "ESD" ||
4159 pkgname == "ESD.par") hasESD = kTRUE;
4160 if (pkgname == "AOD" ||
4161 pkgname == "AOD.par") hasAOD = kTRUE;
4162 if (pkgname == "ANALYSIS" ||
4163 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4164 if (pkgname == "OADB" ||
4165 pkgname == "OADB.par") hasOADB = kTRUE;
4166 if (pkgname == "ANALYSISalice" ||
4167 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4168 if (pkgname == "CORRFW" ||
4169 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// When ANALYSISalice itself comes as a par package, the generated macro uses
// the local SetupPar() function that is written at the end of the macro.
// For each framework package: load the library, or set up the par package.
4171 if (hasANALYSISalice) setupPar = "SetupPar";
4172 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4173 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4174 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4175 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4176 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4177 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4178 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4179 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4180 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4181 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4182 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4183 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4184 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4185 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4186 out << "// Compile other par packages" << endl;
// Second pass: set up all remaining packages, the framework ones were
// handled above.
4188 while ((obj=next())) {
4189 pkgname = obj->GetName();
4190 if (pkgname == "STEERBase" ||
4191 pkgname == "STEERBase.par" ||
4193 pkgname == "ESD.par" ||
4195 pkgname == "AOD.par" ||
4196 pkgname == "ANALYSIS" ||
4197 pkgname == "ANALYSIS.par" ||
4198 pkgname == "OADB" ||
4199 pkgname == "OADB.par" ||
4200 pkgname == "ANALYSISalice" ||
4201 pkgname == "ANALYSISalice.par" ||
4202 pkgname == "CORRFW" ||
4203 pkgname == "CORRFW.par") continue;
4204 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4207 out << "// include path" << endl;
4208 // Get the include path from the interpreter and remove entries pointing to AliRoot
4209 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4210 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4211 out << " TIter nextpath(listpaths);" << endl;
4212 out << " TObjString *pname;" << endl;
4213 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4214 out << " TString current = pname->GetName();" << endl;
4215 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4216 out << " gSystem->AddIncludePath(current);" << endl;
4217 out << " }" << endl;
4218 out << " if (listpaths) delete listpaths;" << endl;
4219 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4220 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4221 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
// Generator libraries are only loaded for the MC loop.
4222 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4223 out << "// MC generator libraries" << endl;
4224 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4227 while((str=(TObjString*)next())) {
4228 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
4232 if (fAdditionalLibs.Length()) {
4233 out << "// Add aditional AliRoot libraries" << endl;
4234 TString additionalLibs = fAdditionalLibs;
// NOTE(review): TString::Strip() returns the stripped substring and does not
// modify the string itself; the result is discarded here, so this call looks
// like a no-op - confirm.
4235 additionalLibs.Strip();
4236 if (additionalLibs.Length() && fFriendLibs.Length())
4237 additionalLibs += " ";
4238 additionalLibs += fFriendLibs;
4239 TObjArray *list = additionalLibs.Tokenize(" ");
// Shared libraries are loaded without their extension; entries containing
// ".par" are set up as par packages.
4243 while((str=(TObjString*)next())) {
4244 buf = str->GetString();
4245 if (buf.Contains(".so") || buf.Contains(".dylib")) {
4246 buf.ReplaceAll(".so", "");
4247 buf.ReplaceAll(".dylib", "");
4248 out << " gSystem->Load(\"" << buf.Data() << "\");" << endl;
4250 if (buf.Contains(".par"))
4251 out << " if (!" << setupPar << "(\"" << buf.Data() << "\")) return;" << endl;
4256 out << "// analysis source to be compiled at runtime (if any)" << endl;
4257 if (fAnalysisSource.Length()) {
4258 TObjArray *list = fAnalysisSource.Tokenize(" ");
4261 while((str=(TObjString*)next())) {
4262 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4264 if (list) delete list;
4267 // out << " printf(\"Currently load libraries:\\n\");" << endl;
4268 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
4269 if (fFastReadOption) {
4270 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4271 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4272 out << "// fast xrootd reading enabled" << endl;
4273 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4274 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4275 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4276 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4277 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4278 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4280 out << "// read the analysis manager from file" << endl;
// Same file name as used by WriteAnalysisFile(): <name>.sh -> <name>.root
4281 TString analysisFile = fExecutable;
4282 analysisFile.ReplaceAll(".sh", ".root");
4283 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4284 << analysisFile << "\");" << endl;
4285 out << " if (!mgr) return;" << endl;
// In local test mode the generated macro creates its own plugin in "test"
// run mode and attaches it to the manager.
4286 if (IsLocalTest()) {
4287 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4288 out << " plugin->SetRunMode(\"test\");" << endl;
4289 if (fFileForTestMode.IsNull())
4290 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4292 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4293 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4294 if (!fFriendChainName.IsNull())
4295 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\",\"" << fFriendLibs << "\");" << endl;
4297 out << " plugin->SetUseMCchain();" << endl;
4299 out << " plugin->SetMCLoop(kTRUE);" << endl;
4300 out << " mgr->SetGridHandler(plugin);" << endl;
// Debug level and sysinfo sampling are copied from the current manager when
// there is one; otherwise fixed values are written.
4301 if (AliAnalysisManager::GetAnalysisManager()) {
4302 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4303 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4305 out << " mgr->SetDebugLevel(10);" << endl;
4306 out << " mgr->SetNSysInfo(100);" << endl;
4309 out << " mgr->PrintStatus();" << endl;
4310 if (AliAnalysisManager::GetAnalysisManager()) {
4311 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4312 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4314 if (TestBit(AliAnalysisGrid::kTest))
4315 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4317 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// Start of the analysis: either an event loop over fNMCevents events, or the
// processing of a chain / local file.
4320 if (!IsLocalTest()) {
4322 out << " mgr->SetCacheSize(0);" << endl;
4323 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4325 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4326 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4330 out << " mgr->SetCacheSize(0);" << endl;
4331 out << " mgr->EventLoop(" << fNMCevents << ");" << endl;
4333 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4336 out << " timer.Stop();" << endl;
4337 out << " timer.Print();" << endl;
4338 out << "}" << endl << endl;
// CreateChain() helper of the generated macro: only needed when running on
// the grid without MC loop.
4339 if (!IsLocalTest() && !fMCLoop) {
4340 out <<"//________________________________________________________________________________" << endl;
4341 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4343 out << "// Create a chain using url's from xml file" << endl;
4344 out << " TString filename;" << endl;
4345 out << " Int_t run = 0;" << endl;
// Tree name: "TE" for the MC chain, else fTreeName if set, else derived from
// the analysis type (lower case type + "Tree").
4346 if (IsUseMCchain()) {
4347 out << " TString treename = \"TE\";" << endl;
4349 if (!fTreeName.IsNull()) {
4350 out << " TString treename = \"" << fTreeName << "\";" << endl;
4352 out << " TString treename = type;" << endl;
4353 out << " treename.ToLower();" << endl;
4354 out << " treename += \"Tree\";" << endl;
4357 out << " printf(\"***************************************\\n\");" << endl;
4358 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4359 out << " printf(\"***************************************\\n\");" << endl;
4360 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4361 out << " if (!coll) {" << endl;
4362 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4363 out << " return NULL;" << endl;
4364 out << " }" << endl;
4365 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4366 out << " TChain *chain = new TChain(treename);" << endl;
// One friend chain per blank-separated name in fFriendChainName; the file
// name is stored as the title of the friend chain.
4367 if(!fFriendChainName.IsNull()) {
4368 out << " TList *friends = new TList();" << endl;
4369 out << " TIter nextfriend(friends);" << endl;
4370 out << " TChain *cfriend = 0;" << endl;
4371 TObjArray *list = fFriendChainName.Tokenize(" ");
4374 while((str=(TObjString*)next())) {
4375 out << " cfriend = new TChain(treename, \"" << str->GetName() << "\");" << endl;
4376 out << " friends->Add(cfriend);" << endl;
4377 out << " chain->AddFriend(cfriend);" << endl;
4380 // out << " TChain *chainFriend = new TChain(treename);" << endl;
4382 out << " coll->Reset();" << endl;
4383 out << " while (coll->Next()) {" << endl;
// NOTE(review): the inner quotes are not escaped on the next line, so the
// compiler concatenates two adjacent string literals and the generated macro
// reads coll->GetTURL(); whereas the friend code below emits GetTURL("") -
// confirm this is intended.
4384 out << " filename = coll->GetTURL("");" << endl;
4385 out << " if (mgr) {" << endl;
4386 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4387 out << " if (nrun && nrun != run) {" << endl;
4388 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4389 out << " mgr->SetRunFromPath(nrun);" << endl;
4390 out << " run = nrun;" << endl;
4391 out << " }" << endl;
4392 out << " }" << endl;
4393 out << " chain->Add(filename);" << endl;
// Friend files are looked up in the directory of the current file; a friend
// file that cannot be opened is fatal in the generated macro.
4394 if(!fFriendChainName.IsNull()) {
4395 out << " TString bpath=coll->GetTURL(\"\");" << endl;
4396 out << " if (bpath.Index(\"#\") > -1) bpath.Remove(bpath.Index(\"#\"));" << endl;
4397 out << " bpath = gSystem->DirName(bpath);" << endl;
4398 out << " bpath += \"/\";" << endl;
4399 out << " TString fileFriend;" << endl;
4400 out << " nextfriend.Reset();" << endl;
4401 out << " while ((cfriend=(TChain*)nextfriend())) {" << endl;
4402 out << " fileFriend = bpath;" << endl;
4403 out << " fileFriend += cfriend->GetTitle();" << endl;
4404 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4405 out << " if (file) {" << endl;
4406 out << " file->Close();" << endl;
4407 out << " cfriend->Add(fileFriend.Data());" << endl;
4408 out << " } else {" << endl;
4409 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4410 out << " return 0;" << endl;
4411 out << " }" << endl;
4412 out << " }" << endl;
4414 out << " }" << endl;
4415 out << " if (!chain->GetNtrees()) {" << endl;
4416 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4417 out << " return NULL;" << endl;
4418 out << " }" << endl;
4419 out << " return chain;" << endl;
4420 out << "}" << endl << endl;
// Local SetupPar() helper of the generated macro, written only when
// ANALYSISalice is provided as a par package (see setupPar above).
4422 if (hasANALYSISalice) {
4423 out <<"//________________________________________________________________________________" << endl;
4424 out << "Bool_t SetupPar(const char *package) {" << endl;
4425 out << "// Compile the package and set it up." << endl;
4426 out << " TString pkgdir = package;" << endl;
4427 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4428 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4429 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4430 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4431 out << " // Check for BUILD.sh and execute" << endl;
4432 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4433 out << " printf(\"*******************************\\n\");" << endl;
4434 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4435 out << " printf(\"*******************************\\n\");" << endl;
4436 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4437 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4438 out << " gSystem->ChangeDirectory(cdir);" << endl;
4439 out << " return kFALSE;" << endl;
4440 out << " }" << endl;
4441 out << " } else {" << endl;
4442 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4443 out << " gSystem->ChangeDirectory(cdir);" << endl;
4444 out << " return kFALSE;" << endl;
4445 out << " }" << endl;
4446 out << " // Check for SETUP.C and execute" << endl;
4447 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4448 out << " printf(\"*******************************\\n\");" << endl;
4449 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4450 out << " printf(\"*******************************\\n\");" << endl;
4451 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4452 out << " } else {" << endl;
4453 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4454 out << " gSystem->ChangeDirectory(cdir);" << endl;
4455 out << " return kFALSE;" << endl;
4456 out << " }" << endl;
4457 out << " // Restore original workdir" << endl;
4458 out << " gSystem->ChangeDirectory(cdir);" << endl;
4459 out << " return kTRUE;" << endl;
4462 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// No copy to AliEn in production, offline or test mode.
4464 Bool_t copy = kTRUE;
4465 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4468 TString workdir = gGrid->GetHomeDirectory();
4469 workdir += fGridWorkingDir;
4470 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4471 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4472 // TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
// NOTE(review): copyLocal2Alien() already prepends "alien://" to its remote
// argument, so the destination built here presumably ends up with a doubled
// "alien://alien://" prefix; WriteAnalysisFile() passes the plain path
// instead - verify and align.
4473 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4474 Form("alien://%s/%s", workdir.Data(),
4475 fAnalysisMacro.Data()))) Fatal("","Terminating");
4479 //______________________________________________________________________________
// Writes the ROOT macro executed by grid merging jobs and copies it to AliEn.
//
// The macro file name is fExecutable with .sh replaced by _merge.C. The method
// returns immediately unless merging via JDL (fMergeViaJDL) is enabled, and it
// reports an error when no output file names are defined. The macro text is
// produced inside the block entered when the kSubmit bit is not set. Near the
// end a copy flag is computed (cleared in production mode and for offline or
// test runs) and the macro is sent to <alien home><fGridWorkingDir>/<macro>;
// a failed copy ends in Fatal().
//
// NOTE(review): this listing does not show several short lines (braces, local
// declarations such as the output stream, else/return statements). The
// comments below describe the visible statements only and hedge wherever the
// enclosing control flow cannot be confirmed.
4480 void AliAnalysisAlien::WriteMergingMacro()
4482 // Write a macro to merge the outputs per master job.
4483 if (!fMergeViaJDL) return;
4484 if (!fOutputFiles.Length()) {
4485 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4488 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
// Merging macro name: <executable>.sh -> <executable>_merge.C
4489 TString mergingMacro = fExecutable;
4490 mergingMacro.ReplaceAll(".sh","_merge.C");
// A grid output directory containing no slash is expanded to
// <alien home>/<work dir>/<output dir>; done only when gGrid is set.
4491 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4492 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4494 out.open(mergingMacro.Data(), ios::out);
// NOTE(review): the message prints fAnalysisMacro although the file opened
// just above is mergingMacro - looks like a copy/paste slip; confirm.
4496 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags set further down when the corresponding framework package is listed
// in fPackages (either as NAME or NAME.par).
4499 Bool_t hasSTEERBase = kFALSE;
4500 Bool_t hasESD = kFALSE;
4501 Bool_t hasAOD = kFALSE;
4502 Bool_t hasANALYSIS = kFALSE;
4503 Bool_t hasOADB = kFALSE;
4504 Bool_t hasANALYSISalice = kFALSE;
4505 Bool_t hasCORRFW = kFALSE;
// Name of the generated function = macro file name without the .C extension.
4506 TString func = mergingMacro;
4508 func.ReplaceAll(".C", "");
// Generated signature: <func>(const char *dir, Int_t stage=0)
4509 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4511 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4512 out << " TStopwatch timer;" << endl;
4513 out << " timer.Start();" << endl << endl;
4514 // Reset existing include path
4515 out << "// Reset existing include path and add current directory first in the search" << endl;
4516 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
// When the executable command does not contain aliroot, the generated macro
// loads the base ROOT libraries explicitly.
4517 if (!fExecutableCommand.Contains("aliroot")) {
4518 out << "// load base root libraries" << endl;
4519 out << " gSystem->Load(\"libTree\");" << endl;
4520 out << " gSystem->Load(\"libGeom\");" << endl;
4521 out << " gSystem->Load(\"libVMC\");" << endl;
4522 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4523 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Additional ROOT libraries: fAdditionalRootLibs is split on spaces; entries
// containing .so or .dylib get a gSystem->Load line with the extension removed.
4525 if (fAdditionalRootLibs.Length()) {
4526 // in principle libtree /lib geom libvmc etc. can go into this list, too
4527 out << "// Add aditional libraries" << endl;
4528 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4532 while((str=(TObjString*)next())) {
4533 buf = str->GetString();
4534 if (buf.Contains(".so") || buf.Contains(".dylib")) {
4535 buf.ReplaceAll(".so", "");
4536 buf.ReplaceAll(".dylib", "");
4537 out << " gSystem->Load(\"" << buf.Data() << "\");" << endl;
4540 if (list) delete list;
4542 out << "// Load analysis framework libraries" << endl;
// NOTE(review): the statements selecting between the plain library loads
// below and the PAR-aware path that iterates fPackages are not visible here.
4544 if (!fExecutableCommand.Contains("aliroot")) {
4545 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4546 out << " gSystem->Load(\"libESD\");" << endl;
4547 out << " gSystem->Load(\"libAOD\");" << endl;
4549 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4550 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4551 out << " gSystem->Load(\"libOADB\");" << endl;
4552 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First loop over fPackages: record which framework packages are present.
4554 TIter next(fPackages);
// Default: the generated macro calls the class method to set up PAR packages.
4557 TString setupPar = "AliAnalysisAlien::SetupPar";
4558 while ((obj=next())) {
4559 pkgname = obj->GetName();
4560 if (pkgname == "STEERBase" ||
4561 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4562 if (pkgname == "ESD" ||
4563 pkgname == "ESD.par") hasESD = kTRUE;
4564 if (pkgname == "AOD" ||
4565 pkgname == "AOD.par") hasAOD = kTRUE;
4566 if (pkgname == "ANALYSIS" ||
4567 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4568 if (pkgname == "OADB" ||
4569 pkgname == "OADB.par") hasOADB = kTRUE;
4570 if (pkgname == "ANALYSISalice" ||
4571 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4572 if (pkgname == "CORRFW" ||
4573 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// If ANALYSISalice is itself listed as a package, the generated macro calls an
// unqualified SetupPar(); that function is written into the macro at the end
// of this method under the same hasANALYSISalice flag.
4575 if (hasANALYSISalice) setupPar = "SetupPar";
// For each framework package: emit a plain library load when it is not in
// fPackages, otherwise emit a setup call that aborts the macro on failure.
4576 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4577 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4578 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4579 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4580 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4581 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
// NOTE(review): libOADB is emitted unconditionally here and handled again a
// few lines below (load or setup) - possibly redundant; confirm intent.
4582 out << " gSystem->Load(\"libOADB\");" << endl;
4583 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4584 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4585 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4586 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4587 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4588 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4589 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4590 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4591 out << "// Compile other par packages" << endl;
// Second loop over the packages: framework packages already handled above are
// skipped, every other package gets a setup call in the generated macro.
4593 while ((obj=next())) {
4594 pkgname = obj->GetName();
4595 if (pkgname == "STEERBase" ||
4596 pkgname == "STEERBase.par" ||
4598 pkgname == "ESD.par" ||
4600 pkgname == "AOD.par" ||
4601 pkgname == "ANALYSIS" ||
4602 pkgname == "ANALYSIS.par" ||
4603 pkgname == "OADB" ||
4604 pkgname == "OADB.par" ||
4605 pkgname == "ANALYSISalice" ||
4606 pkgname == "ANALYSISalice.par" ||
4607 pkgname == "CORRFW" ||
4608 pkgname == "CORRFW.par") continue;
4609 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4612 out << "// include path" << endl;
4613 // Get the include path from the interpreter and remove entries pointing to AliRoot
4614 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4615 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4616 out << " TIter nextpath(listpaths);" << endl;
4617 out << " TObjString *pname;" << endl;
4618 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4619 out << " TString current = pname->GetName();" << endl;
4620 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4621 out << " gSystem->AddIncludePath(current);" << endl;
4622 out << " }" << endl;
4623 out << " if (listpaths) delete listpaths;" << endl;
// User include path (if any) is added, followed by $ALICE_ROOT/include.
4624 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4625 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4626 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
// Generator libraries are emitted only for the MC loop (fMCLoop) and when
// fGeneratorLibs is not empty; entries are written as given (no extension
// stripping, unlike the other library lists).
4627 if (fMCLoop && !fGeneratorLibs.IsNull()) {
4628 out << "// MC generator libraries" << endl;
4629 TObjArray *list = fGeneratorLibs.Tokenize(" ");
4632 while((str=(TObjString*)next())) {
4633 out << " gSystem->Load(\"" << str->GetName() << "\");" << endl;
// Additional AliRoot libraries followed by the friend libraries, as one
// space-separated list; same .so/.dylib handling as for the ROOT libraries.
4637 if (fAdditionalLibs.Length()) {
4638 out << "// Add aditional AliRoot libraries" << endl;
4639 TString additionalLibs = fAdditionalLibs;
// NOTE(review): the result of Strip() is discarded. As far as the ROOT TString
// API goes, Strip() returns the stripped substring and leaves the string
// itself unchanged, so this statement appears to have no effect - confirm.
4640 additionalLibs.Strip();
4641 if (additionalLibs.Length() && fFriendLibs.Length())
4642 additionalLibs += " ";
4643 additionalLibs += fFriendLibs;
4644 TObjArray *list = additionalLibs.Tokenize(" ");
4648 while((str=(TObjString*)next())) {
4649 buf = str->GetString();
4650 if (buf.Contains(".so") || buf.Contains(".dylib")) {
4651 buf.ReplaceAll(".so", "");
4652 buf.ReplaceAll(".dylib", "");
4653 out << " gSystem->Load(\"" << buf.Data() << "\");" << endl;
4656 if (list) delete list;
4659 out << "// Analysis source to be compiled at runtime (if any)" << endl;
// Each space-separated entry of fAnalysisSource becomes a `.L <file>+g` line.
4660 if (fAnalysisSource.Length()) {
4661 TObjArray *list = fAnalysisSource.Tokenize(" ");
4664 while((str=(TObjString*)next())) {
4665 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4667 if (list) delete list;
// Optional fast-read mode: warn here and make the generated macro set the
// XNet.* environment values listed below.
4671 if (fFastReadOption) {
4672 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4673 out << "// fast xrootd reading enabled" << endl;
4674 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4675 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4676 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4677 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4678 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4679 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4681 // Change temp directory to current one
4682 out << "// Connect to AliEn" << endl;
4683 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4684 out << "// Set temporary merging directory to current one" << endl;
4685 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4686 out << "// Set temporary compilation directory to current one" << endl;
4687 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4688 out << " TString outputDir = dir;" << endl;
// NOTE(review): two alternative definitions of outputFiles follow (file list
// type outaod versus out); the condition that selects between them is not
// visible in this listing.
4690 out << " TString outputFiles = \"" << GetListOfFiles("outaod") << "\";" << endl;
4692 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
// Files named in fMergeExcludes or fRegisterExcludes are skipped by the
// generated merge loop (see the mergeExcludes.Contains test emitted below).
4693 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4694 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4695 out << " TIter *iter = new TIter(list);" << endl;
4696 out << " TObjString *str;" << endl;
4697 out << " TString outputFile;" << endl;
4698 out << " Bool_t merged = kTRUE;" << endl;
// The generated macro reloads the analysis manager from <executable>.root
// (fExecutable with .sh replaced by .root) and gives up if that fails.
4699 TString analysisFile = fExecutable;
4700 analysisFile.ReplaceAll(".sh", ".root");
4701 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4702 << analysisFile << "\");" << endl;
4703 out << " if (!mgr) {" << endl;
4704 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4705 out << " return;" << endl;
4706 out << " }" << endl;
4707 if (IsLocalTest()) {
4708 out << " printf(\"===================================\\n\");" << endl;
4709 out << " printf(\"Testing merging...\\n\");" << endl;
4710 out << " printf(\"===================================\\n\");" << endl;
// Generated merge loop, per output file: skip wildcard entries, cut the name
// at the first @ (when found past position 0), skip files that already exist
// locally and files in the exclude list, then call MergeOutput with
// fMaxMergeFiles and the stage; the macro returns on the first failed merge.
4712 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4713 out << " outputFile = str->GetString();" << endl;
4714 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4715 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4716 out << " if (index > 0) outputFile.Remove(index);" << endl;
4717 out << " // Skip already merged outputs" << endl;
4718 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4719 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4720 out << " continue;" << endl;
4721 out << " }" << endl;
4722 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4723 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4724 out << " if (!merged) {" << endl;
4725 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4726 out << " return;" << endl;
4727 out << " }" << endl;
4728 out << " }" << endl;
// With throughput collection enabled on the current manager (and outside a
// local test) the generated macro also merges the file-info log.
4729 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4730 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4731 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
// The generated macro creates an empty file named outputs_valid once all
// merges succeeded; the script text produced by WriteValidationScript contains
// a test for that file.
4733 out << " // all outputs merged, validate" << endl;
4734 out << " ofstream out;" << endl;
4735 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4736 out << " out.close();" << endl;
4737 out << " // read the analysis manager from file" << endl;
4738 if (IsLocalTest()) {
4739 out << " printf(\"===================================\\n\");" << endl;
4740 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4741 out << " printf(\"===================================\\n\");" << endl;
// Generated early exit: the macro stops here unless the output directory name
// contains Stage. NOTE(review): whether this line belongs to an else branch
// of the local-test check above cannot be seen in this listing.
4743 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4745 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4746 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4747 out << " mgr->PrintStatus();" << endl;
// NOTE(review): mgr is dereferenced here; it was null-checked for the
// throughput block above, and a guard may sit on a line not visible - confirm.
4749 if (mgr->GetDebugLevel()>3) {
4750 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
// Log level written into the macro: kWarning under the kTest bit; the kError
// line below is the other visible alternative.
4752 if (TestBit(AliAnalysisGrid::kTest))
4753 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4755 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// The generated macro finishes by calling StartAnalysis in gridterminate mode
// with a null tree pointer.
4758 out << " TTree *tree = NULL;" << endl;
4759 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4760 out << "}" << endl << endl;
// When ANALYSISalice is a package, append a SetupPar() function to the macro
// text; its steps (untar, BUILD.sh, SETUP.C, restore directory) parallel
// AliAnalysisAlien::SetupPar.
4761 if (hasANALYSISalice) {
4762 out <<"//________________________________________________________________________________" << endl;
4763 out << "Bool_t SetupPar(const char *package) {" << endl;
4764 out << "// Compile the package and set it up." << endl;
4765 out << " TString pkgdir = package;" << endl;
4766 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4767 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4768 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4769 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4770 out << " // Check for BUILD.sh and execute" << endl;
4771 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4772 out << " printf(\"*******************************\\n\");" << endl;
4773 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4774 out << " printf(\"*******************************\\n\");" << endl;
4775 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4776 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4777 out << " gSystem->ChangeDirectory(cdir);" << endl;
4778 out << " return kFALSE;" << endl;
4779 out << " }" << endl;
4780 out << " } else {" << endl;
4781 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4782 out << " gSystem->ChangeDirectory(cdir);" << endl;
4783 out << " return kFALSE;" << endl;
4784 out << " }" << endl;
4785 out << " // Check for SETUP.C and execute" << endl;
4786 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4787 out << " printf(\"*******************************\\n\");" << endl;
4788 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4789 out << " printf(\"*******************************\\n\");" << endl;
4790 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4791 out << " } else {" << endl;
4792 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4793 out << " gSystem->ChangeDirectory(cdir);" << endl;
4794 out << " return kFALSE;" << endl;
4795 out << " }" << endl;
4796 out << " // Restore original workdir" << endl;
4797 out << " gSystem->ChangeDirectory(cdir);" << endl;
4798 out << " return kTRUE;" << endl;
// Copy flag: cleared in production mode and under the kOffline or kTest bits.
// NOTE(review): the statement that tests this flag before the upload below is
// not visible in this listing - presumably the upload is guarded by it.
4802 Bool_t copy = kTRUE;
4803 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4806 TString workdir = gGrid->GetHomeDirectory();
4807 workdir += fGridWorkingDir;
// NOTE(review): the existence check and removal use the bare macro name while
// the copy below targets <workdir>/<macro>; presumably this relies on the
// current AliEn directory being the work directory - confirm.
4808 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4809 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4810 // TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
// NOTE(review): the tag passed here is WriteMergeMacro whereas the method is
// named WriteMergingMacro. A failed copy is fatal.
4811 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4812 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4816 //______________________________________________________________________________
// Unpacks and sets up the PAR archive named by <package>.
//
// Visible steps: remove every occurrence of .par from the name, run
// `tar xzf <name>.par`, remember the current directory, change into <name>,
// run PROOF-INF/BUILD.sh, execute the PROOF-INF/SETUP.C macro, print the
// libraries matching the package name, and change back to the remembered
// directory. Each failure is reported with ::Error after the original
// directory has been restored.
//
// package: archive name, with or without the .par suffix.
// Returns Bool_t. NOTE(review): the return statements are not visible in this
// listing; presumably kFALSE on the error paths and kTRUE at the end, as in
// the SetupPar() text that WriteMergingMacro writes into the merging macro.
4817 Bool_t AliAnalysisAlien::SetupPar(const char *package)
4819 // Compile the par file archive pointed by <package>. This must be present in the current directory.
4820 // Note that for loading the compiled library. The current directory should have precedence in
4822 TString pkgdir = package;
4823 pkgdir.ReplaceAll(".par","");
// NOTE(review): the exit status of tar is ignored, and the package name goes
// into the shell command unquoted - names with spaces or shell metacharacters
// would break the command; confirm callers only pass plain names.
4824 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4825 TString cdir = gSystem->WorkingDirectory();
// NOTE(review): the result of ChangeDirectory is not checked; if it fails, the
// PROOF-INF checks below are evaluated in the starting directory.
4826 gSystem->ChangeDirectory(pkgdir);
4827 // Check for BUILD.sh and execute
// AccessPathName() returns false when the path IS accessible, hence the
// negation (ROOT convention).
4828 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4829 printf("**************************************************\n");
4830 printf("*** Building PAR archive %s\n", package);
4831 printf("**************************************************\n");
// A non-zero status from the build script is treated as a build failure.
4832 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4833 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4834 gSystem->ChangeDirectory(cdir);
4838 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4839 gSystem->ChangeDirectory(cdir);
4842 // Check for SETUP.C and execute
4843 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4844 printf("**************************************************\n");
4845 printf("*** Setup PAR archive %s\n", package);
4846 printf("**************************************************\n");
4847 gROOT->Macro("PROOF-INF/SETUP.C");
// Informational only: lists the loaded libraries whose name matches pkgdir.
4848 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4850 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4851 gSystem->ChangeDirectory(cdir);
4854 // Restore original workdir
4855 gSystem->ChangeDirectory(cdir);
4859 //______________________________________________________________________________
// Writes the bash script (fExecutable) that runs the analysis macro on the
// grid and copies it to <alien home><fGridWorkingDir>/<fExecutable>.
//
// The script text is produced inside the block entered when the kSubmit bit is
// not set. It prints environment diagnostics, runs
// `<fExecutableCommand> <fAnalysisMacro> <fExecutableArgs>`, and exits with the
// macro status when that status is non-zero. A copy flag is computed at the
// end (cleared in production mode and under the kOffline or kTest bits); a
// failed copy ends in Fatal().
//
// NOTE(review): short lines (braces, the stream declaration, return
// statements, the test of the copy flag) are not visible in this listing.
4860 void AliAnalysisAlien::WriteExecutable()
4862 // Generate the alien executable script.
4863 // Patch executable with -x to catch error code
// -x is appended only for a root-based command that already has -q.
4864 if (fExecutableCommand.Contains("root") &&
4865 fExecutableCommand.Contains("-q") &&
4866 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4867 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4869 out.open(fExecutable.Data(), ios::out);
4871 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4874 out << "#!/bin/bash" << endl;
4875 // Make sure we can properly compile par files
4876 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// Diagnostics section of the script: paths, ROOT/AliRoot locations, limits
// and memory before the job starts.
4877 out << "echo \"=========================================\"" << endl;
4878 out << "echo \"############## PATH : ##############\"" << endl;
4879 out << "echo $PATH" << endl;
4880 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4881 out << "echo $LD_LIBRARY_PATH" << endl;
4882 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4883 out << "echo $ROOTSYS" << endl;
4884 out << "echo \"############## which root : ##############\"" << endl;
4885 out << "which root" << endl;
4886 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4887 out << "echo $ALICE_ROOT" << endl;
4888 out << "echo \"############## which aliroot : ##############\"" << endl;
4889 out << "which aliroot" << endl;
4890 out << "echo \"############## system limits : ##############\"" << endl;
4891 out << "ulimit -a" << endl;
4892 out << "echo \"############## memory : ##############\"" << endl;
4893 out << "free -m" << endl;
4894 out << "echo \"=========================================\"" << endl << endl;
// The actual job command; its exit status is captured in RET right after.
4895 out << fExecutableCommand << " ";
4896 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4897 out << "RET=$?" << endl;
4898 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4899 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
// Statuses 129..159 are reported as a signal: RET-128 is used as a 1-based
// index into the word list below, picked with awk and prefixed with SIG.
4900 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4901 out << " let sig=\"$RET - 128\""<<endl;
4902 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4903 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4904 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4905 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4906 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4907 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4909 out << " exit $RET"<< endl;
4910 out << "fi" << endl << endl ;
4911 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4912 out << "echo \"############## memory after: ##############\"" << endl;
4913 out << "free -m" << endl;
// Copy flag: cleared in production mode and under the kOffline or kTest bits.
4915 Bool_t copy = kTRUE;
4916 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4919 TString workdir = gGrid->GetHomeDirectory();
4920 workdir += fGridWorkingDir;
4921 TString executable = TString::Format("%s/%s", workdir.Data(), fExecutable.Data());
// An existing remote copy is removed before the new one is sent.
4922 if (FileExists(executable)) gGrid->Rm(executable);
// NOTE(review): the message mentions the AliEn bin directory, but the
// destination built above is the working directory.
4923 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4924 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4925 executable.Data())) Fatal("","Terminating");
4929 //______________________________________________________________________________
// Writes the bash script that runs the merging macro on the grid and copies it
// to <alien home><fGridWorkingDir>/<script>.
//
// The script name is fExecutable with .sh replaced by _merge.sh; the macro it
// runs is fExecutable with .sh replaced by _merge.C. Nothing is done unless
// merging via JDL (fMergeViaJDL) is enabled. The script text is produced
// inside the block entered when the kSubmit bit is not set. A copy flag is
// computed at the end (cleared in production mode and under the kOffline or
// kTest bits); a failed copy ends in Fatal().
//
// NOTE(review): short lines (braces, the stream declaration, else/return
// statements, the test of the copy flag) are not visible in this listing.
4930 void AliAnalysisAlien::WriteMergeExecutable()
4932 // Generate the alien executable script for the merging job.
4933 if (!fMergeViaJDL) return;
4934 TString mergeExec = fExecutable;
4935 mergeExec.ReplaceAll(".sh", "_merge.sh");
4936 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4938 out.open(mergeExec.Data(), ios::out);
// NOTE(review): the tag says WriteMergingExecutable while the method is named
// WriteMergeExecutable.
4940 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4943 out << "#!/bin/bash" << endl;
4944 // Make sure we can properly compile par files
4945 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// Diagnostics section of the script, same content as in WriteExecutable.
4946 out << "echo \"=========================================\"" << endl;
4947 out << "echo \"############## PATH : ##############\"" << endl;
4948 out << "echo $PATH" << endl;
4949 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4950 out << "echo $LD_LIBRARY_PATH" << endl;
4951 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4952 out << "echo $ROOTSYS" << endl;
4953 out << "echo \"############## which root : ##############\"" << endl;
4954 out << "which root" << endl;
4955 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4956 out << "echo $ALICE_ROOT" << endl;
4957 out << "echo \"############## which aliroot : ##############\"" << endl;
4958 out << "which aliroot" << endl;
4959 out << "echo \"############## system limits : ##############\"" << endl;
4960 out << "ulimit -a" << endl;
4961 out << "echo \"############## memory : ##############\"" << endl;
4962 out << "free -m" << endl;
4963 out << "echo \"=========================================\"" << endl << endl;
4964 TString mergeMacro = fExecutable;
4965 mergeMacro.ReplaceAll(".sh", "_merge.C");
// ARG is the macro call handed to the executable command. One-stage merging
// passes only the first script argument; the second visible form passes the
// first argument and the second one as an unquoted value.
4966 if (IsOneStageMerging())
4967 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4969 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4970 out << fExecutableCommand << " " << "$ARG" << endl;
4971 out << "RET=$?" << endl;
4972 out << "if [ \"$RET\" != \"0\" ];then" << endl;
// NOTE(review): the error line names fAnalysisMacro although the command run
// above is the merging macro (mergeMacro) - confirm which name is intended.
4973 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
// Statuses 129..159 are reported as a signal: RET-128 is used as a 1-based
// index into the word list below, picked with awk and prefixed with SIG.
4974 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4975 out << " let sig=\"$RET - 128\""<<endl;
4976 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4977 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4978 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4979 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4980 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4981 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4983 out << " exit $RET"<< endl;
4984 out << "fi" << endl << endl ;
// NOTE(review): this line prints $? whereas the matching line written by
// WriteExecutable prints $RET; after the preceding if/fi, $? presumably holds
// the status of that compound command rather than of the macro - confirm.
4985 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4986 out << "echo \"############## memory after: ##############\"" << endl;
4987 out << "free -m" << endl;
// Copy flag: cleared in production mode and under the kOffline or kTest bits.
4989 Bool_t copy = kTRUE;
4990 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4993 TString workdir = gGrid->GetHomeDirectory();
4994 workdir += fGridWorkingDir;
4995 TString executable = TString::Format("%s/%s", workdir.Data(), mergeExec.Data());
// An existing remote copy is removed before the new one is sent.
4996 if (FileExists(executable)) gGrid->Rm(executable);
// NOTE(review): the message mentions the AliEn bin directory, but the
// destination built above is the working directory.
4997 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4998 if (!copyLocal2Alien("WriteMergeExecutable",mergeExec.Data(),
4999 executable.Data())) Fatal("","Terminating");
5003 //______________________________________________________________________________
// Writes the production description file <filename> and copies it to
// <workdir>/<filename> on AliEn.
//
// File layout as written below:
//   line 1     : <workdir>/<fJDLName> <njobspermaster>
//   next lines : one per entry of fInputFiles, <input name> followed by either
//                the input base name without .xml or the zero-padded index.
//
// filename: local name of the file to write; also used as the remote name.
//
// NOTE(review): short lines (braces, declarations of the stream and of
// workdir, return statements, the condition choosing between the two per-input
// line formats, any guard around the upload) are not visible in this listing.
5004 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
5006 // Write the production file to be submitted by LPM manager. The format is:
5007 // First line: full_path_to_jdl estimated_no_subjobs_per_master
5008 // Next lines: full_path_to_dataset XXX (XXX is a string)
5009 // To submit, one has to: submit jdl XXX for all lines
5011 out.open(filename, ios::out);
5013 Error("WriteProductionFile", "Bad file name: %s", filename);
// The AliEn home directory is prepended unless running in production mode or
// the working directory already begins with /alice.
// NOTE(review): gGrid is dereferenced here without a visible null check.
5017 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
5018 workdir = gGrid->GetHomeDirectory();
5019 workdir += fGridWorkingDir;
// NOTE(review): this divides by fSplitMaxInputFileNumber; a value of 0 would be
// a division by zero - confirm it is validated before this point.
5020 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
5021 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
5022 out << locjdl << " " << njobspermaster << endl;
5023 Int_t nmasterjobs = fInputFiles->GetEntries();
5024 for (Int_t i=0; i<nmasterjobs; i++) {
// Candidate output directory name: base name of the input without .xml.
5025 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
5026 runOutDir.ReplaceAll(".xml", "");
// Two alternative line formats follow; the selecting condition is not visible.
5028 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
5030 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
5033 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
// NOTE(review): the existence check and removal use the bare file name while
// the copy below targets <workdir>/<filename>; presumably this relies on the
// current AliEn directory being the work directory - confirm.
5034 if (FileExists(filename)) gGrid->Rm(filename);
5035 // TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
5036 if (!copyLocal2Alien("WriteProductionFile", filename,
5037 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
5041 //______________________________________________________________________________
// Writes the bash validation script for analysis jobs (merge == false) or for
// merging jobs (merge == true) and copies it to <workdir>/<script> on AliEn.
//
// Script name: fValidationScript, defaulting to fExecutable with .sh replaced
// by _validation.sh; for the merge case .sh is further replaced by _merge.sh.
// The script text is produced inside the block entered when the kSubmit bit is
// not set. The generated script sets error=1 when stderr is missing, when
// stderr contains one of the failure patterns searched with grep, or when an
// expected output file is missing, and it ends with `exit $error`.
//
// merge: selects the merging variant; the per-output-file checks are emitted
//        only when merge is false (see the loop condition below).
//
// NOTE(review): short lines (braces, declarations, return statements, some
// conditions) are not visible in this listing; comments cover visible code.
5042 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
5044 // Generate the alien validation script.
5045 // Generate the validation script
5047 if (fValidationScript.IsNull()) {
5048 fValidationScript = fExecutable;
5049 fValidationScript.ReplaceAll(".sh", "_validation.sh");
5051 TString validationScript = fValidationScript;
5052 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
5054 Error("WriteValidationScript", "Alien connection required");
// Terminate files are kept as a comma-separated list.
5057 if (!fTerminateFiles.IsNull()) {
// NOTE(review): the result of Strip() is discarded. As far as the ROOT TString
// API goes, Strip() returns the stripped substring and leaves the string
// itself unchanged, so this statement appears to have no effect - confirm.
5058 fTerminateFiles.Strip();
5059 fTerminateFiles.ReplaceAll(" ",",");
// Outside test mode the script messages are appended to the file stdout.
5061 TString outStream = "";
5062 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
5063 if (!TestBit(AliAnalysisGrid::kSubmit)) {
// NOTE(review): the stream is written right after open() with no check of the
// open result in between.
5065 out.open(validationScript, ios::out);
5066 out << "#!/bin/bash" << endl;
5067 out << "##################################################" << endl;
// Script preamble: move to the directory of the script, fall back to the
// current directory, and print a header with time and directories.
5068 out << "validateout=`dirname $0`" << endl;
5069 out << "validatetime=`date`" << endl;
5070 out << "validated=\"0\";" << endl;
5071 out << "error=0" << endl;
5072 out << "if [ -z $validateout ]" << endl;
5073 out << "then" << endl;
5074 out << " validateout=\".\"" << endl;
5075 out << "fi" << endl << endl;
5076 out << "cd $validateout;" << endl;
5077 out << "validateworkdir=`pwd`;" << endl << endl;
5078 out << "echo \"*******************************************************\"" << outStream << endl;
5079 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
5081 out << "echo \"* Time: $validatetime \"" << outStream << endl;
5082 out << "echo \"* Dir: $validateout\"" << outStream << endl;
5083 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
5084 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5085 out << "ls -la ./" << outStream << endl;
5086 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
5087 out << "##################################################" << endl;
// Check 1: a stderr file must exist.
5090 out << "if [ ! -f stderr ] ; then" << endl;
5091 out << " error=1" << endl;
5092 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
5093 out << " echo \"Error = $error\" " << outStream << endl;
5094 out << "fi" << endl;
// Check 2: stderr is searched (grep -Ei) for four failure patterns; each
// non-empty match sets error=1 and echoes the matching lines.
5096 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
5097 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
5098 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
5099 out << "glibcErr=`grep -Ei '\\*\\*\\* glibc detected \\*\\*\\*' stderr`" << endl;
5102 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
5103 out << " error=1" << endl;
5104 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
5105 out << " echo \"$parArch\" " << outStream << endl;
5106 out << " echo \"Error = $error\" " << outStream << endl;
5107 out << "fi" << endl;
5109 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
5110 out << " error=1" << endl;
5111 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
5112 out << " echo \"$segViol\" " << outStream << endl;
5113 out << " echo \"Error = $error\" " << outStream << endl;
5114 out << "fi" << endl;
5116 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
5117 out << " error=1" << endl;
5118 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
5119 out << " echo \"$segFault\" " << outStream << endl;
5120 out << " echo \"Error = $error\" " << outStream << endl;
5121 out << "fi" << endl;
5123 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
5124 out << " error=1" << endl;
5125 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
5126 out << " echo \"$glibcErr\" " << outStream << endl;
5127 out << " echo \"Error = $error\" " << outStream << endl;
5128 out << "fi" << endl;
5130 // Part dedicated to the specific analyses running into the train
// List of files to consider: fOutputFiles, plus fTerminateFiles for merging.
5132 TString outputFiles = fOutputFiles;
5133 if (merge && !fTerminateFiles.IsNull()) {
5134 if (!outputFiles.IsNull()) outputFiles += ",";
5135 outputFiles += fTerminateFiles;
5137 TObjArray *arr = outputFiles.Tokenize(",");
// Check 3 (non-merge only): every expected output must exist. The name is cut
// at the first @ (when found past position 0); terminate files and wildcard
// entries are not checked. A missing file is reported to both the message
// stream and stderr.
5140 while (!merge && (os=(TObjString*)next1())) {
5141 // No need to validate outputs produced by merging since the merging macro does this
5142 outputFile = os->GetString();
5143 Int_t index = outputFile.Index("@");
5144 if (index > 0) outputFile.Remove(index);
5145 if (fTerminateFiles.Contains(outputFile)) continue;
5146 if (outputFile.Contains("*")) continue;
5147 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
5148 out << " error=1" << endl;
5149 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
5150 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
5151 out << "fi" << endl;
// Check 4: the marker file outputs_valid must exist (the macro text generated
// by WriteMergingMacro creates it after all merges succeeded).
// NOTE(review): the condition under which this check is emitted is not visible
// here - presumably the merge case only; confirm.
5154 out << "if ! [ -f outputs_valid ] ; then" << endl;
5155 out << " error=1" << endl;
5156 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
5157 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
5158 out << "fi" << endl;
// Success branch of the script; unless logs are kept, the std* files are
// deleted by the script.
5160 out << "if [ $error = 0 ] ; then" << endl;
5161 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
5162 if (!IsKeepLogs()) {
5163 out << " echo \"* === Logs std* will be deleted === \"" << endl;
5165 out << " rm -f std*" << endl;
5167 out << "fi" << endl;
5169 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
5170 out << "echo \"*******************************************************\"" << outStream << endl;
5171 out << "cd -" << endl;
5172 out << "exit $error" << endl;
// Copy flag: cleared in production mode and under the kOffline or kTest bits.
// NOTE(review): the statement testing this flag before the upload is not
// visible in this listing.
5174 Bool_t copy = kTRUE;
5175 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
5178 TString workdir = gGrid->GetHomeDirectory();
5179 workdir += fGridWorkingDir;
5180 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
// NOTE(review): the existence check and removal use the bare script name while
// the copy below targets <workdir>/<script>; presumably this relies on the
// current AliEn directory being the work directory - confirm.
5181 if (FileExists(validationScript)) gGrid->Rm(validationScript);
5182 // TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
5183 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
5184 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");