3 * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4 * @date Tue Oct 16 18:58:37 2012
9 * @ingroup pwglf_forward_trains_helper
16 # include "OutputUtilities.C"
17 # include "ParUtilities.C"
18 # include "ChainBuilder.C"
22 # include <TProofLog.h>
23 # include <TProofDebug.h>
24 # include <AliAnalysisManager.h>
37 // ===================================================================
39 * Handle analysis on a Proof farm.
41 * This helper is triggered by URIs of the form
44 * proof://[<user>@]<host>[:<port>]/<dsname>[?<options>][#<treename>]
48 * <dt><user></dt>
49 * <dd>Optional user name</dd>
50 * <dt><host></dt>
51 * <dd>PROOF cluster master host</dd>
52 * <dt><port></dt>
53 * <dd>Optional PROOF cluster port on master host</dd>
54 * <dt><dsname></dt>
55 * <dd>Data set name</dd>
56 * <dt><treename></dt>
57 * <dd>Optional tree name in data set, often <tt>esdTree</tt> or
58 * <tt>aodTree</tt></dd>
59 * <dt><options></dt>
60 * <dd>List of options separated by an &
62 * <dt><tt>workers=N[x]</tt></dt>
63 * <dd>Set the number of workers to use. If <tt>x</tt> is appended,
64 * then it's maximum number of workers per slave</dd>
65 * <dt><tt>dsname</tt>[=<output dataset>]</dt>
66 * <dd>Register tree output (e.g., AOD) as a new data set on the
67 * PROOF cluster. If <output dataset> is not specified, take
68 * the name of the train.</dd>
69 * <dt><tt>par[=all]</tt></dt>
70 * <dd>Use PAR files. If the value <tt>all</tt> is given, then also
71 * PAR files of STEERBase, ESD, AOD, ANALYSIS, OADB, ANALYSISalice
73 * <dt><tt>mode=[default,rec,sim,train,custom]</tt></dt>
74 * <dd>Set the AliROOT mode. If not specified <tt>default</tt>
75 * is assumed. See also CreateAliROOTPar</dd>
76 * <dt><tt>storage=<url></tt></dt>
77 * <dd>Specify a non-default storage location for special output
78 * (e.g., AOD trees). <url> should be a valid XRootd
79 * server URI accessible to the slaves - e.g.,
80 * <tt>root://lxplus.cern.ch:10930//tmp</tt>.</dd>
85 * @ingroup pwglf_forward_trains_helper
87 struct ProofHelper : public Helper
93 * @param verbose Verbosity level
95 ProofHelper(const TUrl& url, Int_t verbose)
96 : Helper(url, verbose),
104 fOptions.Add("workers", "N[x]", "Number of workers to use", 0);
105 fOptions.Add("dsname", "NAME", "Make output dataset", "");
106 fOptions.Add("par", "tasks|all", "Use par files", "tasks");
107 fOptions.Add("mode", "default|rec|sim", "AliROOT mode", "default");
108 fOptions.Add("storage", "URL", "Location for external storage", "");
109 fOptions.Add("wrapper", "CMD", "Wrapper command", "");
110 fOptions.Add("clear", "PKGS", "Clear packages ','-separated", "");
111 fOptions.Add("reset", "soft|hard", "Reset cluster", "hard");
112 if (!fUrl.GetUser() || fUrl.GetUser()[0] == '\0')
113 fUrl.SetUser(gSystem->GetUserInfo()->fUser);
114 fAuxFiles.SetOwner();
116 ProofHelper(const ProofHelper& o)
125 ProofHelper& operator=(const ProofHelper& o)
127 if (&o == this) return *this;
128 Helper::operator=(o);
129 fExtraLibs = o.fExtraLibs;
130 fExtraPars = o.fExtraPars;
131 fExtraSrcs = o.fExtraSrcs;
132 fUsePars = o.fUsePars;
133 fBasePars = o.fBasePars;
140 virtual ~ProofHelper() {}
142 * Load a library/PAR/script
145 * @param slaves If true, also load on slaves
147 * @return true on success
149 virtual Bool_t LoadLibrary(const TString& name,
153 Int_t ret = gSystem->Load(MakeLibraryName(name));
154 if (ret < 0) return false;
155 if (slaves) fExtraLibs.Append(Form(":%s", name.Data()));
158 if (!ParUtilities::Find(name)) {
159 Error("ProofHelper::LoadLibrary", "Failed to find PAR file %s",
163 if (!ParUtilities::Build(name)) {
164 Error("ProofHelper::LoadLibrary", "Failed to build PAR file %s",
168 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0) {
169 Error("ProofHelper::LoadLibrary", "Failed to upload PAR file %s",
173 fExtraPars.Append(Form(":%s", name.Data()));
178 * Load a source file, and compile it
180 * @param name Name of the source file
181 * @param copy If true, copy not link
183 * @return true on success
185 virtual Bool_t LoadSource(const TString& name, bool copy=false)
187 if (!Helper::LoadSource(name, copy)) return false;
188 fExtraSrcs.Append(Form(":%s", gSystem->BaseName(name.Data())));
192 * Set-up to load the AliROOT libraries
194 * @return true on success
196 virtual Bool_t LoadAliROOT()
198 if (!gSystem->Getenv("ALICE_ROOT")) {
199 Error("ProofHelper::LoadAliROOT", "Local AliROOT not available");
203 Bool_t tmp = fUsePars;
204 fUsePars = fBasePars;
205 if (!LoadLibrary("STEERBase")) return false;
206 if (!LoadLibrary("ESD")) return false;
207 if (!LoadLibrary("AOD")) return false;
208 if (!LoadLibrary("ANALYSIS")) return false;
209 if (!LoadLibrary("OADB")) return false;
210 if (!LoadLibrary("ANALYSISalice")) return false;
213 return CreateAliROOTPar();
216 * Get the name of the AliROOT par file to use
220 virtual const char* AliROOTParName() const
225 * Create an AliROOT par file from the executing AliROOT. This PAR
226 * file basically uses the environment of the client - that is, we
227 * assume that the used AliROOT is accessible on the slaves - e.g.,
230 * Note, the SETUP.C script take one argument - a TList of TNamed
231 * parameters. Parameters processed are
233 * - ALIROOT_MODE=[default,aliroot,rec,sim,train]
234 * - default: Load base analysis libraries
235 * - aliroot: Load $ALICE_ROOT/macros/loadlibs.C
236 * - rec: Load $ALICE_ROOT/macros/loadlibsrec.C
237 * - sim: Load $ALICE_ROOT/macros/loadlibssim.C
238 * - ALIROOT_EXTRA_LIBS Colon separated list of additional (Ali)ROOT
239 * libraries to load on the slaves.
241 * The generated PAR file is uploaded but not enabled until we have
242 * populated fExtraLibs. The enabling takes place at the end of the
245 * @return true on success, false otherwise. */
246 virtual Bool_t CreateAliROOTPar()
248 if (fBasePars) return true;
250 TString parName(AliROOTParName());
251 TString parFile(Form("%s.par", parName.Data()));
253 // --- Check if we have the drirectory already -------------------
254 if (gSystem->AccessPathName(parName.Data()) == 0) {
255 // Let's remove it to get a clean slate
256 if (gSystem->Exec(Form("rm -rf %s", parName.Data())) != 0) {
257 Error("ProofHelper", "Failed to remove %s", parName.Data());
261 // --- Check if the PAR file is there, and remove it if so -------
262 if (gSystem->AccessPathName(parFile.Data()) == 0) {
263 if (gSystem->Unlink(parFile.Data()) != 0) {
264 Error("ProofHelper::CreateAliROOTPar", "Failed to remove %s",
271 // Set-up directories
272 if (gSystem->MakeDirectory(parName) < 0) {
273 Error("ProofHelper::CreateAliROOTPar", "Could not make directory '%s'",
278 if (gSystem->MakeDirectory(Form("%s/PROOF-INF", parName.Data()))) {
279 Error("ProofHelper::CreateAliROOTPar",
280 "Could not make directory %s/PROOF-INF",
285 std::ofstream b(Form("%s/PROOF-INF/BUILD.sh",parName.Data()));
287 Error("ProofHelper::CreateAliROOTPar",
288 "Failed to make BUILD.sh shell script");
292 << "# echo Nothing to do\n"
296 gSystem->Exec(Form("chmod a+x %s/PROOF-INF/BUILD.sh",parName.Data()));
298 std::ofstream s(Form("%s/PROOF-INF/SETUP.C", parName.Data()));
300 Error("ProofHelper::CreateAliROOTPar",
301 "Failed to make SETUP.C ROOT script");
304 s << "void SETUP(TList* opts) {\n"
305 << " gSystem->Setenv(\"ALICE\",\""
306 << gSystem->Getenv("ALICE") << "\");\n"
307 << " gSystem->Setenv(\"ALICE_ROOT\",\""
308 << gSystem->Getenv("ALICE_ROOT") << "\");\n"
309 << " gSystem->Setenv(\"ALICE_TARGET\",\""
310 << gSystem->Getenv("ALICE_TARGET") << "\");\n"
311 << " gSystem->AddDynamicPath("
312 << "\"$(ALICE_ROOT)/lib/tgt_$(ALICE_TARGET)\");\n";
313 if (gSystem->Getenv("OADB_PATH"))
314 s << " gSystem->Setenv(\"OADB_PATH\",\""
315 << gSystem->Getenv("OADB_PATH") << "\");\n";
317 << " // Info(\"SETUP\",\"Loading ROOT libraries\");\n"
318 << " gSystem->Load(\"libTree\");\n"
319 << " gSystem->Load(\"libGeom\");\n"
320 << " gSystem->Load(\"libVMC\");\n"
321 << " gSystem->Load(\"libPhysics\");\n"
322 << " gSystem->Load(\"libMinuit\");\n"
324 s << " // Info(\"SETUP\",\"Parameter list:\");\n"
325 << " if (!opts) return;\n"
326 << " //opts->ls();\n"
328 s << " TObject* par = opts->FindObject(\"ALIROOT_MODE\");\n"
330 << " // Info(\"SETUP\",\"ALIROOT mode: %s\", par->GetTitle());\n"
331 << " TString mode(par->GetTitle());\n"
332 << " if (mode.EqualTo(\"default\",TString::kIgnoreCase)) {\n"
333 << " gSystem->Load(\"libSTEERBase\");\n"
334 << " gSystem->Load(\"libESD\");\n"
335 << " gSystem->Load(\"libAOD\");\n"
336 << " gSystem->Load(\"libANALYSIS\");\n"
337 << " gSystem->Load(\"libOADB\");\n"
338 << " gSystem->Load(\"libANALYSISalice\");\n"
340 << " else if (mode.EqualTo(\"aliroot\",TString::kIgnoreCase)) \n"
341 << " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibs.C\");\n"
342 << " else if (mode.EqualTo(\"rec\",TString::kIgnoreCase)) \n"
343 << " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibsrec.C\");\n"
344 << " else if (mode.EqualTo(\"sim\",TString::kIgnoreCase)) \n"
345 << " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibssim.C\");\n"
346 << " else if (mode.EqualTo(\"train\",TString::kIgnoreCase)) \n"
347 << " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
348 << " else if (mode.EqualTo(\"custom\",TString::kIgnoreCase)) \n"
349 << " gROOT->Macro(\"$ALICE_ROOT/macros/loadlibstrain.C\");\n"
352 s << " par = opts->FindObject(\"ALIROOT_EXTRA_LIBS\");\n"
354 << " Info(\"SETUP\",\"Libaries to load: %s\n\",par->GetTitle());\n"
355 << " TString tit(par->GetTitle());\n"
356 << " TObjArray* tokens = tit.Tokenize(\":\");\n"
357 << " TObject* lib = 0;\n"
358 << " TIter next(tokens);\n"
359 << " while ((lib = next())) {\n"
360 << " TString libName(lib->GetName());\n"
361 << " if (!libName.BeginsWith(\"lib\")) libName.Prepend(\"lib\");\n"
362 << " // Info(\"SETUP\",\"Loading %s ...\",libName.Data());\n"
363 << " gSystem->Load(Form(\"lib%s\",lib->GetName()));\n"
370 Int_t ret = gSystem->Exec(Form("tar -czf %s %s",
371 parFile.Data(), parName.Data()));
373 Error("ProofHelper::CreateAliROOTPar", "Failed to pack up PAR file %s",
378 ret = gProof->UploadPackage(parFile.Data(),TProof::kRemoveOld);
380 Error("ProofHelper::CreateAliROOTPar",
381 "Failed to upload the AliROOT PAR file");
384 // Note, the PAR isn't enabled until much later when we've
385 // collected all the needed libraries in fExtraLibs
389 * Get the mode identifier
391 * @return Always kProof
393 virtual UShort_t Mode() const { return kProof; }
395 * Get the mode string used for AliAnalysisManager::StartAnalysis
397 virtual const char* ModeString() const { return "proof"; }
399 * Set-up done before task set-ups
401 * @return true on success
403 virtual Bool_t PreSetup()
405 // --- Set prefered GSI method ---------------------------------
406 gEnv->SetValue("XSec.GSI.DelegProxy", "2");
408 // --- Add ALICE_ROOT directory to search path for packages ----
409 // Info("ProofHelper::PreSetup", "Set location of packages");
410 gEnv->SetValue("Proof.GlobalPackageDirs",
412 gEnv->GetValue("Proof.GlobalPackageDirs", "."),
413 gSystem->Getenv("ALICE_ROOT")));
415 // --- Forming the URI we use to connect with --------------------
417 connect.SetAnchor("");
419 connect.SetOptions("");
421 // --- Check if we need to reset first ---------------------------
422 if (fOptions.Has("reset")) {
423 TString reset = fOptions.Get("reset");
424 Bool_t hard = (reset.IsNull() ||
425 reset.EqualTo("hard", TString::kIgnoreCase));
426 Info("ProofHelper::PreSetup", "Doing a %s reset of %s",
427 hard ? "hard" : "soft", connect.GetUrl());
428 TProof::Reset(connect.GetUrl(), hard);
430 Info("ProofHelper::PreSetup",
431 "Waiting for %d second%s for things to settle", secs,
432 secs > 1 ? "s" : "");
433 gSystem->Sleep(1000*secs);
436 // --- Check if we're using a wrapper ----------------------------
437 if (fOptions.Has("wrapper")) {
438 TString wrapper = fOptions.Get("wrapper");
439 if (wrapper.IsNull())
440 // In case of no argument, use GDB
441 // Just run and backtrace
442 wrapper = "/usr/bin/gdb --batch -ex run -ex bt --args";
443 Info("ProofHelper::PreSetup", "Using wrapper command: %s",
445 TProof::AddEnvVar("PROOF_WRAPPERCMD", wrapper);
448 // --- PAR parameters --------------------------------------------
449 fUsePars = fOptions.Has("par");
450 fBasePars = (fUsePars &&
451 fOptions.Get("par").EqualTo("all",TString::kIgnoreCase));
453 // --- Connect to the cluster ------------------------------------
455 if (fOptions.Has("workers"))
456 opts.Append(Form("workers=%s", fOptions.Get("workers").Data()));
458 Info("ProofHelper::PreSetup", "Connecting to %s with %soptions %s",
460 opts.IsNull() ? "no " : "",
462 TString proto(connect.GetProtocol());
463 if (proto.BeginsWith("lite") && fOptions.Has("workers"))
466 TProof::Open(connect.GetUrl(), opts);
467 // TProof::Open(connect.GetHost(), opts);
469 Error("ProofHelper::PreSetup", "Failed to open Proof connection %s",
474 // --- Check if we need to clear packages ------------------------
475 if (fOptions.Has("clear")) {
476 TString pkgs = fOptions.Get("clear");
477 if (pkgs.IsNull() || pkgs.EqualTo("all", TString::kIgnoreCase)) {
478 // No value given, clear all
479 if (gProof->ClearPackages() != 0)
480 Warning("ProofHelper::PreSetup", "Failed to lear all packages");
483 // Tokenize on ',' and clear each package
484 TObjArray* pars = pkgs.Tokenize(",");
487 while ((pkg = next())) {
488 if (gProof->ClearPackage(pkg->GetName()) != 0)
489 Warning("ProofHelper::PreSetup", "Failed to clear package %s",
498 * Set-up done after the task set-ups
500 * @return true on success
502 virtual Bool_t PostSetup()
504 AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
506 Error("ProofHelper::PostSetup", "No analysis manager defined");
510 // --- Check for output ------------------------------------------
511 if (fOptions.Has("dsname"))
512 OutputUtilities::RegisterDataset(fOptions.Get("dsname"));
513 if (fOptions.Has("storage"))
514 OutputUtilities::RegisterStorage(fOptions.Get("storage"));
516 // --- If we are not using PARs for Base, enable special PAR -----
518 TString tmp(fExtraLibs.Strip(TString::kBoth,':'));
519 TList* params = new TList;
520 params->SetOwner(true);
521 params->Add(new TNamed("ALIROOT_EXTRA_LIBS", tmp.Data()));
522 if (fOptions.Has("mode"))
523 params->Add(new TNamed("ALIROOT_MODE", fOptions.Get("mode").Data()));
525 params->Add(new TNamed("ALIROOT_MODE", "default"));
526 Int_t ret = gProof->EnablePackage(AliROOTParName(), params, true);
528 Error("ProofHelper::EnableAliROOT", "Failed to enable AliROOT PAR %s",
534 // --- Make PAR file of Aux Files --------------------------------
535 if (fAuxFiles.GetEntries() > 0) {
536 TString name = TString::Format("%s_auxfiles", mgr->GetName());
537 ParUtilities::MakeAuxFilePAR(fAuxFiles, name);
539 if (gProof->UploadPackage(name.Data(), TProof::kRemoveOld) < 0)
540 Error("ProofHelper::PostSetup", "Failed to upload PAR file %s",
543 fExtraPars.Append(Form(":%s", name.Data()));
546 // --- Load par files --------------------------------------------
547 TString tmp = fExtraPars.Strip(TString::kBoth,':');
548 TObjArray* pars = tmp.Tokenize(":");
551 while ((obj = next())) {
552 // Enable the package, but do not build on client - already done
553 Int_t ret = gProof->EnablePackage(obj->GetName(), true);
555 Error("ProofHelper::PostSetup", "Failed to enable PAR %s",
561 // --- Load extra sources ----------------------------------------
562 TString tmp2 = fExtraSrcs.Strip(TString::kBoth, ':');
563 TObjArray* srcs = tmp2.Tokenize(":");
565 while ((obj = next())) {
566 Int_t ret = gProof->Load(Form("%s++g", obj->GetName()), true);
568 Error("ProofHelper::PostSetup", "Failed to compile %s", obj->GetName());
577 * @param nEvents Number of events to analyse
579 * @return The return value of AliAnalysisManager::StartAnalysis
581 virtual Long64_t Run(Long64_t nEvents=-1)
583 AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
584 gProof->SetLogLevel(TMath::Max(fVerbose-2,0),
585 /* TProofDebug::kPacketizer| */
587 /* TProofDebug::kSelector|
588 TProofDebug::kOutput|
590 TProofDebug::kGlobal|*/
591 TProofDebug::kPackage);
592 TString dsName(fUrl.GetFile());
593 // if (fUrl.GetAnchor() && fUrl.GetAnchor()[0] != '\0')
594 // dsName.Append(Form("#%s", fUrl.GetAnchor()));
595 Long64_t ret = mgr->StartAnalysis(fUrl.GetProtocol(), dsName, nEvents);
598 TProof::Mgr(fUrl.GetUrl())->GetSessionLogs()->Save("*","proof.log");
602 * Print information to standard output
606 virtual void Print(Option_t* option="") const
608 Helper::Print(option);
609 std::cout << std::boolalpha
610 << " --- Other settings -------\n"
611 << " Extra libraries : " << fExtraLibs << "\n"
612 << " Extra PARs : " << fExtraPars << "\n"
613 << " Extra sources : " << fExtraSrcs << "\n"
614 << " Use PARs of tasks: " << fUsePars << "\n"
615 << " Use PARs of base : " << fBasePars
616 << std::noboolalpha << std::endl;
619 * Link an auxilary file to working directory
621 * @param name Name of the file
622 * @param copy Copy rather than link
624 * @return true on success
626 virtual Bool_t AuxFile(const TString& name, bool copy=false)
628 Bool_t ret = Helper::AuxFile(name, copy);
629 if (!name.BeginsWith("/")) {
630 fAuxFiles.Add(new TObjString(name));
633 if (ret && name.EndsWith(".root")) {
634 TFile* file = TFile::Open(name, "READ");
636 Info("AuxFile", "Adding input file %s", name.Data());
637 gProof->AddInputData(file, true);
643 Int_t SendFile(const TString& fileName)
645 Int_t bufSize = 32768;
648 Long_t id = 0, flags = 0, modtime = 0;
649 if (gSystem->GetPathInfo(fileName.Data(), &id, &size, &flags, &modtime)==1
651 Error("SendFile", "Cannot stat %s", fileName.Data());
654 TString fn(gSystem->BaseName(fileName.Data()));
655 TList* slaves = 0; // gProof->GetListOfActiveSlaves(); - protected
659 Int_t fd = open(fileName.Data(), O_RDONLY);
660 while ((sl = static_cast<TSlave*>(next()))) {
661 if (!sl->IsValid()) continue;
662 if (sl->GetSlaveType() != TSlave::kSlave) continue;
664 // Always binary (first 1), never forward (last 0).
665 snprintf(buf,bufSize,"%s %d %lld %d", fn.Data(), 1, size, 0);
666 if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
667 Warning("SendFile", "Could not send kPROOF_SENDFILE request");
671 // Go to the beginning of the file
672 lseek(fd, 0, SEEK_SET);
675 while ((len = read(fd, buf, bufSize)) < 0 &&
676 TSystem::GetErrno() == EINTR)
677 TSystem::ResetErrno();
679 Error("SendFile", "error reading input");
683 if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
684 Error("SendFile", "error writing to slave");
691 // Wait for slave - private
692 // if (sl) gProof->Collect(sl,gEnv->GetValue("Proof.CollectTimeout",-1));
703 * @return Path to output - possibly a data set
705 virtual TString OutputPath() const
708 if (fOptions.Has("dsname")) {
709 ret = Form("/%s/%s/", gProof->GetGroup(), gProof->GetUser());
710 ret.Append(OutputUtilities::RegisteredDataset());
715 * @return URL help string
717 virtual const Char_t* UrlHelp() const
719 return "proof://<host>[:<port>]/[<dataset>|<path>][?<options>][#<treeName>]";
722 * @return Short description
724 virtual const char* Desc() const { return "PROOF"; }