From 3bdcb56295d49555c630cefd3113d5c6cc99d7f9 Mon Sep 17 00:00:00 2001 From: agheata Date: Fri, 10 Sep 2010 09:47:48 +0000 Subject: [PATCH] Important update of the grid plugin. Added support for proof mode within the plugin. It is enough to add configuration related to the proof cluster and start analysis via: StartAnalysis("proof") for this to work. Plugin test mode supported, provided that a file containing the locations of the input data files is supplied. --- ANALYSIS/AliAnalysisAlien.cxx | 227 ++++++++++++++++++++++++++++++-- ANALYSIS/AliAnalysisAlien.h | 21 ++- ANALYSIS/AliAnalysisGrid.h | 21 ++- ANALYSIS/AliAnalysisManager.cxx | 63 ++++++--- 4 files changed, 306 insertions(+), 26 deletions(-) diff --git a/ANALYSIS/AliAnalysisAlien.cxx b/ANALYSIS/AliAnalysisAlien.cxx index c5cf5486103..bd2adedf4da 100644 --- a/ANALYSIS/AliAnalysisAlien.cxx +++ b/ANALYSIS/AliAnalysisAlien.cxx @@ -27,6 +27,7 @@ #include "TROOT.h" #include "TSystem.h" #include "TFile.h" +#include "TChain.h" #include "TObjString.h" #include "TObjArray.h" #include "TGrid.h" @@ -63,6 +64,8 @@ AliAnalysisAlien::AliAnalysisAlien() fFastReadOption(0), fOverwriteMode(1), fNreplicas(2), + fNproofWorkers(0), + fProofReset(0), fRunNumbers(), fExecutable(), fExecutableCommand(), @@ -94,6 +97,11 @@ AliAnalysisAlien::AliAnalysisAlien() fJobTag(), fOutputSingle(), fRunPrefix(), + fProofCluster(), + fProofDataSet(), + fFileForTestMode(), + fRootVersionForProof(), + fAliRootMode(), fInputFiles(0), fPackages(0) { @@ -121,6 +129,8 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name) fFastReadOption(0), fOverwriteMode(1), fNreplicas(2), + fNproofWorkers(0), + fProofReset(0), fRunNumbers(), fExecutable(), fExecutableCommand(), @@ -152,6 +162,11 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name) fJobTag(), fOutputSingle(), fRunPrefix(), + fProofCluster(), + fProofDataSet(), + fFileForTestMode(), + fRootVersionForProof(), + fAliRootMode(), fInputFiles(0), fPackages(0) { @@ -179,6 +194,8 @@ 
AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other) fFastReadOption(other.fFastReadOption), fOverwriteMode(other.fOverwriteMode), fNreplicas(other.fNreplicas), + fNproofWorkers(other.fNproofWorkers), + fProofReset(other.fProofReset), fRunNumbers(other.fRunNumbers), fExecutable(other.fExecutable), fExecutableCommand(other.fExecutableCommand), @@ -210,6 +227,11 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other) fJobTag(other.fJobTag), fOutputSingle(other.fOutputSingle), fRunPrefix(other.fRunPrefix), + fProofCluster(other.fProofCluster), + fProofDataSet(other.fProofDataSet), + fFileForTestMode(other.fFileForTestMode), + fRootVersionForProof(other.fRootVersionForProof), + fAliRootMode(other.fAliRootMode), fInputFiles(0), fPackages(0) { @@ -267,6 +289,8 @@ AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other) fFastReadOption = other.fFastReadOption; fOverwriteMode = other.fOverwriteMode; fNreplicas = other.fNreplicas; + fNproofWorkers = other.fNproofWorkers; + fProofReset = other.fProofReset; fRunNumbers = other.fRunNumbers; fExecutable = other.fExecutable; fExecutableCommand = other.fExecutableCommand; @@ -298,6 +322,11 @@ AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other) fJobTag = other.fJobTag; fOutputSingle = other.fOutputSingle; fRunPrefix = other.fRunPrefix; + fProofCluster = other.fProofCluster; + fProofDataSet = other.fProofDataSet; + fFileForTestMode = other.fFileForTestMode; + fRootVersionForProof = other.fRootVersionForProof; + fAliRootMode = other.fAliRootMode; if (other.fInputFiles) { fInputFiles = new TObjArray(); TIter next(other.fInputFiles); @@ -1431,6 +1460,50 @@ void AliAnalysisAlien::EnablePackage(const char *package) fPackages->Add(new TObjString(pkg)); } +//______________________________________________________________________________ +TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const +{ +// Make a tree from files having the location specified 
in fFileForTestMode. +// Inspired from JF's CreateESDChain. + if (fFileForTestMode.IsNull()) { + Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations."); + return NULL; + } + if (gSystem->AccessPathName(fFileForTestMode)) { + Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data()); + return NULL; + } + // Open the file + ifstream in; + in.open(fFileForTestMode); + Int_t count = 0; + // Read the input list of files and add them to the chain + TString line; + TChain *chain = new TChain(treeName); + while (in.good()) + { + in >> line; + if (line.IsNull()) continue; + if (count++ == fNtestFiles) break; + TString esdFile(line); + TFile *file = TFile::Open(esdFile); + if (file) { + if (!file->IsZombie()) chain->Add(esdFile); + file->Close(); + } else { + Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data()); + } + } + in.close(); + if (!chain->GetListOfFiles()->GetEntries()) { + Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data()); + delete chain; + return NULL; + } +// chain->ls(); + return chain; +} + //______________________________________________________________________________ const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone) { @@ -2080,14 +2153,143 @@ void AliAnalysisAlien::SetPreferedSE(const char */*se*/) Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/) { // Start remote grid analysis. + AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager(); + Bool_t testMode = TestBit(AliAnalysisGrid::kTest); + if (!mgr || !mgr->IsInitialized()) { + Error("StartAnalysis", "You need an initialized analysis manager for this"); + return kFALSE; + } + // Are we in PROOF mode ? 
+ if (mgr->IsProofMode()) { + Info("StartAnalysis", "##### Starting PROOF analysis on via the plugin #####"); + if (fProofCluster.IsNull()) { + Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster"); + return kFALSE; + } + if (fProofDataSet.IsNull() && !testMode) { + Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()"); + return kFALSE; + } + // Set the needed environment + gEnv->SetValue("XSec.GSI.DelegProxy","2"); + // Do we need to reset PROOF ? The success of the Reset operation cannot be checked + if (fProofReset && !testMode) { + if (fProofReset==1) { + Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data()); + gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data())); + } else { + Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data()); + gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data())); + } + Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume."); + return kFALSE; + } + // Do we need to change the ROOT version ? The success of this cannot be checked. 
+ if (!fRootVersionForProof.IsNull() && !testMode) { + gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");", + fProofCluster.Data(), fRootVersionForProof.Data())); + } + // Connect to PROOF and check the status + Long_t proof = 0; + if (!testMode) { + if (fNproofWorkers) + proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"workers=%d\");", fProofCluster.Data(), fNproofWorkers)); + else + proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data())); + } else { + proof = gROOT->ProcessLine("TProof::Open(\"\");"); + if (!proof) { + Error("StartAnalysis", "Could not start PROOF in test mode"); + return kFALSE; + } + } + if (!proof) { + Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data()); + return kFALSE; + } + // Is dataset existing ? + if (!testMode) { + TString dataset = fProofDataSet; + Int_t index = dataset.Index("#"); + if (index>=0) dataset.Remove(index); + if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) { + Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data()); + return kFALSE; + } + Info("StartAnalysis", "Dataset %s found", dataset.Data()); + } + // Is ClearPackages() needed ? + if (TestSpecialBit(kClearPackages)) { + Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this."); + gROOT->ProcessLine("gProof->ClearPackages();"); + } + // Is a given aliroot mode requested ? 
+ TList optionsList; + if (!fAliRootMode.IsNull() && !testMode) { + TString alirootMode = fAliRootMode; + if (alirootMode == "default") alirootMode = ""; + Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data()); + optionsList.SetOwner(); + optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data())); + // Check the additional libs to be loaded + TString extraLibs; + if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice"; + // Parse the extra libs for .so + if (fAdditionalLibs.Length()) { + TObjArray *list = fAdditionalLibs.Tokenize(" "); + TIter next(list); + TObjString *str; + while((str=(TObjString*)next()) && str->GetString().Contains(".so")) { + if (!extraLibs.IsNull()) extraLibs += ":"; + extraLibs += str->GetName(); + } + if (list) delete list; + } + if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data())); + if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);", + fAliROOTVersion.Data(), (ULong_t)&optionsList))) { + Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data()); + return kFALSE; + } + } else { + if (fAdditionalLibs.Contains(".so") && !testMode) { + Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. 
Please refer to: \ + \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()"); + return kFALSE; + } + } + // Enable par files if requested + if (fPackages && fPackages->GetEntries()) { + TIter next(fPackages); + TObject *package; + while ((package=next())) { + if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) { + if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\");", package->GetName()))) { + Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName()); + return kFALSE; + } + } else { + Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName()); + return kFALSE; + } + } + } + // Do we need to load analysis source files ? + // NOTE: don't load on client since this is anyway done by the user to attach his task. + if (fAnalysisSource.Length()) { + TObjArray *list = fAnalysisSource.Tokenize(" "); + TIter next(list); + TObjString *str; + while((str=(TObjString*)next())) { + gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName())); + } + if (list) delete list; + } + return kTRUE; + } // Check if output files have to be taken from the analysis manager if (TestBit(AliAnalysisGrid::kDefaultOutputs)) { - AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager(); - if (!mgr || !mgr->IsInitialized()) { - Error("StartAnalysis", "You need an initialized analysis manager for this"); - return kFALSE; - } fOutputFiles = ""; TIter next(mgr->GetOutputs()); AliAnalysisDataContainer *output; @@ -2163,7 +2365,7 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn } if (!CreateJDL()) return kFALSE; if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE; - if (TestBit(AliAnalysisGrid::kTest)) { + if (testMode) { // Locally testing the analysis Info("StartAnalysis", "\n_______________________________________________________________________ \ \n Running analysis script in a daughter shell as on a worker node \ @@ 
-2311,8 +2513,16 @@ Bool_t AliAnalysisAlien::SubmitNext() Int_t ntosubmit = 0; TGridResult *res; TString jobID = ""; - if (!fNsubmitted) ntosubmit = 1; - else { + Int_t nmasterjobs = fInputFiles->GetEntries(); + if (!fNsubmitted) { + ntosubmit = 1; + if (!IsUseSubmitPolicy()) { + if (ntosubmit>5) + Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \ + \n### You can use SetUseSubmitPolicy() to enable if you have problems."); + ntosubmit = nmasterjobs; + } + } else { TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone); printf("=== master %d: %s\n", lastmaster, status.Data()); // If last master not split, just return @@ -2325,7 +2535,6 @@ Bool_t AliAnalysisAlien::SubmitNext() printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n", nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit); } - Int_t nmasterjobs = fInputFiles->GetEntries(); for (Int_t i=0; i=nmasterjobs) {iscalled = kFALSE; return kTRUE;} diff --git a/ANALYSIS/AliAnalysisAlien.h b/ANALYSIS/AliAnalysisAlien.h index 8b14f83ea43..759e70870fa 100644 --- a/ANALYSIS/AliAnalysisAlien.h +++ b/ANALYSIS/AliAnalysisAlien.h @@ -112,6 +112,18 @@ public: virtual void WriteProductionFile(const char *filename) const; virtual void WriteValidationScript(Bool_t merge=kFALSE); +// PROOF mode + virtual void SetProofCluster(const char *cluster) {fProofCluster = cluster;} + virtual void SetProofDataSet(const char *dataset) {fProofDataSet = dataset;} + virtual const char *GetProofDataSet() const {return fProofDataSet.Data();} + virtual void SetProofReset(Int_t mode) {fProofReset = mode;} + virtual void SetNproofWorkers(Int_t nworkers) {fNproofWorkers = nworkers;} + virtual void SetRootVersionForProof(const char *version) {fRootVersionForProof = version;} + virtual void SetAliRootMode(const char *mode) {fAliRootMode = mode;} + // .txt file containing the list of files to be chained in test 
mode + virtual void SetFileForTestMode(const char *filename) {fFileForTestMode = filename;} + virtual TChain *GetChainForTestMode(const char *treeName) const; + protected: void CdWork(); Bool_t CheckInputData(); @@ -143,6 +155,8 @@ private: Int_t fFastReadOption; // Use xrootd tweaks to reduce timeouts in file access Int_t fOverwriteMode; // Overwrite existing files if any Int_t fNreplicas; // Number of replicas for the output files + Int_t fNproofWorkers; // Number of workers in proof mode + Int_t fProofReset; // Proof reset mode: 0=no reset, 1=soft, 2=hard TString fRunNumbers; // List of runs to be processed TString fExecutable; // Executable script for AliEn job TString fExecutableCommand; // Command(s) to be executed in the executable script @@ -174,9 +188,14 @@ private: TString fJobTag; // Job tag TString fOutputSingle; // Directory name for the output when split is per file TString fRunPrefix; // Run prefix to be applied to run numbers + TString fProofCluster; // Proof cluster name + TString fProofDataSet; // Proof dataset to be used + TString fFileForTestMode; // .txt file for the chain to be used in PROOF test mode + TString fRootVersionForProof; // ROOT version to be used in PROOF mode. The default one taken if empty. 
+ TString fAliRootMode; // AliRoot mode among the list supported by the proof cluster TObjArray *fInputFiles; // List of input files to be processed by the job TObjArray *fPackages; // List of packages to be used - ClassDef(AliAnalysisAlien, 13) // Class providing some AliEn utilities + ClassDef(AliAnalysisAlien, 14) // Class providing some AliEn utilities }; #endif diff --git a/ANALYSIS/AliAnalysisGrid.h b/ANALYSIS/AliAnalysisGrid.h index 60bd6f1ee13..56120b33898 100644 --- a/ANALYSIS/AliAnalysisGrid.h +++ b/ANALYSIS/AliAnalysisGrid.h @@ -14,6 +14,8 @@ #include #endif +class TChain; + class AliAnalysisGrid : public TNamed { public: @@ -35,7 +37,9 @@ enum EPluginBits { kBitMask32 = 0xffffffff, kUseCopy = BIT(0), kCheckCopy = BIT(1), - kKeepLogs = BIT(2) + kKeepLogs = BIT(2), + kClearPackages = BIT(3), + kUseSubmitPolicy = BIT(4) }; AliAnalysisGrid() : TNamed(), fSpecialBits(0) {} @@ -107,6 +111,21 @@ enum EPluginBits { void SetCheckCopy(Bool_t flag=kTRUE) {SetSpecialBit(kCheckCopy,flag);} Bool_t IsKeepLogs() const {return TestSpecialBit(kKeepLogs);} void SetKeepLogs(Bool_t flag=kTRUE) {SetSpecialBit(kKeepLogs,flag);} + Bool_t IsUseSubmitPolicy() const {return TestSpecialBit(kUseSubmitPolicy);} + void SetUseSubmitPolicy(Bool_t flag=kTRUE) {SetSpecialBit(kUseSubmitPolicy,flag);} + +// PROOF mode + virtual void SetProofCluster(const char *cluster) = 0; + virtual void SetProofDataSet(const char *dataset) = 0; + virtual const char *GetProofDataSet() const = 0; + virtual void SetProofReset(Int_t mode) = 0; + virtual void SetClearPackages(Bool_t flag=kTRUE) {SetSpecialBit(kClearPackages,flag);} + virtual void SetNproofWorkers(Int_t nworkers) = 0; + virtual void SetRootVersionForProof(const char *version) = 0; + virtual void SetAliRootMode(const char *mode) = 0; + // .txt file containing the list of files to be chained in test mode + virtual void SetFileForTestMode(const char *filename) = 0; + virtual TChain *GetChainForTestMode(const char *treeName) const = 0; protected: 
// Methods diff --git a/ANALYSIS/AliAnalysisManager.cxx b/ANALYSIS/AliAnalysisManager.cxx index 8333011e323..ddc090c1b1a 100644 --- a/ANALYSIS/AliAnalysisManager.cxx +++ b/ANALYSIS/AliAnalysisManager.cxx @@ -607,8 +607,8 @@ void AliAnalysisManager::PackOutput(TList *target) TString remote = fSpecialOutputLocation; remote += "/"; Int_t gid = gROOT->ProcessLine("gProofServ->GetGroupId();"); - if (remote.BeginsWith("alien://")) { - gROOT->ProcessLine("TGrid::Connect(\"alien://pcapiserv01.cern.ch:10000\", gProofServ->GetUser());"); + if (remote.BeginsWith("alien:")) { + gROOT->ProcessLine("TGrid::Connect(\"alien:\", gProofServ->GetUser());"); remote += outFilename; remote.ReplaceAll(".root", Form("_%d.root", gid)); } else { @@ -1455,6 +1455,10 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree, break; case kProofAnalysis: fIsRemote = kTRUE; + // Check if the plugin is used + if (fGridHandler) { + return StartAnalysis(type, fGridHandler->GetProofDataSet(), nentries, firstentry); + } if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { Error("StartAnalysis", "No PROOF!!! Exiting."); cdir->cd(); @@ -1526,6 +1530,37 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, const char *dataset // Set the dataset flag TObject::SetBit(kUseDataSet); fTree = 0; + TChain *chain = 0; + if (fGridHandler) { + // Start proof analysis using the grid handler + if (!fGridHandler->StartAnalysis(nentries, firstentry)) { + Error("StartAnalysis", "The grid plugin could not start PROOF analysis"); + return -1; + } + // Check if the plugin is in test mode + if (fGridHandler->GetRunMode() == AliAnalysisGrid::kTest) { + // Get the chain to be used for test mode + TString dataType = "esdTree"; + if (fInputEventHandler) { + dataType = fInputEventHandler->GetDataType(); + dataType.ToLower(); + dataType += "Tree"; + } + chain = fGridHandler->GetChainForTestMode(dataType); + if (!chain) { + Error("StartAnalysis", "No chain for test mode. 
Aborting."); + return -1; + } + fTree = chain; + } else { + dataset = fGridHandler->GetProofDataSet(); + } + } + + if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { + Error("StartAnalysis", "No PROOF!!! Exiting."); + return -1; + } // Initialize locally all tasks TIter next(fTasks); @@ -1534,21 +1569,19 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, const char *dataset task->LocalInit(); } - if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { - Error("StartAnalysis", "No PROOF!!! Exiting."); - return -1; - } line = Form("gProof->AddInput((TObject*)0x%lx);", (ULong_t)this); gROOT->ProcessLine(line); -// sprintf(line, "gProof->GetDataSet(\"%s\");", dataset); -// if (!gROOT->ProcessLine(line)) { -// Error("StartAnalysis", "Dataset %s not found", dataset); -// return -1; -// } - line = Form("gProof->Process(\"%s\", \"AliAnalysisSelector\", \"\", %lld, %lld);", - dataset, nentries, firstentry); - cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON DATASET " << dataset << endl; - Long_t retv = (Long_t)gROOT->ProcessLine(line); + Long_t retv; + if (chain) { +// chain->SetProof(); + cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON TEST CHAIN " << chain->GetName() << endl; + retv = chain->Process("AliAnalysisSelector", "", nentries, firstentry); + } else { + line = Form("gProof->Process(\"%s\", \"AliAnalysisSelector\", \"\", %lld, %lld);", + dataset, nentries, firstentry); + cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON DATASET " << dataset << endl; + retv = (Long_t)gROOT->ProcessLine(line); + } return retv; } -- 2.43.0