From 0f3891412c006cffee0e927c5d31c3bb69b60dbe Mon Sep 17 00:00:00 2001 From: agheata Date: Mon, 29 Mar 2010 11:53:09 +0000 Subject: [PATCH] New features implemented in the alien plugin: - Enable xrootd tweaks to reduce file access timeouts (both in the submitted jobs or in local merging phase). May cause unnecessary file skipping !!!: plugin->SetFastReadOption(); - Enable merging via a special JDL. Files for merging (JDL, merge macro, executable and validation script) are generated and copied in alien space. Merging jobs are submitted only in "terminate" mode of the plugin: plugin->SetMergeViaJDL(); - Force overwriting existing collections: plugin->SetOverwriteMode(); --- ANALYSIS/AliAnalysisAlien.cxx | 955 +++++++++++++++++++++++++--------- ANALYSIS/AliAnalysisAlien.h | 19 +- ANALYSIS/AliAnalysisGrid.h | 8 +- 3 files changed, 740 insertions(+), 242 deletions(-) diff --git a/ANALYSIS/AliAnalysisAlien.cxx b/ANALYSIS/AliAnalysisAlien.cxx index 628834b5d35..8b731dd72dd 100644 --- a/ANALYSIS/AliAnalysisAlien.cxx +++ b/ANALYSIS/AliAnalysisAlien.cxx @@ -21,6 +21,8 @@ //============================================================================== #include "Riostream.h" +#include "TEnv.h" +#include "TError.h" #include "TROOT.h" #include "TSystem.h" #include "TFile.h" @@ -44,6 +46,7 @@ ClassImp(AliAnalysisAlien) AliAnalysisAlien::AliAnalysisAlien() :AliAnalysisGrid(), fGridJDL(NULL), + fMergingJDL(NULL), fPrice(0), fTTL(0), fSplitMaxInputFileNumber(0), @@ -55,6 +58,9 @@ AliAnalysisAlien::AliAnalysisAlien() fNsubmitted(0), fProductionMode(0), fOutputToRunNo(0), + fMergeViaJDL(0), + fFastReadOption(0), + fOverwriteMode(0), fRunNumbers(), fExecutable(), fExecutableCommand(), @@ -97,6 +103,7 @@ AliAnalysisAlien::AliAnalysisAlien() AliAnalysisAlien::AliAnalysisAlien(const char *name) :AliAnalysisGrid(name), fGridJDL(NULL), + fMergingJDL(NULL), fPrice(0), fTTL(0), fSplitMaxInputFileNumber(0), @@ -108,6 +115,9 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name) fNsubmitted(0), fProductionMode(0), fOutputToRunNo(0), + fMergeViaJDL(0), + fFastReadOption(0), + fOverwriteMode(0), fRunNumbers(), fExecutable(), fExecutableCommand(), @@ -150,6 +160,7 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name) AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other) :AliAnalysisGrid(other), fGridJDL(NULL), + fMergingJDL(NULL), fPrice(other.fPrice), fTTL(other.fTTL), fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber), @@ -161,6 +172,9 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other) fNsubmitted(other.fNsubmitted), fProductionMode(other.fProductionMode), fOutputToRunNo(other.fOutputToRunNo), + fMergeViaJDL(other.fMergeViaJDL), + fFastReadOption(other.fFastReadOption), + fOverwriteMode(other.fOverwriteMode), fRunNumbers(other.fRunNumbers), fExecutable(other.fExecutable), fExecutableCommand(other.fExecutableCommand), @@ -197,6 +211,7 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other) { // Copy ctor. fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); + fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); fRunRange[0] = other.fRunRange[0]; fRunRange[1] = other.fRunRange[1]; if (other.fInputFiles) { @@ -220,6 +235,7 @@ AliAnalysisAlien::~AliAnalysisAlien() { // Destructor. if (fGridJDL) delete fGridJDL; + if (fMergingJDL) delete fMergingJDL; if (fInputFiles) delete fInputFiles; if (fPackages) delete fPackages; } @@ -231,6 +247,7 @@ AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other) if (this != &other) { AliAnalysisGrid::operator=(other); fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); + fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); fPrice = other.fPrice; fTTL = other.fTTL; fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber; @@ -242,6 +259,9 @@ AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other) fNsubmitted = other.fNsubmitted; fProductionMode = other.fProductionMode; fOutputToRunNo = other.fOutputToRunNo; + fMergeViaJDL = other.fMergeViaJDL; + fFastReadOption = other.fFastReadOption; + fOverwriteMode = other.fOverwriteMode; fRunNumbers = other.fRunNumbers; fExecutable = other.fExecutable; fExecutableCommand = other.fExecutableCommand; @@ -574,7 +594,7 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) // CdWork(); if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml"; else file = Form("%s.xml", gSystem->BaseName(path)); - if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest)) { + if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) { command = "find "; command += options; command += path; @@ -586,9 +606,11 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) if (res) delete res; // Write standard output to file gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data())); - } - if (!TestBit(AliAnalysisGrid::kTest) && !FileExists(file)) { + } + Bool_t fileExists = FileExists(file); + if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) { // Copy xml file to alien space + if (fileExists) gGrid->Rm(file); TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data())); if (!FileExists(file)) { Error("CreateDataset", "Command %s did NOT succeed", command.Data()); @@ -626,8 +648,11 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) // Check if there is one run per master job. if (fNrunsPerMaster<2) { if (FileExists(file)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); - continue; + if (fOverwriteMode) gGrid->Rm(file); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); + continue; + } } // Copy xml file to alien space TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data())); @@ -651,9 +676,12 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) continue; } schunk += Form("_%s.xml", os->GetString().Data()); - if (FileExists(schunk)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data()); - continue; + if (FileExists(schunk)) { + if (fOverwriteMode) gGrid->Rm(file); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data()); + continue; + } } printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data()); cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs"); @@ -674,13 +702,15 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) // CdWork(); if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml"; else file = Form("%s%d.xml", fRunPrefix.Data(), irun); - if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); -// gGrid->Rm(file); - continue; + if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) { + if (fOverwriteMode) gGrid->Rm(file); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); + continue; + } } // If local collection file does not exist, create it via 'find' command. - if (gSystem->AccessPathName(file)) { + if (gSystem->AccessPathName(file) || fOverwriteMode) { command = "find "; command += options; command += path; @@ -695,8 +725,11 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) // Check if there is one run per master job. if (fNrunsPerMaster<2) { if (FileExists(file)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); - continue; + if (fOverwriteMode) gGrid->Rm(file); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data()); + continue; + } } // Copy xml file to alien space TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data())); @@ -708,7 +741,10 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) nruns++; // Check if the collection for the chunk exist locally. Int_t nchunk = (nruns-1)/fNrunsPerMaster; - if (FileExists(fInputFiles->At(nchunk)->GetName())) continue; + if (FileExists(fInputFiles->At(nchunk)->GetName())) { + if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName()); + else continue; + } printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster); if (((nruns-1)%fNrunsPerMaster) == 0) { schunk = Form("%s%d", fRunPrefix.Data(), irun); @@ -724,14 +760,20 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern) } schunk = schunk2; if (FileExists(schunk)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data()); - continue; + if (fOverwriteMode) gGrid->Rm(schunk); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data()); + continue; + } } printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data()); cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs"); if (FileExists(schunk)) { - Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data()); - continue; + if (fOverwriteMode) gGrid->Rm(schunk); + else { + Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data()); + continue; + } } TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data())); if (!FileExists(schunk)) { @@ -794,25 +836,45 @@ Bool_t AliAnalysisAlien::CreateJDL() // Exit if any error up to now if (error) return kFALSE; // Set JDL fields - fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data())); - fGridJDL->SetExecutable(fExecutable); + if (!fUser.IsNull()) { + fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data())); + fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data())); + } + fGridJDL->SetExecutable(fExecutable, "This is the startup script"); + TString mergeExec = fExecutable; + mergeExec.ReplaceAll(".sh", "_merge.sh"); + fMergingJDL->SetExecutable(mergeExec, "This is the startup script"); + mergeExec.ReplaceAll(".sh", ".C"); + fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers"); if (!fArguments.IsNull()) fGridJDL->SetArguments(fArguments, "Arguments for the executable command"); -// fGridJDL->SetTTL((UInt_t)fTTL); - fGridJDL->SetValue("TTL", Form("\"%d\"", fTTL)); - if (fMaxInitFailed > 0) + fMergingJDL->SetArguments("$1"); + fGridJDL->SetTTL((UInt_t)fTTL); + fMergingJDL->SetTTL((UInt_t)fTTL); + if (fMaxInitFailed > 0) { fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed)); - if (fSplitMaxInputFileNumber > 0) + fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job"); + } + if (fSplitMaxInputFileNumber > 0) { fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber)); - if (fSplitMode.Length()) + fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob"); + } + if (fSplitMode.Length()) { fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data())); -// fGridJDL->SetSplitMode(fSplitMode, (UInt_t)fSplitMaxInputFileNumber); - if (fAliROOTVersion.Length()) - fGridJDL->AddToPackages("AliRoot", fAliROOTVersion); - if (fROOTVersion.Length()) + fGridJDL->SetDescription("Split", "We split per SE or file"); + } + if (!fAliROOTVersion.IsNull()) { + fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages"); + fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages"); + } + if (!fROOTVersion.IsNull()) { fGridJDL->AddToPackages("ROOT", fROOTVersion); - if (fAPIVersion.Length()) + fMergingJDL->AddToPackages("ROOT", fROOTVersion); + } + if (!fAPIVersion.IsNull()) { fGridJDL->AddToPackages("APISCONFIG", fAPIVersion); + fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion); + } if (!fExternalPackages.IsNull()) { arr = fExternalPackages.Tokenize(" "); TIter next(arr); @@ -822,12 +884,13 @@ Bool_t AliAnalysisAlien::CreateJDL() TString pkgversion = pkgname(index+2, pkgname.Length()); pkgname.Remove(index); fGridJDL->AddToPackages(pkgname, pkgversion); + fMergingJDL->AddToPackages(pkgname, pkgversion); } delete arr; } - fGridJDL->SetInputDataListFormat(fInputFormat); - fGridJDL->SetInputDataList("wn.xml"); - fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data())); + fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data"); + fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node"); + fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers"); TString analysisFile = fExecutable; analysisFile.ReplaceAll(".sh", ".root"); fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data())); @@ -839,44 +902,115 @@ Bool_t AliAnalysisAlien::CreateJDL() while ((os=(TObjString*)next())) { if (os->GetString().Contains(".so")) continue; fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data())); + fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data())); } delete arr; } if (fPackages) { TIter next(fPackages); TObject *obj; - while ((obj=next())) + while ((obj=next())) { fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName())); + fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName())); + } } if (fOutputArchive.Length()) { arr = fOutputArchive.Tokenize(" "); TIter next(arr); - while ((os=(TObjString*)next())) - if (!os->GetString().Contains("@") && fCloseSE.Length()) - fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data())); - else - fGridJDL->AddToOutputArchive(os->GetString()); + Bool_t first = kTRUE; + const char *comment = "Files to be archived"; + const char *comment1 = comment; + while ((os=(TObjString*)next())) { + if (!first) comment = NULL; + if (!os->GetString().Contains("@") && fCloseSE.Length()) + fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment); + else + fGridJDL->AddToOutputArchive(os->GetString(), comment); + first = kFALSE; + } delete arr; + TString outputArchive = fOutputArchive; + if (!fMergeExcludes.IsNull()) { + arr = fMergeExcludes.Tokenize(" "); + TIter next(arr); + while ((os=(TObjString*)next())) { + outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),""); + outputArchive.ReplaceAll(os->GetString(),""); + } + delete arr; + } + arr = outputArchive.Tokenize(" "); + TIter next1(arr); + comment = comment1; + first = kTRUE; + while ((os=(TObjString*)next1())) { + if (!first) comment = NULL; + if (!os->GetString().Contains("@") && fCloseSE.Length()) + fMergingJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment); + else + fMergingJDL->AddToOutputArchive(os->GetString(), comment); + first = kFALSE; + } + delete arr; } arr = fOutputFiles.Tokenize(" "); TIter next(arr); + Bool_t first = kTRUE; + const char *comment = "Files to be archived"; + const char *comment1 = comment; while ((os=(TObjString*)next())) { // Ignore ouputs in jdl that are also in outputarchive TString sout = os->GetString(); if (sout.Index("@")>0) sout.Remove(sout.Index("@")); if (fOutputArchive.Contains(sout)) continue; + if (!first) comment = NULL; if (!os->GetString().Contains("@") && fCloseSE.Length()) - fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data())); + fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment); else - fGridJDL->AddToOutputSandbox(os->GetString()); + fGridJDL->AddToOutputSandbox(os->GetString(), comment); + first = kFALSE; } delete arr; -// fGridJDL->SetPrice((UInt_t)fPrice); - fGridJDL->SetValue("Price", Form("\"%d\"", fPrice)); + if (fOutputFiles.Length()) { + TString outputFiles = fOutputFiles; + if (!fMergeExcludes.IsNull()) { + arr = fMergeExcludes.Tokenize(" "); + TIter next(arr); + while ((os=(TObjString*)next())) { + outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),""); + outputFiles.ReplaceAll(os->GetString(),""); + } + delete arr; + } + arr = outputFiles.Tokenize(" "); + TIter next(arr); + comment = comment1; + first = kTRUE; + while ((os=(TObjString*)next())) { + // Ignore ouputs in jdl that are also in outputarchive + TString sout = os->GetString(); + if (sout.Index("@")>0) sout.Remove(sout.Index("@")); + if (fOutputArchive.Contains(sout)) continue; + if (!first) comment = NULL; + if (!os->GetString().Contains("@") && fCloseSE.Length()) + fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment); + else + fMergingJDL->AddToOutputSandbox(os->GetString(), comment); + } + delete arr; + } + fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job"); + fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job"); TString validationScript = fExecutable; validationScript.ReplaceAll(".sh", "_validation.sh"); - fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data())); - if (fMasterResubmitThreshold) fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold)); + fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob"); + validationScript = fExecutable; + validationScript.ReplaceAll(".sh", "_mergevalidation.sh"); + fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob"); + if (fMasterResubmitThreshold) { + fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold)); + fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage"); + } // Write a jdl with 2 input parameters: collection name and output dir name. WriteJDL(copy); } @@ -899,12 +1033,22 @@ Bool_t AliAnalysisAlien::CreateJDL() gGrid->Cd(workdir); } if (TestBit(AliAnalysisGrid::kSubmit)) { - Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data()); + TString mergeJDLName = fExecutable; + mergeJDLName.ReplaceAll(".sh", "_merge.jdl"); TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data()); - if (fProductionMode) + TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data()); + if (fProductionMode) { locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data()); + locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data()); + } if (FileExists(locjdl)) gGrid->Rm(locjdl); + if (FileExists(locjdl1)) gGrid->Rm(locjdl1); + Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data()); TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data())); + if (fMergeViaJDL) { + Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data()); + TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data())); + } } if (fAdditionalLibs.Length()) { arr = fAdditionalLibs.Tokenize(" "); @@ -946,63 +1090,30 @@ Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy) // One jdl with no parameters in case input data is specified by name. TIter next(fInputFiles); while ((os=(TObjString*)next())) - fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data())); + fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data()), "Input xml collections"); if (!fOutputSingle.IsNull()) - fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data())); - else - fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data())); + fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory"); + else { + fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory"); + fMergingJDL->SetOutputDirectory(fGridOutputDir); + } } else { // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix - fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data())); + fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections"); if (!fOutputSingle.IsNull()) { - if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data())); - else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data())); + if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory"); + else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory"); } else { - fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data())); + fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory"); + fMergingJDL->SetOutputDirectory(Form("%s/$1", fGridOutputDir.Data()), "Output directory"); } } // Generate the JDL as a string TString sjdl = fGridJDL->Generate(); + TString sjdl1 = fMergingJDL->Generate(); Int_t index; - index = sjdl.Index("Executable"); - if (index >= 0) sjdl.Insert(index, "\n# This is the startup script\n"); - index = sjdl.Index("Split "); - if (index >= 0) sjdl.Insert(index, "\n# We split per SE or file\n"); - index = sjdl.Index("SplitMaxInputFileNumber"); - if (index >= 0) sjdl.Insert(index, "\n# We want each subjob to get maximum this number of input files\n"); - index = sjdl.Index("InputDataCollection"); - if (index >= 0) sjdl.Insert(index, "# Input xml collections\n"); - index = sjdl.Index("InputFile"); - if (index >= 0) sjdl.Insert(index, "\n# List of input files to be uploaded to wn's\n"); - index = sjdl.Index("InputDataList "); - if (index >= 0) sjdl.Insert(index, "\n# Collection to be processed on wn\n"); - index = sjdl.Index("InputDataListFormat"); - if (index >= 0) sjdl.Insert(index, "\n# Format of input data\n"); - index = sjdl.Index("Price"); - if (index >= 0) sjdl.Insert(index, "\n# AliEn price for this job\n"); - index = sjdl.Index("Requirements"); - if (index >= 0) sjdl.Insert(index, "\n# Additional requirements for the computing element\n"); - index = sjdl.Index("Packages"); - if (index >= 0) sjdl.Insert(index, "\n# Packages to be used\n"); - index = sjdl.Index("User ="); - if (index >= 0) sjdl.Insert(index, "\n# AliEn user\n"); - index = sjdl.Index("TTL"); - if (index >= 0) sjdl.Insert(index, "\n# Time to live for the job\n"); - index = sjdl.Index("OutputFile"); - if (index >= 0) sjdl.Insert(index, "\n# List of output files to be registered\n"); - index = sjdl.Index("OutputDir"); - if (index >= 0) sjdl.Insert(index, "\n# Output directory\n"); - index = sjdl.Index("OutputArchive"); - if (index >= 0) sjdl.Insert(index, "\n# Files to be archived\n"); - index = sjdl.Index("MaxInitFailed"); - if (index >= 0) sjdl.Insert(index, "\n# Maximum number of first failing jobs to abort the master job\n"); - index = sjdl.Index("MasterResubmitThreshold"); - if (index >= 0) sjdl.Insert(index, "\n# Resubmit failed jobs until DONE rate reaches this percentage\n"); - sjdl.ReplaceAll("ValidationCommand", "Validationcommand"); - index = sjdl.Index("Validationcommand"); - if (index >= 0) sjdl.Insert(index, "\n# Validation script to be run for each subjob\n"); sjdl.ReplaceAll("\"LF:", "\n \"LF:"); sjdl.ReplaceAll("(member", "\n (member"); sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_"); @@ -1011,10 +1122,22 @@ Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy) sjdl.ReplaceAll("{\n \n", "{\n"); sjdl.ReplaceAll("\n\n", "\n"); sjdl.ReplaceAll("OutputDirectory", "OutputDir"); + sjdl1.ReplaceAll("\"LF:", "\n \"LF:"); + sjdl1.ReplaceAll("(member", "\n (member"); + sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_"); + sjdl1.ReplaceAll("{", "{\n "); + sjdl1.ReplaceAll("};", "\n};"); + sjdl1.ReplaceAll("{\n \n", "{\n"); + sjdl1.ReplaceAll("\n\n", "\n"); + sjdl1.ReplaceAll("OutputDirectory", "OutputDir"); sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n"; sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data())); index = sjdl.Index("JDLVariables"); if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n"); + sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n"; + sjdl1.Prepend(Form("Jobtag = {\n \"comment:Merging_%s\"\n};\n", fJobTag.Data())); + index = sjdl1.Index("JDLVariables"); + if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n"); // Write jdl to file ofstream out; out.open(fJDLName.Data(), ios::out); @@ -1023,17 +1146,36 @@ Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy) return kFALSE; } out << sjdl << endl; + TString mergeJDLName = fExecutable; + mergeJDLName.ReplaceAll(".sh", "_merge.jdl"); + if (fMergeViaJDL) { + ofstream out1; + out1.open(mergeJDLName.Data(), ios::out); + if (out.bad()) { + Error("CreateJDL", "Bad file name: %s", mergeJDLName.Data()); + return kFALSE; + } + out1 << sjdl1 << endl; + } // Copy jdl to grid workspace if (!copy) { Info("CreateJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in mode", fJDLName.Data(), fAnalysisMacro.Data()); } else { - Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data()); TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data()); - if (fProductionMode) + TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data()); + if (fProductionMode) { locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data()); + locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data()); + } if (FileExists(locjdl)) gGrid->Rm(locjdl); + if (FileExists(locjdl1)) gGrid->Rm(locjdl1); + Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data()); TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data())); + if (fMergeViaJDL) { + Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data()); + TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data())); + } } return kTRUE; } @@ -1325,6 +1467,7 @@ void AliAnalysisAlien::SetDefaults() // Set default values for everything. What cannot be filled will be left empty. if (fGridJDL) delete fGridJDL; fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); + fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()"); fPrice = 1; fTTL = 30000; fSplitMaxInputFileNumber = 100; @@ -1359,8 +1502,113 @@ void AliAnalysisAlien::SetDefaults() fJDLName = "analysis.jdl"; fJobTag = "Automatically generated analysis JDL"; fMergeExcludes = ""; + fMergeViaJDL = 0; } +//______________________________________________________________________________ +Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge) +{ +// Merge all registered outputs from basedir. + TString output_file = output; + TString command; + TString output_chunk; + TString previous_chunk = ""; + Int_t count_chunk = 0; + Int_t count_zero = nmaxmerge; + Bool_t merged = kTRUE; + Int_t index = output_file.Index("@"); + if (index > 0) output_file.Remove(index); + command = Form("find %s/ *%s", basedir, output_file.Data()); + printf("command: %s\n", command.Data()); + TGridResult *res = gGrid->Command(command); + if (!res) { + printf("Error: No result for the find command\n"); + return kFALSE; + } + + TFileMerger *fm = 0; + TIter nextmap(res); + TMap *map = 0; + // Check if there is a merge operation to resume + output_chunk = output_file; + output_chunk.ReplaceAll(".root", "_*.root"); + // Check for existent temporary merge files + if (!gSystem->Exec(Form("ls %s", output_chunk.Data()))) { + while (1) { + // Skip as many input files as in a chunk + for (Int_t counter=0; counter, nentries=%d", output_file.Data(), res->GetSize()); + delete res; + return kFALSE; + } + output_chunk = output_file; + output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk)); + count_chunk++; + if (gSystem->AccessPathName(output_chunk)) continue; + // Merged file with chunks up to found + printf("Resume merging of <%s> from <%s>\n", output_file.Data(), output_chunk.Data()); + previous_chunk = output_chunk; + break; + } + } + count_zero = nmaxmerge; + + while ((map=(TMap*)nextmap())) { + // Loop 'find' results and get next LFN + if (count_zero == nmaxmerge) { + // First file in chunk - create file merger and add previous chunk if any. + fm = new TFileMerger(kFALSE); + fm->SetFastMethod(kTRUE); + if (previous_chunk.Length()) fm->AddFile(previous_chunk.Data()); + output_chunk = output_file; + output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk)); + } + // If last file found, put merged results in the output file + if (map == res->Last()) output_chunk = output_file; + TObjString *objs = dynamic_cast(map->GetValue("turl")); + if (!objs || !objs->GetString().Length()) { + // Nothing found - skip this output + delete res; + delete fm; + return kFALSE; + } + // Add file to be merged and decrement chunk counter. + fm->AddFile(objs->GetString()); + count_zero--; + if (count_zero==0 || map == res->Last()) { + fm->OutputFile(output_chunk); + if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) { + // Nothing found - skip this output + ::Warning("MergeOutputs", "No <%s> files found.", output_file.Data()); + delete res; + delete fm; + return kFALSE; + } + // Merge the outputs, then go to next chunk + if (!fm->Merge()) { + ::Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data()); + delete res; + delete fm; + merged = kFALSE; + return kFALSE; + } else { + ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), output_chunk.Data()); + gSystem->Unlink(previous_chunk); + } + if (map == res->Last()) { + delete res; + delete fm; + break; + } + count_chunk++; + count_zero = nmaxmerge; + previous_chunk = output_chunk; + } + } + return merged; +} + //______________________________________________________________________________ Bool_t AliAnalysisAlien::MergeOutputs() { @@ -1370,6 +1618,12 @@ Bool_t AliAnalysisAlien::MergeOutputs() if (!Connect()) { Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed"); return kFALSE; + } + if (fMergeViaJDL && !fProductionMode && TestBit(AliAnalysisGrid::kMerge)) { + Info("MergeOutputs", "Submitting merging JDL"); + SubmitMerging(); + Info("MergeOutputs", "### Re-run with off to collect results after merging jobs are done ###"); + Info("MergeOutputs", "### Trying to continue merging ... (may fail if exited the shell prematurely)"); } // Get the output path if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data()); @@ -1380,16 +1634,20 @@ Bool_t AliAnalysisAlien::MergeOutputs() if (!fOutputFiles.Length()) { Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?"); return kFALSE; + } + // Check if fast read option was requested + if (fFastReadOption) { + Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !"); + gEnv->SetValue("XNet.ConnectTimeout",5); + gEnv->SetValue("XNet.RequestTimeout",5); + gEnv->SetValue("XNet.MaxRedirectCount",2); + gEnv->SetValue("XNet.ReconnectTimeout",5); + gEnv->SetValue("XNet.FirstConnectMaxCnt",1); } TObjArray *list = fOutputFiles.Tokenize(" "); TIter next(list); TObjString *str; - TString command; TString output_file; - TString output_chunk; - TString previous_chunk; - Int_t count_chunk = 0; - Int_t count_zero = fMaxMergeFiles; Bool_t merged = kTRUE; while((str=(TObjString*)next())) { output_file = str->GetString(); @@ -1403,95 +1661,13 @@ Bool_t AliAnalysisAlien::MergeOutputs() if (fMergeExcludes.Length() && fMergeExcludes.Contains(output_file.Data())) continue; // Perform a 'find' command in the output directory, looking for registered outputs - command = Form("find %s/ *%s", fGridOutputDir.Data(), output_file.Data()); - printf("command: %s\n", command.Data()); - TGridResult *res = gGrid->Command(command); - if (!res) continue; - TFileMerger *fm = 0; - TIter nextmap(res); - TMap *map = 0; - previous_chunk = ""; - count_chunk = 0; - // Check if there is a merge operation to resume - output_chunk = output_file; - output_chunk.ReplaceAll(".root", "_*.root"); - if (!gSystem->Exec(Form("ls %s", output_chunk.Data()))) { - while (1) { - for (Int_t counter=0; counter, nentries=%d", output_file.Data(), res->GetSize()); - delete res; - return kFALSE; - } - output_chunk = output_file; - output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk)); - printf("%s\n", output_chunk.Data()); - count_chunk++; - if (gSystem->AccessPathName(output_chunk)) continue; - // Merged file with chunks up to found - printf("Resume merging of <%s> from <%s>\n", output_file.Data(), output_chunk.Data()); - previous_chunk = output_chunk; - break; - } - } - count_zero = fMaxMergeFiles; - while ((map=(TMap*)nextmap())) { - // Loop 'find' results and get next LFN - if (count_zero == fMaxMergeFiles) { - // First file in chunk - create file merger and add previous chunk if any. - fm = new TFileMerger(kFALSE); - fm->SetFastMethod(kTRUE); - if (previous_chunk.Length()) fm->AddFile(previous_chunk.Data()); - output_chunk = output_file; - output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk)); - } - // If last file found, put merged results in the output file - if (map == res->Last()) output_chunk = output_file; - TObjString *objs = dynamic_cast(map->GetValue("turl")); - if (!objs || !objs->GetString().Length()) { - // Nothing found - skip this output - delete res; - delete fm; - break; - } - // Add file to be merged and decrement chunk counter. - fm->AddFile(objs->GetString()); - count_zero--; - if (count_zero==0 || map == res->Last()) { - fm->OutputFile(output_chunk); - if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) { - // Nothing found - skip this output - Warning("MergeOutputs", "No <%s> files found.", output_file.Data()); - delete res; - delete fm; - break; - } - // Merge the outputs, then go to next chunk - if (!fm->Merge()) { - Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data()); - delete res; - delete fm; - merged = kFALSE; - break; - } else { - Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), output_chunk.Data()); - gSystem->Unlink(previous_chunk); - } - if (map == res->Last()) { - delete res; - delete fm; - break; - } - count_chunk++; - count_zero = fMaxMergeFiles; - previous_chunk = output_chunk; - } - } + merged = MergeOutput(output_file, fGridOutputDir, fMaxMergeFiles); + if (!merged) { + Error("MergeOutputs", "Terminate() will NOT be executed"); + return kFALSE; + } } - if (!merged) { - Error("MergeOutputs", "Terminate() will NOT be executed"); - } - return merged; + return kTRUE; } //______________________________________________________________________________ @@ -1549,6 +1725,7 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn \n space and job submitted."); } else if (TestBit(AliAnalysisGrid::kMerge)) { Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged"); + if (fMergeViaJDL) CheckInputData(); return kTRUE; } else { Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job..."); @@ -1568,6 +1745,11 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn WriteAnalysisMacro(); WriteExecutable(); WriteValidationScript(); + if (fMergeViaJDL) { + WriteMergingMacro(); + WriteMergeExecutable(); + WriteValidationScript(kTRUE); + } if (!CreateJDL()) return kFALSE; if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE; if (TestBit(AliAnalysisGrid::kTest)) { @@ -1651,6 +1833,42 @@ void AliAnalysisAlien::Submit() } } +//______________________________________________________________________________ +void AliAnalysisAlien::SubmitMerging() +{ +// Submit all merging jobs. + if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data()); + gGrid->Cd(fGridOutputDir); + TString mergeJDLName = fExecutable; + mergeJDLName.ReplaceAll(".sh", "_merge.jdl"); + Int_t ntosubmit = fInputFiles->GetEntries(); + printf("### Submitting %d merging jobs...\n", ntosubmit); + for (Int_t i=0; iCommand(query); + if (res) { + const char *cjobId = res->GetKey(0,"jobId"); + if (!cjobId) { + Error("StartAnalysis", "Your JDL %s could not be submitted", mergeJDLName.Data()); + return; + } else { + Info("StartAnalysis", "\n_______________________________________________________________________ \ + \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \ + \n_______________________________________________________________________", + mergeJDLName.Data(), cjobId); + } + delete res; + } + } + if (!ntosubmit) return; + Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \ + \n You may exit at any time and terminate the job later using the option but disabling SetMergeViaJDL\ + \n ##################################################################################"); + gSystem->Exec("aliensh"); +} + //______________________________________________________________________________ void AliAnalysisAlien::SubmitNext() { @@ -1849,17 +2067,17 @@ void AliAnalysisAlien::WriteAnalysisMacro() pkgname == "CORRFW.par") hasCORRFW = kTRUE; } if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl; - else out << " if (!SetupPar(\"STEERBase\")) return;" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"STEERBase\")) return;" << endl; if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl; - else out << " if (!SetupPar(\"ESD\")) return;" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ESD\")) return;" << endl; if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl; - else out << " if (!SetupPar(\"AOD\")) return;" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"AOD\")) return;" << endl; if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl; - else out << " if (!SetupPar(\"ANALYSIS\")) return;" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ANALYSIS\")) return;" << endl; if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl; - else out << " if (!SetupPar(\"ANALYSISalice\")) return;" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ANALYSISalice\")) return;" << endl; if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl; - else out << " if (!SetupPar(\"CORRFW\")) return;" << endl << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"CORRFW\")) return;" << endl << endl; out << "// Compile other par packages" << endl; next.Reset(); while ((obj=next())) { @@ -1876,7 +2094,7 @@ void AliAnalysisAlien::WriteAnalysisMacro() pkgname == "ANALYSISalice.par" || pkgname == "CORRFW" || pkgname == "CORRFW.par") continue; - out << " if (!SetupPar(\"" << obj->GetName() << "\")) return;" << endl; + out << " if (!AliAnalysisAlien::SetupPar(\"" << obj->GetName() << "\")) return;" << endl; } } if (fAdditionalLibs.Length()) { @@ -1902,6 +2120,16 @@ void AliAnalysisAlien::WriteAnalysisMacro() if (list) delete list; } out << endl; + if (fFastReadOption) { + Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. Note that this may skip some files that could be accessed !!!"); + out << "// fast xrootd reading enabled" << endl; + out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl; + out << " gEnv->SetValue(\"XNet.ConnectTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.RequestTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl; + out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl; + } out << "// connect to AliEn and make the chain" << endl; out << " if (!TGrid::Connect(\"alien://\")) return;" << endl; if (IsUsingTags()) { @@ -2028,46 +2256,6 @@ void AliAnalysisAlien::WriteAnalysisMacro() out << " return chain;" << endl; out << "}" << endl << endl; } - if (fPackages) { - out <<"//________________________________________________________________________________" << endl; - out << "Bool_t SetupPar(const char *package) {" << endl; - out << "// Compile the package and set it up." << endl; - out << " TString pkgdir = package;" << endl; - out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl; - out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl; - out << " TString cdir = gSystem->WorkingDirectory();" << endl; - out << " gSystem->ChangeDirectory(pkgdir);" << endl; - out << " // Check for BUILD.sh and execute" << endl; - out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl; - out << " printf(\"*******************************\\n\");" << endl; - out << " printf(\"*** Building PAR archive ***\\n\");" << endl; - out << " printf(\"*******************************\\n\");" << endl; - out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl; - out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl; - out << " gSystem->ChangeDirectory(cdir);" << endl; - out << " return kFALSE;" << endl; - out << " }" << endl; - out << " } else {" << endl; - out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl; - out << " gSystem->ChangeDirectory(cdir);" << endl; - out << " return kFALSE;" << endl; - out << " }" << endl; - out << " // Check for SETUP.C and execute" << endl; - out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl; - out << " printf(\"*******************************\\n\");" << endl; - out << " printf(\"*** Setup PAR archive ***\\n\");" << endl; - out << " printf(\"*******************************\\n\");" << endl; - out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl; - out << " } else {" << endl; - out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl; - out << " gSystem->ChangeDirectory(cdir);" << endl; - out << " return kFALSE;" << endl; - out << " }" << endl; - out << " // Restore original workdir" << endl; - out << " gSystem->ChangeDirectory(cdir);" << endl; - out << " return kTRUE;" << endl; - out << "}" << endl; - } Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data()); } Bool_t copy = kTRUE; @@ -2087,6 +2275,235 @@ void AliAnalysisAlien::WriteAnalysisMacro() } } +//______________________________________________________________________________ +void AliAnalysisAlien::WriteMergingMacro() +{ +// Write a macro to merge the outputs per master job. + if (!fMergeViaJDL) return; + if (!fOutputFiles.Length()) { + Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?"); + return; + } + TString mergingMacro = fExecutable; + mergingMacro.ReplaceAll(".sh","_merge.C"); + if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data()); + if (!TestBit(AliAnalysisGrid::kSubmit)) { + ofstream out; + out.open(mergingMacro.Data(), ios::out); + if (!out.good()) { + Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data()); + return; + } + TString func = mergingMacro; + TString comment; + func.ReplaceAll(".C", ""); + out << "void " << func.Data() << "(const char *dir)" << endl; + out << "{" << endl; + out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl; + out << " TStopwatch timer;" << endl; + out << " timer.Start();" << endl << endl; + out << "// load base root libraries" << endl; + out << " gSystem->Load(\"libTree\");" << endl; + out << " gSystem->Load(\"libGeom\");" << endl; + out << " gSystem->Load(\"libVMC\");" << endl; + out << " gSystem->Load(\"libPhysics\");" << endl << endl; + out << " gSystem->Load(\"libMinuit\");" << endl << endl; + if (fAdditionalRootLibs.Length()) { + // in principle libtree /lib geom libvmc etc. can go into this list, too + out << "// Add aditional libraries" << endl; + TObjArray *list = fAdditionalRootLibs.Tokenize(" "); + TIter next(list); + TObjString *str; + while((str=(TObjString*)next())) { + if (str->GetString().Contains(".so")) + out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl; + } + if (list) delete list; + } + out << "// include path" << endl; + if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl; + out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl; + out << "// Load analysis framework libraries" << endl; + if (!fPackages) { + out << " gSystem->Load(\"libSTEERBase\");" << endl; + out << " gSystem->Load(\"libESD\");" << endl; + out << " gSystem->Load(\"libAOD\");" << endl; + out << " gSystem->Load(\"libANALYSIS\");" << endl; + out << " gSystem->Load(\"libANALYSISalice\");" << endl; + out << " gSystem->Load(\"libCORRFW\");" << endl << endl; + } else { + TIter next(fPackages); + TObject *obj; + TString pkgname; + Bool_t hasSTEERBase = kFALSE; + Bool_t hasESD = kFALSE; + Bool_t hasAOD = kFALSE; + Bool_t hasANALYSIS = kFALSE; + Bool_t hasANALYSISalice = kFALSE; + Bool_t hasCORRFW = kFALSE; + while ((obj=next())) { + pkgname = obj->GetName(); + if (pkgname == "STEERBase" || + pkgname == "STEERBase.par") hasSTEERBase = kTRUE; + if (pkgname == "ESD" || + pkgname == "ESD.par") hasESD = kTRUE; + if (pkgname == "AOD" || + pkgname == "AOD.par") hasAOD = kTRUE; + if (pkgname == "ANALYSIS" || + pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE; + if (pkgname == "ANALYSISalice" || + pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE; + if (pkgname == "CORRFW" || + pkgname == "CORRFW.par") hasCORRFW = kTRUE; + } + if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"STEERBase\")) return;" << endl; + if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ESD\")) return;" << endl; + if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"AOD\")) return;" << endl; + if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ANALYSIS\")) return;" << endl; + if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"ANALYSISalice\")) return;" << endl; + if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl; + else out << " if (!AliAnalysisAlien::SetupPar(\"CORRFW\")) return;" << endl << endl; + out << "// Compile other par packages" << endl; + next.Reset(); + while ((obj=next())) { + pkgname = obj->GetName(); + if (pkgname == "STEERBase" || + pkgname == "STEERBase.par" || + pkgname == "ESD" || + pkgname == "ESD.par" || + pkgname == "AOD" || + pkgname == "AOD.par" || + pkgname == "ANALYSIS" || + pkgname == "ANALYSIS.par" || + pkgname == "ANALYSISalice" || + pkgname == "ANALYSISalice.par" || + pkgname == "CORRFW" || + pkgname == "CORRFW.par") continue; + out << " if (!AliAnalysisAlien::SetupPar(\"" << obj->GetName() << "\")) return;" << endl; + } + } + if (fAdditionalLibs.Length()) { + out << "// Add aditional AliRoot libraries" << endl; + TObjArray *list = fAdditionalLibs.Tokenize(" "); + TIter next(list); + TObjString *str; + while((str=(TObjString*)next())) { + if (str->GetString().Contains(".so")) + out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl; + } + if (list) delete list; + } + out << endl; + out << "// Analysis source to be compiled at runtime (if any)" << endl; + if (fAnalysisSource.Length()) { + TObjArray *list = fAnalysisSource.Tokenize(" "); + TIter next(list); + TObjString *str; + while((str=(TObjString*)next())) { + out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl; + } + if (list) delete list; + } + out << endl; + if (fFastReadOption) { + Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!"); + out << "// fast xrootd reading enabled" << endl; + out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl; + out << " gEnv->SetValue(\"XNet.ConnectTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.RequestTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl; + out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",5);" << endl; + out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl; + } + out << "// Connect to AliEn" << endl; + out << " if (!TGrid::Connect(\"alien://\")) return;" << endl; + out << " TString outputDir = \"" << fGridOutputDir << "/\";" << endl; + out << " outputDir += dir;" << endl; + out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl; + out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl; + out << " TObjArray *list = outputFiles.Tokenize(\" \");" << endl; + out << " TIter *iter = new TIter(list);" << endl; + out << " TObjString *str;" << endl; + out << " TString output_file;" << endl; + out << " Bool_t merged = kTRUE;" << endl; + out << " while((str=(TObjString*)iter->Next())) {" << endl; + out << " output_file = str->GetString();" << endl; + out << " Int_t index = output_file.Index(\"@\");" << endl; + out << " if (index > 0) output_file.Remove(index);" << endl; + out << " // Skip already merged outputs" << endl; + out << " if (!gSystem->AccessPathName(output_file)) {" << endl; + out << " printf(\"Output file <%s> found. Not merging again.\",output_file.Data());" << endl; + out << " continue;" << endl; + out << " }" << endl; + out << " if (mergeExcludes.Contains(output_file.Data())) continue;" << endl; + out << " merged = AliAnalysisAlien::MergeOutput(output_file, outputDir, " << fMaxMergeFiles << ");" << endl; + out << " if (!merged) {" << endl; + out << " printf(\"ERROR: Cannot merge %s\\n\", output_file.Data());" << endl; + out << " return;" << endl; + out << " }" << endl; + out << " }" << endl; + out << "}" << endl << endl; + } + Bool_t copy = kTRUE; + if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE; + if (copy) { + CdWork(); + TString workdir = gGrid->GetHomeDirectory(); + workdir += fGridWorkingDir; + if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro); + Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data()); + TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data())); + } +} + +//______________________________________________________________________________ +Bool_t AliAnalysisAlien::SetupPar(const char *package) +{ +// Compile the par file archive pointed by . This must be present in the current durectory. +// Note that for loading the compiled library. The current directory should have precedence in +// LD_LIBRARY_PATH + TString pkgdir = package; + pkgdir.ReplaceAll(".par",""); + gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data())); + TString cdir = gSystem->WorkingDirectory(); + gSystem->ChangeDirectory(pkgdir); + // Check for BUILD.sh and execute + if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) { + printf("**************************************************\n"); + printf("*** Building PAR archive %s\n", package); + printf("**************************************************\n"); + if (gSystem->Exec("PROOF-INF/BUILD.sh")) { + ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data()); + gSystem->ChangeDirectory(cdir); + return kFALSE; + } + } else { + ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data()); + gSystem->ChangeDirectory(cdir); + return kFALSE; + } + // Check for SETUP.C and execute + if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) { + printf("**************************************************\n"); + printf("*** Setup PAR archive %s\n", package); + printf("**************************************************\n"); + gROOT->Macro("PROOF-INF/SETUP.C"); + printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE)); + } else { + ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data()); + gSystem->ChangeDirectory(cdir); + return kFALSE; + } + // Restore original workdir + gSystem->ChangeDirectory(cdir); + return kTRUE; +} + //______________________________________________________________________________ void AliAnalysisAlien::WriteExecutable() { @@ -2142,6 +2559,66 @@ void AliAnalysisAlien::WriteExecutable() } } +//______________________________________________________________________________ +void AliAnalysisAlien::WriteMergeExecutable() +{ +// Generate the alien executable script for the merging job. + if (!fMergeViaJDL) return; + TString mergeExec = fExecutable; + mergeExec.ReplaceAll(".sh", "_merge.sh"); + if (!TestBit(AliAnalysisGrid::kSubmit)) { + ofstream out; + out.open(mergeExec.Data(), ios::out); + if (out.bad()) { + Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data()); + return; + } + out << "#!/bin/bash" << endl; + out << "echo \"=========================================\"" << endl; + out << "echo \"############## PATH : ##############\"" << endl; + out << "echo $PATH" << endl; + out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl; + out << "echo $LD_LIBRARY_PATH" << endl; + out << "echo \"############## ROOTSYS : ##############\"" << endl; + out << "echo $ROOTSYS" << endl; + out << "echo \"############## which root : ##############\"" << endl; + out << "which root" << endl; + out << "echo \"############## ALICE_ROOT : ##############\"" << endl; + out << "echo $ALICE_ROOT" << endl; + out << "echo \"############## which aliroot : ##############\"" << endl; + out << "which aliroot" << endl; + out << "echo \"############## system limits : ##############\"" << endl; + out << "ulimit -a" << endl; + out << "echo \"############## memory : ##############\"" << endl; + out << "free -m" << endl; + out << "echo \"=========================================\"" << endl << endl; + // Make sure we can properly compile par files + if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl; + TString mergeMacro = fExecutable; + mergeMacro.ReplaceAll(".sh", "_merge.C"); + out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl; + out << fExecutableCommand << " " << "$ARG" << endl; + out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl; + out << "echo \"############## memory after: ##############\"" << endl; + out << "free -m" << endl; + out << "echo \"############## Last 10 lines from dmesg : ##############\"" << endl; + out << "dmesg | tail -n 10" << endl; + } + Bool_t copy = kTRUE; + if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE; + if (copy) { + CdWork(); + TString workdir = gGrid->GetHomeDirectory(); + TString bindir = Form("%s/bin", workdir.Data()); + if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir); + workdir += fGridWorkingDir; + TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data()); + if (FileExists(executable)) gGrid->Rm(executable); + Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data()); + TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data())); + } +} + //______________________________________________________________________________ void AliAnalysisAlien::WriteProductionFile(const char *filename) const { @@ -2175,13 +2652,14 @@ void AliAnalysisAlien::WriteProductionFile(const char *filename) const } //______________________________________________________________________________ -void AliAnalysisAlien::WriteValidationScript() +void AliAnalysisAlien::WriteValidationScript(Bool_t merge) { // Generate the alien validation script. // Generate the validation script TObjString *os; TString validationScript = fExecutable; - validationScript.ReplaceAll(".sh", "_validation.sh"); + if (merge) validationScript.ReplaceAll(".sh", "_mergevalidation.sh"); + else validationScript.ReplaceAll(".sh", "_validation.sh"); if (!Connect()) { Error("WriteValidationScript", "Alien connection required"); return; @@ -2256,6 +2734,7 @@ void AliAnalysisAlien::WriteValidationScript() output_file = os->GetString(); Int_t index = output_file.Index("@"); if (index > 0) output_file.Remove(index); + if (merge && fMergeExcludes.Contains(output_file)) continue; out << "if ! [ -f " << output_file.Data() << " ] ; then" << endl; out << " error=1" << endl; out << " echo \"Output file(s) not found. Job FAILED !\"" << out_stream << endl; diff --git a/ANALYSIS/AliAnalysisAlien.h b/ANALYSIS/AliAnalysisAlien.h index a4bcf297f16..5861834ad25 100644 --- a/ANALYSIS/AliAnalysisAlien.h +++ b/ANALYSIS/AliAnalysisAlien.h @@ -67,6 +67,7 @@ public: virtual void SetInputFormat(const char *format="xml-single") {fInputFormat = format;} virtual void SetMaxInitFailed(Int_t nfail=5) {fMaxInitFailed = nfail;} virtual void SetMergeExcludes(const char *list) {fMergeExcludes = list;}; + virtual void SetMergeViaJDL(Bool_t on=kTRUE) {fMergeViaJDL = on ? 1 : 0;} virtual void SetMasterResubmitThreshold(Int_t percentage) {fMasterResubmitThreshold = percentage;} virtual void SetNtestFiles(Int_t nfiles) {fNtestFiles = nfiles;} virtual void SetJDLName(const char *name="analysis.jdl") {fJDLName = name;} @@ -74,8 +75,11 @@ public: virtual void SetProductionMode(Int_t mode=1) {fProductionMode = mode;} virtual void SetRunPrefix(const char *prefix) {fRunPrefix = prefix;} virtual void SetOutputSingleFolder(const char *folder) {fOutputSingle = folder; fSplitMode="file"; fSplitMaxInputFileNumber=1;} + virtual void SetFastReadOption(Bool_t on=kTRUE) {fFastReadOption = on ? 1 : 0;} + virtual void SetOverwriteMode(Bool_t on=kTRUE) {fOverwriteMode = on ? 1 : 0;} - TGridJDL *GetGridJDL() {return fGridJDL;} + TGridJDL *GetGridJDL() const {return fGridJDL;} + TGridJDL *GetMergingJDL() const {return fMergingJDL;} const char *GetGridOutputDir() const {return fGridOutputDir;} //Utilities virtual Bool_t CreateDataset(const char *pattern); @@ -84,16 +88,21 @@ public: static Bool_t DirectoryExists(const char *lfn); static Bool_t FileExists(const char *lfn); static const char *GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone); + static Bool_t MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge); virtual Bool_t MergeOutputs(); virtual void Print(Option_t *option="") const; virtual Bool_t StartAnalysis(Long64_t nentries=123456789, Long64_t firstentry=0); + static Bool_t SetupPar(const char *package); virtual void Submit(); + virtual void SubmitMerging(); virtual void WriteAnalysisFile(); virtual void WriteAnalysisMacro(); + virtual void WriteMergingMacro(); + virtual void WriteMergeExecutable(); virtual void WriteExecutable(); virtual Bool_t WriteJDL(Bool_t copy); virtual void WriteProductionFile(const char *filename) const; - virtual void WriteValidationScript(); + virtual void WriteValidationScript(Bool_t merge=kFALSE); protected: void CdWork(); @@ -109,6 +118,7 @@ protected: private: TGridJDL *fGridJDL; //! JDL maker + TGridJDL *fMergingJDL; //! JDL maker Int_t fPrice; // Grid price for the job; Int_t fTTL; // Time to live. Int_t fSplitMaxInputFileNumber; // Maximum number of files to be processed per subjob @@ -121,6 +131,9 @@ private: Int_t fNsubmitted; // Number of jobs submitted Int_t fProductionMode; // Production mode (0-off, 1-on) Int_t fOutputToRunNo; // Use run number as output directory + Int_t fMergeViaJDL; // Enable merging via automatic JDL + Int_t fFastReadOption; // Use xrootd tweaks to reduce timeouts in file access + Int_t fOverwriteMode; // Overwrite existing files if any TString fRunNumbers; // List of runs to be processed TString fExecutable; // Executable script for AliEn job TString fExecutableCommand; // Command(s) to be executed in the executable script @@ -155,6 +168,6 @@ private: TObjArray *fInputFiles; // List of input files to be processed by the job TObjArray *fPackages; // List of packages to be used - ClassDef(AliAnalysisAlien, 10) // Class providing some AliEn utilities + ClassDef(AliAnalysisAlien, 12) // Class providing some AliEn utilities }; #endif diff --git a/ANALYSIS/AliAnalysisGrid.h b/ANALYSIS/AliAnalysisGrid.h index 3deeeb37e55..454c4bf7156 100644 --- a/ANALYSIS/AliAnalysisGrid.h +++ b/ANALYSIS/AliAnalysisGrid.h @@ -69,10 +69,16 @@ enum EPluginRunMode { virtual void SetInputFormat(const char *format="xml-single") = 0; virtual void SetMaxInitFailed(Int_t nfail=5) = 0; virtual void SetMergeExcludes(const char *list) = 0; + virtual void SetMergeViaJDL(Bool_t on=kTRUE) = 0; virtual void SetMasterResubmitThreshold(Int_t percentage) = 0; virtual void SetNtestFiles(Int_t nfiles) = 0; virtual void SetJDLName(const char *name="analysis.jdl") = 0; virtual void SetPreferedSE(const char *se) = 0; + virtual void SetProductionMode(Int_t mode=1) = 0; + virtual void SetRunPrefix(const char *prefix) = 0; + virtual void SetOutputSingleFolder(const char *folder) = 0; + virtual void SetFastReadOption(Bool_t on=kTRUE) = 0; + virtual void SetOverwriteMode(Bool_t on=kTRUE) = 0; // Set run mode. Can be "full", "test", "offline", "submit" or "merge" virtual void SetRunMode(const char *mode="full"); @@ -86,7 +92,7 @@ enum EPluginRunMode { virtual void WriteAnalysisFile() = 0; virtual void WriteAnalysisMacro() = 0; virtual void WriteExecutable() = 0; - virtual void WriteValidationScript() = 0; + virtual void WriteValidationScript(Bool_t merge=kFALSE) = 0; protected: virtual Bool_t Connect() = 0; -- 2.43.0