3 * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4 * @date Tue Oct 16 19:01:27 2012
6 * @brief Grid Analysis Railway
8 * @ingroup pwglf_forward_trains_helper
13 #include "PluginRailway.C"
18 # include <AliAnalysisManager.h>
19 # include <AliAnalysisAlien.h>
22 class AliAnalysisAlien;
25 // ===================================================================
27 * Handle analysis on the Grid
29 * This helper is triggered by a URL of the form
32 * alien:///<directory>[?<options>][#<treeName>]
36 * <dt><directory></dt>
37 * <dd>Grid directory that holds the data</dd>
38 * <dt><treeName></dt>
39 * <dd>Tree to loop over</dd>
40 * <dt><options></dt>
41 * <dd>List of options separated by an &
43 * <dt><tt>storage=<url></tt></dt>
44 * <dd>Specify a non-default storage location for special output
45 * (e.g., AOD trees). <url> should be a valid XRootd
46 * server URI accessible to the slaves - e.g.,
47 * <tt>root://lxplus.cern.ch:10930//tmp</tt>.</dd>
48 * <dt><tt>mode=[default,rec,sim,train,custom]</tt></dt>
49 * <dd>Set the AliROOT mode. If not specified <tt>default</tt>
50 * is assumed. See also CreateAliROOTPar</dd>
51 * <dt><tt>par</tt></dt>
52 * <dd> Use PAR files</dd>
53 * <dt><tt>run=[range, list, or file]</tt></dt>
54 * <dd>Comma separated list of run numbers, or file(s) containing
56 * <dt><tt>oper=[FULL,TERMINATE,SUBMIT,OFFLINE,TEST]</tt></dt>
57 * <dd>How to run the analysis</dd>
58 * <dt><tt>split=<N></tt></dt>
59 * <dd>Maximum number of files per split</dd>
60 * <dt><tt>merge=<N></tt></dt>
61 * <dd>Maximum number of files per merger</dd>
62 * <dt><tt>mc</tt></dt>
63 * <dd>Scan also for MC files (<tt>galice.root</tt>,
64 * <tt>Kinematics.root</tt>, and <tt>TrackRefs.root</tt>) when
65 * scanning <datadir></dd>
66 * <dt><tt>pattern=<GLOB></tt></dt>
67 * <dd>Shell glob pattern that files must match when scanning
68 * <datadir></dd>
73 * @ingroup pwglf_forward_trains_helper
75 struct GridRailway : public PluginRailway
81 * @param verbose Verbosity level
// Construct a Grid (AliEn) railway from the job URL and register the
// Grid-specific options on top of those handled by PluginRailway.
//
// url      Job URL, alien:///<datadir>[?<options>][#<treeName>]
// verbose  Verbosity level
//
// Option defaults registered below: oper=FULL, split=50, merge=50,
// run="", alien=V1.1x, ttl=6h, pattern="", exclude="".
83 GridRailway(const TUrl& url, Int_t verbose)
84 : PluginRailway(url, verbose), fRuns()
86 // Note, split, merge, and ttl are by default set to values
87 // optimized for AOD production on real PbPb data.
89 // TTL shouldn't be much smaller than 4h10m. Split and merge
90 // shouldn't be much larger than 75, but probably not smaller than
// NOTE(review): the help text lists FULL|TERMINATE|SUBMIT only, while
// Operation() also accepts OFFLINE and TEST - consider updating it.
92 fOptions.Add("oper", "FULL|TERMINATE|SUBMIT", "Analysis operation", "FULL");
93 fOptions.Add("split", "N|max", "Max number of files before split","50");
94 fOptions.Add("merge", "N|max", "Max number of files for merge", "50");
95 fOptions.Add("run", "RUNS", "Range, list, and/or file of runs", "");
96 fOptions.Add("alien", "VERSION","Alien API version", "V1.1x");
97 fOptions.Add("ttl", "N|max", "Time to live", "6h");
98 fOptions.Add("pattern","GLOB", "File/directory name pattern", "");
// "concat" is registered without an argument spec/default, i.e. it appears
// to be a plain flag; PostSetup uses it to put all runs in one master job.
99 fOptions.Add("concat", "Concatenate all runs");
100 fOptions.Add("exclude", "GLOB","Comma separated list of merge excludes","");
// Copy constructor: copies the PluginRailway part only; the new object
// starts out with an empty fRuns.
102 GridRailway(const GridRailway& o)
103 : PluginRailway(o), fRuns()
// Assignment operator: guards against self-assignment and assigns the
// PluginRailway part.
// NOTE(review): no assignment of fRuns is visible - confirm intended.
105 GridRailway& operator=(const GridRailway& o)
107 if (&o == this) return *this;
108 PluginRailway::operator=(o);
111 virtual ~GridRailway() {}
113 * Get the mode identifier
115 * @return Always kGrid
117 virtual UShort_t Mode() const { return kGrid; }
119 * Get the mode string used for AliAnalysisManager::StartAnalysis
121 virtual const char* ModeString() const { return "grid"; }
123 * Get the analysis operation selected by the <tt>oper</tt> option
125 * @return Operation identifier (kFull when the option is absent)
// Translate the "oper" option into an operation identifier.
// String matching is case-insensitive; a missing option gives kFull.
// NOTE(review): confirm what is returned for an unrecognised value
// (the statement after the TEST branch).
127 virtual UShort_t Operation() const
129 if (!fOptions.Has("oper")) return kFull;
130 const TString& oper = fOptions.Get("oper");
131 if (oper.EqualTo("FULL", TString::kIgnoreCase)) return kFull;
132 else if (oper.EqualTo("OFFLINE", TString::kIgnoreCase)) return kOffline;
133 else if (oper.EqualTo("SUBMIT", TString::kIgnoreCase)) return kSubmit;
134 else if (oper.EqualTo("TERMINATE", TString::kIgnoreCase)) return kTerminate;
135 else if (oper.EqualTo("TEST", TString::kIgnoreCase)) return kTest;
// Remember run number r. A fresh TObject is allocated per run; AuxSave
// later reads the run numbers back from fRuns with GetUniqueID().
// NOTE(review): confirm fRuns owns these objects (e.g. SetOwner), otherwise
// each call leaks one TObject.
138 void StoreRun(Int_t r)
140 TObject* o = new TObject;
147 * @return Number of registered runs
// Parse the "run" option and register the runs with the AliEn handler.
//
// The option value is split on ',', '+' and ':' and each part is taken
// to be one of
//   - a run range "<first>-<last>": passed to fHandler->SetRunRange and
//     recorded run by run via StoreRun,
//   - a single run number: passed to fHandler->AddRunNumber,
//   - otherwise the name of a file whose digit-only tokens (separated by
//     blanks, tabs or commas) are added as run numbers; lines whose first
//     non-blank character is '#' are skipped.
// Ranges must not be mixed with individual runs or list files; such parts
// are ignored with a warning.
//
// @return Number of registered runs
149 virtual Int_t RegisterRuns()
// NOTE(review): the constructor always adds a "run" option (default ""),
// so depending on what fOptions.Find() returns this guard may never fire;
// verify whether fOptions.Has("run") was intended.
151 if (!fOptions.Find("run")) {
152 Error("GridRailway::RegisterRuns", "No runs specified");
156 TString runs = fOptions.Get("run");
// Tokenize() returns a new TObjArray owned by the caller (must be deleted).
157 TObjArray* tokens = runs.Tokenize(",+:");
158 TObjString* part = 0;
// Track which kind of specification has been seen, so that ranges and
// individual runs/list files are not mixed.
160 Bool_t range = false;
161 Bool_t individual = false;
162 // Info("GridRailway::RegisterRuns", "Runs specified are %s", runs.Data());
163 while ((part = static_cast<TObjString*>(next()))) {
164 TString& s = part->String();
165 if (s.Contains("-")) { // Run range
167 Warning("GridRailway::RegisterRuns", "Run range already specified, "
168 "ignoring %s", s.Data());
172 Warning("GridRailway::RegisterRuns",
173 "Run ranges and individual run specs do not mix, "
174 "ignoring %s", s.Data());
177 TObjArray* ranges = s.Tokenize("-");
178 if (ranges->GetEntriesFast() > 2) {
179 Warning("GridRailway::RegisterRuns", "Invalid run range: %s",
// NOTE(review): only ranges with more than two parts are rejected above.
// A part like "123-" may yield a single token, making ranges->At(1) null
// below - confirm this case is guarded.
184 Int_t first = static_cast<TObjString*>(ranges->At(0))->String().Atoi();
185 Int_t last = static_cast<TObjString*>(ranges->At(1))->String().Atoi();
186 nRuns = last-first+1;
187 // Info("GridRailway::RegisterRuns", "Run range %d -> %d", first, last);
188 fHandler->SetRunRange(first, last);
// Record every run of the range (AuxSave builds Download.C from fRuns)
191 for (Int_t r = first; r <= last; r++) StoreRun(r);
194 if (s.IsDigit()) { // single run
196 Warning("GridRailway::RegisterRuns",
197 "Run ranges and individual run specs do not mix, "
198 "ignoring %s", s.Data());
201 // Info("GridRailway::RegisterRuns", "Adding run %s", s.Data());
// NOTE(review): confirm individual and file-listed runs are also passed to
// StoreRun(), as the range branch does; otherwise Download.C gets no runs.
202 fHandler->AddRunNumber(s.Atoi());
209 Warning("GridRailway::RegisterRuns", "Run ranges and list file "
210 "do not mix, ignoring %s", s.Data());
214 // We assume this part is a file
215 // Info("GridRailway::RegisterRuns", "Reading runs from %s", s.Data());
216 std::ifstream in(s.Data());
221 Warning("GridRailway::RegisterRuns", "Failed to open %s", s.Data());
// Strip surrounding spaces, then skip comment lines starting with '#'
229 TString bare = lne.Strip(TString::kBoth);
230 if (bare[0] == '#') continue;
// A line may hold several run numbers; non-numeric tokens are ignored
232 TObjArray* ltokens = bare.Tokenize(" \t,");
233 TIter lnext(ltokens);
235 while ((str = static_cast<TObjString*>(lnext()))) {
236 const TString& token = str->String();
237 if (!token.IsDigit()) continue;
239 int r = token.Atoi();
240 fHandler->AddRunNumber(r);
250 // Info("GridRailway::RegisterRuns", "Read %d, adding", r);
251 fHandler->AddRunNumber(r);
265 * Executed before setting up tasks
267 * @return true on success
// Prepare the Grid session before the tasks are set up: run the
// PluginRailway pre-setup, add /usr/lib to the dynamic library path and
// connect to the Grid service named by the URL protocol (e.g. alien://).
// @return true on success
269 virtual Bool_t PreSetup()
271 if (!PluginRailway::PreSetup()) return false;
273 // --- Add system library dir to load path -----------------------
274 gSystem->AddDynamicPath("/usr/lib");
276 // --- Open a connection to the grid -----------------------------
// The connection string is "<protocol>://", taken from the job URL.
277 if (!TGrid::Connect(Form("%s://", fUrl.GetProtocol()))) {
278 Error("GridRailway::PreSetup", "Failed to connect to AliEN");
// Double-check the global gGrid handle (expected to be set by the connect).
281 if (!gGrid || !gGrid->IsConnected()) {
282 Error("GridRailway::PreSetup", "Failed to connect to AliEN");
289 * Set-up done after the task set-ups
291 * @return true on success
// Configure the AliEn handler (fHandler) after the tasks are set up:
// API version, run mode, runs, TTL, split/merge parameters, file names
// derived from the analysis manager name, data pattern and output files.
// @return true on success
293 virtual Bool_t PostSetup()
295 // Info("GridRailway::PostSetup", "Calling super.PostSetup");
296 if (!PluginRailway::PostSetup()) return false;
298 // --- API version -----------------------------------------------
299 fHandler->SetAPIVersion(fOptions.Get("alien"));
301 // --- Get the name ----------------------------------------------
302 // Info("GridRailway", "Proceeding with plugin setup");
// NOTE(review): mgr is dereferenced without a null check.
303 AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
304 TString name(mgr->GetName());
306 // --- Set the operation to do (TEST, SUBMIT, TERMINATE, FULL) ---
307 TString operation("FULL");
308 if (fOptions.Has("oper")) operation = fOptions.Get("oper");
309 fHandler->SetRunMode(operation);
311 // --- Add the run numbers ---------------------------------------
312 fHandler->SetRunPrefix(fOptions.Has("mc") ? "%d" : "%09d");
313 Int_t nRun = RegisterRuns();
315 // --- Do not test copying ---------------------------------------
316 fHandler->SetCheckCopy(false);
318 // --- Set output to be per run ----------------------------------
319 fHandler->SetOutputToRunNo(true);
321 // --- Set the job tag -------------------------------------------
322 fHandler->SetJobTag(name);
324 // --- Set number of test files - used in test mode only ---------
325 fHandler->SetNtestFiles(1);
327 // --- Set the Time-To-Live --------------------------------------
// The value is either "max" (leave the handler default), a plain number of
// seconds, or a string like 1d4h10m30s that is summed up in seconds.
328 if (fOptions.Has("ttl")) {
329 TString sttl = fOptions.Get("ttl");
330 if (!sttl.EqualTo("max")) {
332 if (sttl.IsDigit()) ttl = sttl.Atoi();
334 // Parse string of the form <DAYS>d<HOURS>h<MINUTES>m<SECONDS>s
335 Int_t id = sttl.Index("d", 0);
336 if (id == kNPOS) id = -1;
338 TString sdays(sttl(0,id));
339 ttl += 24 * 60 * 60 * sdays.Atoi();
341 Int_t ih = sttl.Index("h", id+1);
342 if (ih == kNPOS) ih = id;
344 TString shour(sttl(id+1,ih-id-1));
345 ttl += 60 * 60 * shour.Atoi();
347 Int_t im = sttl.Index("m", ih+1);
348 if (im == kNPOS) im = ih;
350 TString smin(sttl(ih+1, im-ih-1));
351 ttl += 60 * smin.Atoi();
353 Int_t is = sttl.Index("s", im+1);
355 TString ssec(sttl(im+1, is-im-1));
359 if (ttl != 0) fHandler->SetTTL(ttl);
// NOTE(review): empty location string; other messages use "GridRailway::...".
361 Warning("", "Option ttl given but no value found");
365 // --- Re-submit failed jobs as long as the ratio of failed jobs -
366 // --- is this percentage.
367 fHandler->SetMasterResubmitThreshold(95);
369 // --- Set the input format --------------------------------------
370 fHandler->SetInputFormat("xml-single");
372 // --- Set names of generated files ------------------------------
373 fHandler->SetAnalysisMacro(Form("%s.C", name.Data()));
374 fHandler->SetJDLName(Form("%s.jdl", name.Data()));
375 fHandler->SetExecutable(Form("%s.sh", name.Data()));
377 // ---- Set the job price !? -------------------------------------
378 fHandler->SetPrice(1);
380 // --- Set whether to merge via JDL ------------------------------
381 fHandler->SetMergeViaJDL(true);
383 // --- Fast read option ------------------------------------------
384 fHandler->SetFastReadOption(false);
386 // --- Whether to overwrite existing output ----------------------
387 fHandler->SetOverwriteMode(true);
389 // --- Set the executable binary name and options ----------------
390 fHandler->SetExecutableCommand("aliroot -b -q -x");
392 // --- Split by storage element - must be lower case! ------------
393 fHandler->SetSplitMode("se");
395 // --- How much to split -----------------------------------------
396 if (fOptions.Has("split")) {
397 if (!fOptions.Get("split").EqualTo("max")) {
398 fHandler->SetSplitMaxInputFileNumber(fOptions.AsInt("split"));
401 // --- Merge parameters ------------------------------------------
402 if (fOptions.Has("merge")) {
403 if (!fOptions.Get("merge").EqualTo("max")) {
404 fHandler->SetMaxMergeFiles(fOptions.AsInt("merge"));
// Merge excludes: fixed defaults plus the user's comma-separated list,
// converted to a space-separated list.
407 TString exclude="AliAOD.root *EventStat*.root *event_stat*.root";
408 if (fOptions.Has("exclude")) {
409 TString exOpt = fOptions.Get("exclude");
410 exOpt.ReplaceAll(",", " ");
// NOTE(review): confirm a space separator is added before this append,
// otherwise the last default glob and the first user glob are fused.
412 exclude.Append(exOpt);
414 fHandler->SetMergeExcludes(exclude);
416 // --- Set number of runs per master - 1 or all ------------------
// With "concat", nRun+1 (presumably just to exceed the run count) puts all
// runs in a single master job.
417 fHandler->SetNrunsPerMaster(fOptions.Has("concat") ? nRun+1 : 1);
420 // --- Enable default outputs ------------------------------------
421 fHandler->SetDefaultOutputs(true);
423 // --- Keep log files ------------------------------------------
424 fHandler->SetKeepLogs();
426 // --- Set the working directory to be the train's name (with ----
427 // --- special characters replaced by '_' and the date appended),
428 // --- and also set the output directory (relative to working
430 fHandler->SetGridWorkingDir(name.Data());
431 fHandler->SetGridOutputDir("output");
432 fHandler->SetGridDataDir(fUrl.GetFile());
434 // --- Get the tree name and set the file pattern ----------------
// The tree name is the URL anchor (#<treeName>); esdTree/aodTree select
// the AliESD/AliAOD data patterns.
436 if (fOptions.Has("pattern")) pattern = fOptions.Get("pattern");
438 TString treeName(fUrl.GetAnchor());
439 if (treeName.IsNull()) {
// NOTE(review): message label says PreSetup although this is PostSetup.
440 Warning("GridRailway::PreSetup", "No tree name specified, assuming T");
443 if (treeName.EqualTo("esdTree")) pattern = "AliESD";
444 else if (treeName.EqualTo("aodTree")) pattern = "AliAOD";
446 fHandler->SetDataPattern(pattern);
448 // --- Loop over defined containers in the analysis manager, and -
449 // --- declare these as outputs
// Output files are sorted into three comma-separated lists: "default"
// containers go to the AOD list (using the output handler's file name),
// parameter outputs go to the terminate list, the rest to histograms.
450 TString listOfAODs = "";
451 TString listOfHists = "";
452 TString listOfTerms = "";
454 TObjArray* outs[] = { mgr->GetOutputs(), mgr->GetParamOutputs(), 0 };
455 TObjArray** out = outs;
457 AliAnalysisDataContainer* cont = 0;
458 TIter nextCont(*out);
459 while ((cont = static_cast<AliAnalysisDataContainer*>(nextCont()))) {
460 TString outName(cont->GetFileName());
461 Bool_t term = (*out == outs[1]);
462 TString& list = (outName == "default" ? listOfAODs :
463 !term ? listOfHists : listOfTerms);
464 if (outName == "default") {
465 if (!mgr->GetOutputEventHandler()) continue;
467 outName = mgr->GetOutputEventHandler()->GetOutputFileName();
469 if (list.Contains(outName)) continue;
470 if (!list.IsNull()) list.Append(",");
471 list.Append(outName);
// Extra files registered with the manager are added to the AOD list.
// NOTE(review): "+" is used as the separator here while the lists above
// use "," - confirm the mixed separators are intended.
475 TString extra = mgr->GetExtraFiles();
476 if (!extra.IsNull()) {
477 if (!listOfAODs.IsNull()) listOfAODs.Append("+");
478 extra.ReplaceAll(" ", ",");
479 listOfAODs.Append(extra);
// NOTE(review): outArchive is built but the SetOutputArchive call below is
// commented out, so it currently has no effect.
484 TString outArchive = Form("stderr, stdout@disk=%d", nReplica);
485 if (!listOfHists.IsNull())
486 outArchive.Append(Form(" hist_archive.zip:%s@disk=%d",
487 listOfHists.Data(), nReplica));
488 if (!listOfAODs.IsNull())
489 outArchive.Append(Form(" aod_archive.zip:%s@disk=%d",
490 listOfAODs.Data(), nReplica));
492 // plugin->SetOutputArchive(outArchive);
495 if (listOfAODs.IsNull() && listOfHists.IsNull())
496 Fatal("PostSetup", "No outputs defined");
497 if (!listOfTerms.IsNull())
498 fHandler->SetTerminateFiles(listOfTerms);
505 * @param nEvents Number of events to analyse
507 * @return The return value of AliAnalysisManager::StartAnalysis
// Start the Grid analysis, then write the submitted job IDs to
// <manager name>.jobid and the job stages to <manager name>.stage.
// @param nEvents  Number of events to analyse; 0 returns 0 immediately
// @return The return value of AliAnalysisManager::StartAnalysis
509 virtual Long64_t Run(Long64_t nEvents=-1)
// NOTE(review): mgr is dereferenced below without a null check.
511 AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
512 if (nEvents == 0) return 0;
// NOTE(review): "grid" duplicates ModeString(); consider using it instead.
513 Long64_t ret = mgr->StartAnalysis("grid", nEvents);
516 std::ofstream outJobs(Form("%s.jobid", mgr->GetName()));
517 outJobs << fHandler->GetGridJobIDs() << std::endl;
520 std::ofstream outStages(Form("%s.stage", mgr->GetName()));
521 outStages << fHandler->GetGridStages() << std::endl;
527 * Link an auxiliary file to the working directory
529 * @param name Name of the file
530 * @param copy Whether to copy or not
532 * @return true on success
// Link (or copy) an auxiliary file into the working directory via
// Railway::AuxFile, then register its base name as an additional
// "library" of the AliEn handler so it is uploaded with the job.
// @param name  Name of the file
// @param copy  Whether to copy instead of link
// @return true on success
534 virtual Bool_t AuxFile(const TString& name, bool copy=false)
// NOTE(review): this calls Railway::AuxFile directly, skipping any
// PluginRailway override - confirm that is intended.
536 if (!Railway::AuxFile(name, copy)) return false;
537 // We need to add this file as an additional 'library', so that the
538 // file is uploaded to the user's Grid working directory.
539 fHandler->AddAdditionalLibrary(gSystem->BaseName(name.Data()));
543 * Get the output (directory)
// Get the Grid output directory. An absolute handler output directory is
// returned unchanged; otherwise the result is
// <Grid home directory>/<analysis manager name>/<handler output dir>.
// NOTE(review): the warning labels below say "OutputLocation" although the
// method is called OutputPath.
546 virtual TString OutputPath() const
550 Warning("GridRailway::OutputLocation", "No AliEn handler");
553 ret = fHandler->GetGridOutputDir();
554 if (ret.BeginsWith("/")) return ret;
556 AliAnalysisManager* mgr = AliAnalysisManager::GetAnalysisManager();
558 Warning("GridRailway::OutputLocation", "No analysis manager");
561 ret.Prepend(Form("%s/", mgr->GetName()));
// NOTE(review): confirm gGrid is non-null here; it is only established
// when PreSetup connects to the Grid.
563 ret.Prepend(Form("%s/", gGrid->GetHomeDirectory()));
568 * @return URL help string
// Return the URL help string describing the syntax this railway accepts.
570 virtual const Char_t* UrlHelp() const
572 return "alien:///<datadir>[?<options>][#<treeName>]";
575 * @return Short description
577 virtual const char* Desc() const { return "AliEn"; }
579 * Write auxiliary ROOT scripts (Terminate.C, Download.C, Watch.C) for more
580 * (post-)processing e.g., terminate
582 * @param escaped Escaped name
// Save helpers for post-processing: the AliEn handler is written as
// "plugin" to <escaped>_plugin.root, and the ROOT scripts Terminate.C,
// Download.C and Watch.C are generated. Each script loads a Grid*.C macro
// from $ALICE_ROOT/PWGLF/FORWARD/trains.
// @param escaped  Escaped name
// The second (shell-script) argument is unused here.
584 void AuxSave(const TString& escaped,
585 Bool_t /*asShellScript*/)
587 // Write plug-in to file
588 TFile* plug = TFile::Open(Form("%s_plugin.root", escaped.Data()),
590 fHandler->Write("plugin");
// Collect extra libraries, PAR files and sources as space-separated lists
// to embed in Terminate.C.
593 TIter nextLib(&fExtraLibs);
596 while ((lib = static_cast<TObjString*>(nextLib()))) {
597 if (!libs.IsNull()) libs.Append(" ");
598 libs.Append(lib->String());
600 TIter nextPar(&fExtraPars);
603 while ((par = static_cast<TObjString*>(nextPar()))) {
604 if (!pars.IsNull()) pars.Append(" ");
605 pars.Append(par->String());
607 TIter nextSrc(&fExtraSrcs);
610 while ((src = static_cast<TObjString*>(nextSrc()))) {
611 if (!srcs.IsNull()) srcs.Append(" ");
612 srcs.Append(src->String());
614 TString macDir("$ALICE_ROOT/PWGLF/FORWARD/trains");
// --- Terminate.C: compiles GridTerminate.C and calls GridTerminate -----
615 std::ofstream t("Terminate.C");
617 Error("GridRailway::AuxSave", "Failed to make terminate ROOT script");
621 t << "// Generated by GridRailway\n"
622 << "Bool_t Terminate(Bool_t localMerge=false)\n"
624 << " TString name = \"" << escaped << "\";\n"
625 << " TString libs = \"" << libs << "\";\n"
626 << " TString pars = \"" << pars << "\";\n"
627 << " TString srcs = \"" << srcs << "\";\n\n"
628 << " gSystem->Load(\"libANALYSIS\");\n"
629 << " gSystem->Load(\"libANALYSISalice\");\n"
630 << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");\n\n"
631 << " gROOT->LoadMacro(\"" << macDir << "/GridTerminate.C+g\");\n\n"
632 << " return GridTerminate(name,libs,pars,srcs,localMerge);\n"
// --- Run list for Download.C: with "concat" a single entry built from the
// --- first and last stored run, otherwise one formatted entry per run.
639 TString format(fOptions.Has("mc") ? "%d" : "%09d");
640 if (fOptions.Has("concat")) {
// NOTE(review): First()/Last() return null when fRuns is empty, which would
// crash here - confirm fRuns is always filled when "concat" is set.
641 Int_t first = fRuns.First()->GetUniqueID();
642 Int_t last = fRuns.Last()->GetUniqueID();
646 if (!runs.IsNull()) runs.Append(" ");
647 runs.Append(TString::Format(fmt, first, last));
652 while ((o = next())) {
653 if (!runs.IsNull()) runs.Append(" ");
654 runs.Append(Form(format, o->GetUniqueID()));
// --- Download.C: base is <protocol>://<OutputPath()>, calls GridDownload
658 std::ofstream d("Download.C");
660 Error("GridRailway::AuxSave", "Failed to make ROOT script Download.C");
663 d << "// Generated by GridRailway\n"
664 << "void Download(Bool_t unpack=true)\n"
666 << " TString base = \"" << fUrl.GetProtocol() << "://"
667 << OutputPath() << "\";\n"
668 << " TString runs = \"" << runs << "\";\n\n"
669 << " gROOT->LoadMacro(\"" << macDir << "/GridDownload.C\");\n\n"
670 << " GridDownload(base, runs, unpack);\n"
// --- Watch.C: compiles GridWatch.C and calls GridWatch(name,batch,delay)
676 std::ofstream w("Watch.C");
678 Error("GridRailway::AuxSave", "Failed to make ROOT script Watch.C");
681 w << "// Generated by GridRailway\n"
682 << "void Watch(Bool_t batch=false, Int_t delay=5*60)\n"
684 << " TString name = \"" << escaped << "\";\n"
685 << " gROOT->LoadMacro(\"" << macDir << "/GridWatch.C+g\");\n\n"
686 << " GridWatch(name,batch,delay);\n"