]> git.uio.no Git - u/mrichter/AliRoot.git/blame_incremental - ANALYSIS/AliAnalysisAlien.cxx
Changing to centrality dependent corrections
[u/mrichter/AliRoot.git] / ANALYSIS / AliAnalysisAlien.cxx
... / ...
CommitLineData
1/**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
3 * *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
6 * *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
15
16// Author: Mihaela Gheata, 01/09/2008
17
18//==============================================================================
19// AliAnalysisAlien - AliEn utility class. Provides interface for creating
20// a personalized JDL, finding and creating a dataset.
21//==============================================================================
22
23#include "AliAnalysisAlien.h"
24
25#include "Riostream.h"
26#include "TEnv.h"
27#include "TKey.h"
28#include "TBits.h"
29#include "TError.h"
30#include "TROOT.h"
31#include "TSystem.h"
32#include "TFile.h"
33#include "TFileCollection.h"
34#include "TChain.h"
35#include "TObjString.h"
36#include "TObjArray.h"
37#include "TMacro.h"
38#include "TGrid.h"
39#include "TGridResult.h"
40#include "TGridCollection.h"
41#include "TGridJDL.h"
42#include "TGridJobStatusList.h"
43#include "TGridJobStatus.h"
44#include "TFileMerger.h"
45#include "AliAnalysisManager.h"
46#include "AliAnalysisTaskCfg.h"
47#include "AliVEventHandler.h"
48#include "AliAnalysisDataContainer.h"
49#include "AliMultiInputEventHandler.h"
50
51using std::ofstream;
52using std::ifstream;
53using std::ios;
54using std::endl;
55ClassImp(AliAnalysisAlien)
56#if 0
57;
58#endif
59
60namespace {
61 Bool_t copyLocal2Alien(const char* where, const char* loc, const char* rem)
62 {
63 TString sl(Form("file:%s", loc));
64 TString sr(Form("alien://%s", rem));
65 Bool_t ret = TFile::Cp(sl, sr);
66 if (!ret) {
67 Warning(where, "Failed to copy %s to %s", sl.Data(), sr.Data());
68 }
69 return ret;
70 }
71}
72
73//______________________________________________________________________________
74AliAnalysisAlien::AliAnalysisAlien()
75 :AliAnalysisGrid(),
76 fGridJDL(NULL),
77 fMergingJDL(NULL),
78 fPrice(0),
79 fTTL(0),
80 fSplitMaxInputFileNumber(0),
81 fMaxInitFailed(0),
82 fMasterResubmitThreshold(0),
83 fNtestFiles(0),
84 fNrunsPerMaster(0),
85 fMaxMergeFiles(0),
86 fMaxMergeStages(0),
87 fNsubmitted(0),
88 fProductionMode(0),
89 fOutputToRunNo(0),
90 fMergeViaJDL(0),
91 fFastReadOption(0),
92 fOverwriteMode(1),
93 fNreplicas(2),
94 fNproofWorkers(0),
95 fNproofWorkersPerSlave(0),
96 fProofReset(0),
97 fRunNumbers(),
98 fExecutable(),
99 fExecutableCommand(),
100 fArguments(),
101 fExecutableArgs(),
102 fAnalysisMacro(),
103 fAnalysisSource(),
104 fValidationScript(),
105 fAdditionalRootLibs(),
106 fAdditionalLibs(),
107 fSplitMode(),
108 fAPIVersion(),
109 fROOTVersion(),
110 fAliROOTVersion(),
111 fExternalPackages(),
112 fUser(),
113 fGridWorkingDir(),
114 fGridDataDir(),
115 fDataPattern(),
116 fGridOutputDir(),
117 fOutputArchive(),
118 fOutputFiles(),
119 fInputFormat(),
120 fDatasetName(),
121 fJDLName(),
122 fTerminateFiles(),
123 fMergeExcludes(),
124 fRegisterExcludes(),
125 fIncludePath(),
126 fCloseSE(),
127 fFriendChainName(),
128 fJobTag(),
129 fOutputSingle(),
130 fRunPrefix(),
131 fProofCluster(),
132 fProofDataSet(),
133 fFileForTestMode(),
134 fAliRootMode(),
135 fProofProcessOpt(),
136 fMergeDirName(),
137 fInputFiles(0),
138 fPackages(0),
139 fModules(0),
140 fProofParam(),
141 fDropToShell(true),
142 fGridJobIDs(""),
143 fGridStages("")
144{
145// Dummy ctor.
146 SetDefaults();
147}
148
149//______________________________________________________________________________
150AliAnalysisAlien::AliAnalysisAlien(const char *name)
151 :AliAnalysisGrid(name),
152 fGridJDL(NULL),
153 fMergingJDL(NULL),
154 fPrice(0),
155 fTTL(0),
156 fSplitMaxInputFileNumber(0),
157 fMaxInitFailed(0),
158 fMasterResubmitThreshold(0),
159 fNtestFiles(0),
160 fNrunsPerMaster(0),
161 fMaxMergeFiles(0),
162 fMaxMergeStages(0),
163 fNsubmitted(0),
164 fProductionMode(0),
165 fOutputToRunNo(0),
166 fMergeViaJDL(0),
167 fFastReadOption(0),
168 fOverwriteMode(1),
169 fNreplicas(2),
170 fNproofWorkers(0),
171 fNproofWorkersPerSlave(0),
172 fProofReset(0),
173 fRunNumbers(),
174 fExecutable(),
175 fExecutableCommand(),
176 fArguments(),
177 fExecutableArgs(),
178 fAnalysisMacro(),
179 fAnalysisSource(),
180 fValidationScript(),
181 fAdditionalRootLibs(),
182 fAdditionalLibs(),
183 fSplitMode(),
184 fAPIVersion(),
185 fROOTVersion(),
186 fAliROOTVersion(),
187 fExternalPackages(),
188 fUser(),
189 fGridWorkingDir(),
190 fGridDataDir(),
191 fDataPattern(),
192 fGridOutputDir(),
193 fOutputArchive(),
194 fOutputFiles(),
195 fInputFormat(),
196 fDatasetName(),
197 fJDLName(),
198 fTerminateFiles(),
199 fMergeExcludes(),
200 fRegisterExcludes(),
201 fIncludePath(),
202 fCloseSE(),
203 fFriendChainName(),
204 fJobTag(),
205 fOutputSingle(),
206 fRunPrefix(),
207 fProofCluster(),
208 fProofDataSet(),
209 fFileForTestMode(),
210 fAliRootMode(),
211 fProofProcessOpt(),
212 fMergeDirName(),
213 fInputFiles(0),
214 fPackages(0),
215 fModules(0),
216 fProofParam(),
217 fDropToShell(true),
218 fGridJobIDs(""),
219 fGridStages("")
220{
221// Default ctor.
222 SetDefaults();
223}
224
225//______________________________________________________________________________
226AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
227 :AliAnalysisGrid(other),
228 fGridJDL(NULL),
229 fMergingJDL(NULL),
230 fPrice(other.fPrice),
231 fTTL(other.fTTL),
232 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
233 fMaxInitFailed(other.fMaxInitFailed),
234 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
235 fNtestFiles(other.fNtestFiles),
236 fNrunsPerMaster(other.fNrunsPerMaster),
237 fMaxMergeFiles(other.fMaxMergeFiles),
238 fMaxMergeStages(other.fMaxMergeStages),
239 fNsubmitted(other.fNsubmitted),
240 fProductionMode(other.fProductionMode),
241 fOutputToRunNo(other.fOutputToRunNo),
242 fMergeViaJDL(other.fMergeViaJDL),
243 fFastReadOption(other.fFastReadOption),
244 fOverwriteMode(other.fOverwriteMode),
245 fNreplicas(other.fNreplicas),
246 fNproofWorkers(other.fNproofWorkers),
247 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
248 fProofReset(other.fProofReset),
249 fRunNumbers(other.fRunNumbers),
250 fExecutable(other.fExecutable),
251 fExecutableCommand(other.fExecutableCommand),
252 fArguments(other.fArguments),
253 fExecutableArgs(other.fExecutableArgs),
254 fAnalysisMacro(other.fAnalysisMacro),
255 fAnalysisSource(other.fAnalysisSource),
256 fValidationScript(other.fValidationScript),
257 fAdditionalRootLibs(other.fAdditionalRootLibs),
258 fAdditionalLibs(other.fAdditionalLibs),
259 fSplitMode(other.fSplitMode),
260 fAPIVersion(other.fAPIVersion),
261 fROOTVersion(other.fROOTVersion),
262 fAliROOTVersion(other.fAliROOTVersion),
263 fExternalPackages(other.fExternalPackages),
264 fUser(other.fUser),
265 fGridWorkingDir(other.fGridWorkingDir),
266 fGridDataDir(other.fGridDataDir),
267 fDataPattern(other.fDataPattern),
268 fGridOutputDir(other.fGridOutputDir),
269 fOutputArchive(other.fOutputArchive),
270 fOutputFiles(other.fOutputFiles),
271 fInputFormat(other.fInputFormat),
272 fDatasetName(other.fDatasetName),
273 fJDLName(other.fJDLName),
274 fTerminateFiles(other.fTerminateFiles),
275 fMergeExcludes(other.fMergeExcludes),
276 fRegisterExcludes(other.fRegisterExcludes),
277 fIncludePath(other.fIncludePath),
278 fCloseSE(other.fCloseSE),
279 fFriendChainName(other.fFriendChainName),
280 fJobTag(other.fJobTag),
281 fOutputSingle(other.fOutputSingle),
282 fRunPrefix(other.fRunPrefix),
283 fProofCluster(other.fProofCluster),
284 fProofDataSet(other.fProofDataSet),
285 fFileForTestMode(other.fFileForTestMode),
286 fAliRootMode(other.fAliRootMode),
287 fProofProcessOpt(other.fProofProcessOpt),
288 fMergeDirName(other.fMergeDirName),
289 fInputFiles(0),
290 fPackages(0),
291 fModules(0),
292 fProofParam(),
293 fDropToShell(other.fDropToShell),
294 fGridJobIDs(other.fGridJobIDs),
295 fGridStages(other.fGridStages)
296{
297// Copy ctor.
298 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
300 fRunRange[0] = other.fRunRange[0];
301 fRunRange[1] = other.fRunRange[1];
302 if (other.fInputFiles) {
303 fInputFiles = new TObjArray();
304 TIter next(other.fInputFiles);
305 TObject *obj;
306 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
307 fInputFiles->SetOwner();
308 }
309 if (other.fPackages) {
310 fPackages = new TObjArray();
311 TIter next(other.fPackages);
312 TObject *obj;
313 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
314 fPackages->SetOwner();
315 }
316 if (other.fModules) {
317 fModules = new TObjArray();
318 fModules->SetOwner();
319 TIter next(other.fModules);
320 AliAnalysisTaskCfg *mod, *crt;
321 while ((crt=(AliAnalysisTaskCfg*)next())) {
322 mod = new AliAnalysisTaskCfg(*crt);
323 fModules->Add(mod);
324 }
325 }
326}
327
328//______________________________________________________________________________
329AliAnalysisAlien::~AliAnalysisAlien()
330{
331// Destructor.
332 delete fGridJDL;
333 delete fMergingJDL;
334 delete fInputFiles;
335 delete fPackages;
336 delete fModules;
337 fProofParam.DeleteAll();
338}
339
340//______________________________________________________________________________
341AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
342{
343// Assignment.
344 if (this != &other) {
345 AliAnalysisGrid::operator=(other);
346 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
347 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
348 fPrice = other.fPrice;
349 fTTL = other.fTTL;
350 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
351 fMaxInitFailed = other.fMaxInitFailed;
352 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
353 fNtestFiles = other.fNtestFiles;
354 fNrunsPerMaster = other.fNrunsPerMaster;
355 fMaxMergeFiles = other.fMaxMergeFiles;
356 fMaxMergeStages = other.fMaxMergeStages;
357 fNsubmitted = other.fNsubmitted;
358 fProductionMode = other.fProductionMode;
359 fOutputToRunNo = other.fOutputToRunNo;
360 fMergeViaJDL = other.fMergeViaJDL;
361 fFastReadOption = other.fFastReadOption;
362 fOverwriteMode = other.fOverwriteMode;
363 fNreplicas = other.fNreplicas;
364 fNproofWorkers = other.fNproofWorkers;
365 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
366 fProofReset = other.fProofReset;
367 fRunNumbers = other.fRunNumbers;
368 fExecutable = other.fExecutable;
369 fExecutableCommand = other.fExecutableCommand;
370 fArguments = other.fArguments;
371 fExecutableArgs = other.fExecutableArgs;
372 fAnalysisMacro = other.fAnalysisMacro;
373 fAnalysisSource = other.fAnalysisSource;
374 fValidationScript = other.fValidationScript;
375 fAdditionalRootLibs = other.fAdditionalRootLibs;
376 fAdditionalLibs = other.fAdditionalLibs;
377 fSplitMode = other.fSplitMode;
378 fAPIVersion = other.fAPIVersion;
379 fROOTVersion = other.fROOTVersion;
380 fAliROOTVersion = other.fAliROOTVersion;
381 fExternalPackages = other.fExternalPackages;
382 fUser = other.fUser;
383 fGridWorkingDir = other.fGridWorkingDir;
384 fGridDataDir = other.fGridDataDir;
385 fDataPattern = other.fDataPattern;
386 fGridOutputDir = other.fGridOutputDir;
387 fOutputArchive = other.fOutputArchive;
388 fOutputFiles = other.fOutputFiles;
389 fInputFormat = other.fInputFormat;
390 fDatasetName = other.fDatasetName;
391 fJDLName = other.fJDLName;
392 fTerminateFiles = other.fTerminateFiles;
393 fMergeExcludes = other.fMergeExcludes;
394 fRegisterExcludes = other.fRegisterExcludes;
395 fIncludePath = other.fIncludePath;
396 fCloseSE = other.fCloseSE;
397 fFriendChainName = other.fFriendChainName;
398 fJobTag = other.fJobTag;
399 fOutputSingle = other.fOutputSingle;
400 fRunPrefix = other.fRunPrefix;
401 fProofCluster = other.fProofCluster;
402 fProofDataSet = other.fProofDataSet;
403 fFileForTestMode = other.fFileForTestMode;
404 fAliRootMode = other.fAliRootMode;
405 fProofProcessOpt = other.fProofProcessOpt;
406 fMergeDirName = other.fMergeDirName;
407 fDropToShell = other.fDropToShell;
408 fGridJobIDs = other.fGridJobIDs;
409 fGridStages = other.fGridStages;
410 if (other.fInputFiles) {
411 fInputFiles = new TObjArray();
412 TIter next(other.fInputFiles);
413 TObject *obj;
414 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
415 fInputFiles->SetOwner();
416 }
417 if (other.fPackages) {
418 fPackages = new TObjArray();
419 TIter next(other.fPackages);
420 TObject *obj;
421 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
422 fPackages->SetOwner();
423 }
424 if (other.fModules) {
425 fModules = new TObjArray();
426 fModules->SetOwner();
427 TIter next(other.fModules);
428 AliAnalysisTaskCfg *mod, *crt;
429 while ((crt=(AliAnalysisTaskCfg*)next())) {
430 mod = new AliAnalysisTaskCfg(*crt);
431 fModules->Add(mod);
432 }
433 }
434 }
435 return *this;
436}
437
438//______________________________________________________________________________
439void AliAnalysisAlien::AddAdditionalLibrary(const char *name)
440{
441// Add a single additional library to be loaded. Extension must be present.
442 TString lib(name);
443 if (!lib.Contains(".")) {
444 Error("AddAdditionalLibrary", "Extension not defined for %s", name);
445 return;
446 }
447 if (fAdditionalLibs.Contains(name)) {
448 Warning("AddAdditionalLibrary", "Library %s already added.", name);
449 return;
450 }
451 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
452 fAdditionalLibs += lib;
453}
454
455//______________________________________________________________________________
456void AliAnalysisAlien::AddModule(AliAnalysisTaskCfg *module)
457{
458// Adding a module. Checks if already existing. Becomes owned by this.
459 if (!module) return;
460 if (GetModule(module->GetName())) {
461 Error("AddModule", "A module having the same name %s already added", module->GetName());
462 return;
463 }
464 if (!fModules) {
465 fModules = new TObjArray();
466 fModules->SetOwner();
467 }
468 fModules->Add(module);
469}
470
471//______________________________________________________________________________
472void AliAnalysisAlien::AddModules(TObjArray *list)
473{
474// Adding a list of modules. Checks if already existing. Becomes owned by this.
475 TIter next(list);
476 AliAnalysisTaskCfg *module;
477 while ((module = (AliAnalysisTaskCfg*)next())) AddModule(module);
478}
479
480//______________________________________________________________________________
481Bool_t AliAnalysisAlien::CheckDependencies()
482{
483// Check if all dependencies are satisfied. Reorder modules if needed.
484 Int_t nmodules = GetNmodules();
485 if (!nmodules) {
486 Warning("CheckDependencies", "No modules added yet to check their dependencies");
487 return kTRUE;
488 }
489 AliAnalysisTaskCfg *mod = 0;
490 AliAnalysisTaskCfg *dep = 0;
491 TString depname;
492 Int_t i, j, k;
493 for (i=0; i<nmodules; i++) {
494 mod = (AliAnalysisTaskCfg*) fModules->At(i);
495 Int_t ndeps = mod->GetNdeps();
496 Int_t istart = i;
497 for (j=0; j<ndeps; j++) {
498 depname = mod->GetDependency(j);
499 dep = GetModule(depname);
500 if (!dep) {
501 Error("CheckDependencies","Dependency %s not added for module %s",
502 depname.Data(), mod->GetName());
503 return kFALSE;
504 }
505 if (dep->NeedsDependency(mod->GetName())) {
506 Error("CheckDependencies","Modules %s and %s circularly depend on each other",
507 mod->GetName(), dep->GetName());
508 return kFALSE;
509 }
510 Int_t idep = fModules->IndexOf(dep);
511 // The dependency task must come first
512 if (idep>i) {
513 // Remove at idep and move all objects below up one slot
514 // down to index i included.
515 fModules->RemoveAt(idep);
516 for (k=idep-1; k>=i; k--) fModules->AddAt(fModules->RemoveAt(k),k+1);
517 fModules->AddAt(dep, i++);
518 }
519 //Redo from istart if dependencies were inserted
520 if (i>istart) i=istart-1;
521 }
522 }
523 return kTRUE;
524}
525
526//______________________________________________________________________________
527AliAnalysisManager *AliAnalysisAlien::CreateAnalysisManager(const char *name, const char *filename)
528{
529// Create the analysis manager and optionally execute the macro in filename.
530 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
531 if (mgr) return mgr;
532 mgr = new AliAnalysisManager(name);
533 mgr->SetGridHandler((AliAnalysisGrid*)this);
534 if (strlen(filename)) {
535 TString line = gSystem->ExpandPathName(filename);
536 line.Prepend(".x ");
537 gROOT->ProcessLine(line.Data());
538 }
539 return mgr;
540}
541
542//______________________________________________________________________________
543Int_t AliAnalysisAlien::GetNmodules() const
544{
545// Get number of modules.
546 if (!fModules) return 0;
547 return fModules->GetEntries();
548}
549
550//______________________________________________________________________________
551AliAnalysisTaskCfg *AliAnalysisAlien::GetModule(const char *name)
552{
553// Get a module by name.
554 if (!fModules) return 0;
555 return (AliAnalysisTaskCfg*)fModules->FindObject(name);
556}
557
558//______________________________________________________________________________
559Bool_t AliAnalysisAlien::LoadModule(AliAnalysisTaskCfg *mod)
560{
561// Load a given module.
562 if (mod->IsLoaded()) return kTRUE;
563 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
564 if (!mgr) {
565 Error("LoadModule", "No analysis manager created yet. Use CreateAnalysisManager first.");
566 return kFALSE;
567 }
568 Int_t ndeps = mod->GetNdeps();
569 TString depname;
570 for (Int_t j=0; j<ndeps; j++) {
571 depname = mod->GetDependency(j);
572 AliAnalysisTaskCfg *dep = GetModule(depname);
573 if (!dep) {
574 Error("LoadModule","Dependency %s not existing for module %s",
575 depname.Data(), mod->GetName());
576 return kFALSE;
577 }
578 if (!LoadModule(dep)) {
579 Error("LoadModule","Dependency %s for module %s could not be loaded",
580 depname.Data(), mod->GetName());
581 return kFALSE;
582 }
583 }
584 // Load libraries for the module
585 if (!mod->CheckLoadLibraries()) {
586 Error("LoadModule", "Cannot load all libraries for module %s", mod->GetName());
587 return kFALSE;
588 }
589 // Check if a custom file name was requested
590 if (strlen(mod->GetOutputFileName())) mgr->SetCommonFileName(mod->GetOutputFileName());
591
592 // Check if a custom terminate file name was requested
593 if (strlen(mod->GetTerminateFileName())) {
594 if (!fTerminateFiles.IsNull()) fTerminateFiles += ",";
595 fTerminateFiles += mod->GetTerminateFileName();
596 }
597
598 // Execute the macro
599 if (mod->ExecuteMacro()<0) {
600 Error("LoadModule", "Executing the macro %s with arguments: %s for module %s returned a negative value",
601 mod->GetMacroName(), mod->GetMacroArgs(), mod->GetName());
602 return kFALSE;
603 }
604 // Configure dependencies
605 if (mod->GetConfigMacro() && mod->ExecuteConfigMacro()<0) {
606 Error("LoadModule", "There was an error executing the deps config macro %s for module %s",
607 mod->GetConfigMacro()->GetTitle(), mod->GetName());
608 return kFALSE;
609 }
610 // Adjust extra libraries
611 Int_t nlibs = mod->GetNlibs();
612 TString lib;
613 for (Int_t i=0; i<nlibs; i++) {
614 lib = mod->GetLibrary(i);
615 lib = Form("lib%s.so", lib.Data());
616 if (fAdditionalLibs.Contains(lib)) continue;
617 if (!fAdditionalLibs.IsNull()) fAdditionalLibs += " ";
618 fAdditionalLibs += lib;
619 }
620 return kTRUE;
621}
622
623//______________________________________________________________________________
624Bool_t AliAnalysisAlien::GenerateTrain(const char *name)
625{
626// Generate the full train.
627 fAdditionalLibs = "";
628 if (!LoadModules()) return kFALSE;
629 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
630 if (!mgr->InitAnalysis()) return kFALSE;
631 mgr->RunLocalInit();
632 mgr->PrintStatus();
633 Int_t productionMode = fProductionMode;
634 SetProductionMode();
635 TString macro = fAnalysisMacro;
636 TString executable = fExecutable;
637 TString validation = fValidationScript;
638 TString execCommand = fExecutableCommand;
639 SetAnalysisMacro(Form("%s.C", name));
640 SetExecutable(Form("%s.sh", name));
641// SetExecutableCommand("aliroot -b -q ");
642 SetValidationScript(Form("%s_validation.sh", name));
643 StartAnalysis();
644 SetProductionMode(productionMode);
645 fAnalysisMacro = macro;
646 fExecutable = executable;
647 fExecutableCommand = execCommand;
648 fValidationScript = validation;
649 return kTRUE;
650}
651
652//______________________________________________________________________________
653Bool_t AliAnalysisAlien::GenerateTest(const char *name, const char *modname)
654{
655// Generate test macros for a single module or for the full train.
656 fAdditionalLibs = "";
657 if (strlen(modname)) {
658 if (!CheckDependencies()) return kFALSE;
659 AliAnalysisTaskCfg *mod = GetModule(modname);
660 if (!mod) {
661 Error("GenerateTest", "cannot generate test for inexistent module %s", modname);
662 return kFALSE;
663 }
664 if (!LoadModule(mod)) return kFALSE;
665 } else if (!LoadModules()) return kFALSE;
666 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
667 if (!mgr->InitAnalysis()) return kFALSE;
668 mgr->RunLocalInit();
669 mgr->PrintStatus();
670 SetLocalTest(kTRUE);
671 Int_t productionMode = fProductionMode;
672 SetProductionMode();
673 TString macro = fAnalysisMacro;
674 TString executable = fExecutable;
675 TString validation = fValidationScript;
676 TString execCommand = fExecutableCommand;
677 SetAnalysisMacro(Form("%s.C", name));
678 SetExecutable(Form("%s.sh", name));
679 fOutputFiles = GetListOfFiles("outaod");
680 // Add extra files registered to the analysis manager
681 TString extra = GetListOfFiles("ext");
682 if (!extra.IsNull()) {
683 extra.ReplaceAll(".root", "*.root");
684 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
685 fOutputFiles += extra;
686 }
687// SetExecutableCommand("aliroot -b -q ");
688 SetValidationScript(Form("%s_validation.sh", name));
689 WriteAnalysisFile();
690 WriteAnalysisMacro();
691 WriteExecutable();
692 WriteValidationScript();
693 WriteMergingMacro();
694 WriteMergeExecutable();
695 WriteValidationScript(kTRUE);
696 SetLocalTest(kFALSE);
697 SetProductionMode(productionMode);
698 fAnalysisMacro = macro;
699 fExecutable = executable;
700 fExecutableCommand = execCommand;
701 fValidationScript = validation;
702 return kTRUE;
703}
704
705//______________________________________________________________________________
706Bool_t AliAnalysisAlien::LoadModules()
707{
708// Load all modules by executing the AddTask macros. Checks first the dependencies.
709 fAdditionalLibs = "";
710 Int_t nmodules = GetNmodules();
711 if (!nmodules) {
712 Warning("LoadModules", "No module to be loaded");
713 return kTRUE;
714 }
715 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
716 if (!mgr) {
717 Error("LoadModules", "No analysis manager created yet. Use CreateAnalysisManager first.");
718 return kFALSE;
719 }
720 if (!CheckDependencies()) return kFALSE;
721 nmodules = GetNmodules();
722 AliAnalysisTaskCfg *mod;
723 for (Int_t imod=0; imod<nmodules; imod++) {
724 mod = (AliAnalysisTaskCfg*)fModules->At(imod);
725 if (!LoadModule(mod)) return kFALSE;
726 }
727 return kTRUE;
728}
729
730//______________________________________________________________________________
731void AliAnalysisAlien::SetRunPrefix(const char *prefix)
732{
733// Set the run number format. Can be a prefix or a format like "%09d"
734 fRunPrefix = prefix;
735 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
736}
737
738//______________________________________________________________________________
739void AliAnalysisAlien::AddIncludePath(const char *path)
740{
741// Add include path in the remote analysis macro.
742 TString p(path);
743 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
744 else fIncludePath += Form("-I%s ", path);
745}
746
747//______________________________________________________________________________
748void AliAnalysisAlien::AddRunNumber(Int_t run)
749{
750// Add a run number to the list of runs to be processed.
751 if (fRunNumbers.Length()) fRunNumbers += " ";
752 fRunNumbers += Form(fRunPrefix.Data(), run);
753}
754
755//______________________________________________________________________________
756void AliAnalysisAlien::AddRunList(const char* runList)
757{
758// Add several runs into the list of runs; they are expected to be separated by a blank character.
759 TString sList = runList;
760 TObjArray *list = sList.Tokenize(" ");
761 Int_t n = list->GetEntries();
762 for (Int_t i = 0; i < n; i++) {
763 TObjString *os = (TObjString*)list->At(i);
764 AddRunNumber(os->GetString().Atoi());
765 }
766 delete list;
767}
768
769//______________________________________________________________________________
770void AliAnalysisAlien::AddRunNumber(const char* run)
771{
772// Add a run number to the list of runs to be processed.
773 TString runs = run;
774 TObjString *os;
775 TObjArray *arr = runs.Tokenize(" ");
776 TIter next(arr);
777 TString prefix;
778 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
779 while ((os=(TObjString*)next())){
780 if (fRunNumbers.Length()) fRunNumbers += " ";
781 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
782 }
783 delete arr;
784}
785
786//______________________________________________________________________________
787void AliAnalysisAlien::AddDataFile(const char *lfn)
788{
789// Adds a data file to the input to be analysed. The file should be a valid LFN
790// or point to an existing file in the alien workdir.
791 if (!fInputFiles) fInputFiles = new TObjArray();
792 fInputFiles->Add(new TObjString(lfn));
793}
794
795//______________________________________________________________________________
796void AliAnalysisAlien::AddExternalPackage(const char *package)
797{
798// Adds external packages w.r.t to the default ones (root,aliroot and gapi)
799 if (fExternalPackages) fExternalPackages += " ";
800 fExternalPackages += package;
801}
802
803//______________________________________________________________________________
804Bool_t AliAnalysisAlien::Connect()
805{
806// Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
807 if (gGrid && gGrid->IsConnected()) return kTRUE;
808 if (fProductionMode) return kTRUE;
809 if (!gGrid) {
810 Info("Connect", "Trying to connect to AliEn ...");
811 TGrid::Connect("alien://");
812 }
813 if (!gGrid || !gGrid->IsConnected()) {
814 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
815 return kFALSE;
816 }
817 fUser = gGrid->GetUser();
818 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
819 return kTRUE;
820}
821
822//______________________________________________________________________________
823void AliAnalysisAlien::CdWork()
824{
825// Check validity of alien workspace. Create directory if possible.
826 if (!Connect()) {
827 Error("CdWork", "Alien connection required");
828 return;
829 }
830 TString homedir = gGrid->GetHomeDirectory();
831 TString workdir = homedir + fGridWorkingDir;
832 if (DirectoryExists(workdir)) {
833 gGrid->Cd(workdir);
834 return;
835 }
836 // Work directory not existing - create it
837 gGrid->Cd(homedir);
838 if (gGrid->Mkdir(workdir, "-p")) {
839 gGrid->Cd(fGridWorkingDir);
840 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
841 } else {
842 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
843 workdir.Data(), homedir.Data());
844 fGridWorkingDir = "";
845 }
846}
847
848//______________________________________________________________________________
849Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
850{
851// Check if file copying is possible.
852 if (fProductionMode) return kTRUE;
853 TString salienpath(alienpath);
854 if (salienpath.Contains(" ")) {
855 Error("CheckFileCopy", "path: <%s> contains blancs - FIX IT !",alienpath);
856 return kFALSE;
857 }
858 if (!Connect()) {
859 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
860 return kFALSE;
861 }
862 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
863 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
864 // Check if alien_CLOSE_SE is defined
865 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
866 if (!closeSE.IsNull()) {
867 Info("CheckFileCopy", "Your current close storage is pointing to: \
868 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
869 } else {
870 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
871 }
872 // Check if grid directory exists.
873 if (!DirectoryExists(alienpath)) {
874 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
875 return kFALSE;
876 }
877 TString stest = "plugin_test_copy";
878 TFile f(stest, "RECREATE");
879 // User may not have write permissions to current directory
880 if (f.IsZombie()) {
881 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
882 gSystem->WorkingDirectory());
883 return kFALSE;
884 }
885 f.Close();
886 if (FileExists(Form("alien://%s/%s",alienpath, stest.Data()))) gGrid->Rm(Form("alien://%s/%s",alienpath, stest.Data()));
887 if (!TFile::Cp(stest.Data(), Form("alien://%s/%s",alienpath, stest.Data()))) {
888 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
889 \n# 1. Make sure you have write permissions there. If this is the case: \
890 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
891 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
892 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
893 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
894 gSystem->Unlink(stest.Data());
895 return kFALSE;
896 }
897 gSystem->Unlink(stest.Data());
898 gGrid->Rm(Form("%s/%s",alienpath,stest.Data()));
899 Info("CheckFileCopy", "### ...SUCCESS ###");
900 return kTRUE;
901}
902
903//______________________________________________________________________________
904Bool_t AliAnalysisAlien::CheckInputData()
905{
906// Check validity of input data. If necessary, create xml files.
907 if (fProductionMode) return kTRUE;
908 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
909 if (!fGridDataDir.Length()) {
910 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
911 return kFALSE;
912 }
913 if (fMergeViaJDL) {
914 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
915 return kFALSE;
916 }
917 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
918 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
919 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
920 return kTRUE;
921 }
922 // Process declared files
923 Bool_t isCollection = kFALSE;
924 Bool_t isXml = kFALSE;
925 Bool_t useTags = kFALSE;
926 Bool_t checked = kFALSE;
927 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
928 TString file;
929 TString workdir = gGrid->GetHomeDirectory();
930 workdir += fGridWorkingDir;
931 if (fInputFiles) {
932 TObjString *objstr;
933 TIter next(fInputFiles);
934 while ((objstr=(TObjString*)next())) {
935 file = workdir;
936 file += "/";
937 file += objstr->GetString();
938 // Store full lfn path
939 if (FileExists(file)) objstr->SetString(file);
940 else {
941 file = objstr->GetName();
942 if (!FileExists(objstr->GetName())) {
943 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
944 objstr->GetName(), workdir.Data());
945 return kFALSE;
946 }
947 }
948 Bool_t iscoll, isxml, usetags;
949 CheckDataType(file, iscoll, isxml, usetags);
950 if (!checked) {
951 checked = kTRUE;
952 isCollection = iscoll;
953 isXml = isxml;
954 useTags = usetags;
955 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
956 } else {
957 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
958 Error("CheckInputData", "Some conflict was found in the types of inputs");
959 return kFALSE;
960 }
961 }
962 }
963 }
964 // Process requested run numbers
965 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
966 // Check validity of alien data directory
967 if (!fGridDataDir.Length()) {
968 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
969 return kFALSE;
970 }
971 if (!DirectoryExists(fGridDataDir)) {
972 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
973 return kFALSE;
974 }
975 if (isCollection) {
976 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
977 return kFALSE;
978 }
979
980 if (checked && !isXml) {
981 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
982 return kFALSE;
983 }
984 // Check validity of run number(s)
985 TObjArray *arr;
986 TObjString *os;
987 TString format;
988 Int_t nruns = 0;
989 TString schunk, schunk2;
990 TString path;
991 if (!checked) {
992 checked = kTRUE;
993 useTags = fDataPattern.Contains("tag");
994 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
995 }
996 if (useTags != fDataPattern.Contains("tag")) {
997 Error("CheckInputData", "Cannot mix input files using/not using tags");
998 return kFALSE;
999 }
1000 if (fRunNumbers.Length()) {
1001 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
1002 arr = fRunNumbers.Tokenize(" ");
1003 TIter next(arr);
1004 while ((os=(TObjString*)next())) {
1005 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
1006 if (!DirectoryExists(path)) {
1007 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
1008 continue;
1009 }
1010 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
1011 TString msg = "\n##### file: ";
1012 msg += path;
1013 msg += " type: xml_collection;";
1014 if (useTags) msg += " using_tags: Yes";
1015 else msg += " using_tags: No";
1016 Info("CheckDataType", "%s", msg.Data());
1017 if (fNrunsPerMaster<2) {
1018 AddDataFile(Form("%s.xml", os->GetString().Data()));
1019 } else {
1020 nruns++;
1021 if (((nruns-1)%fNrunsPerMaster) == 0) {
1022 schunk = os->GetString();
1023 }
1024 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
1025 schunk += Form("_%s.xml", os->GetString().Data());
1026 AddDataFile(schunk);
1027 }
1028 }
1029 delete arr;
1030 } else {
1031 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
1032 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1033 format = Form("%%s/%s ", fRunPrefix.Data());
1034 path = Form(format.Data(), fGridDataDir.Data(), irun);
1035 if (!DirectoryExists(path)) {
1036 continue;
1037 }
1038 format = Form("%%s/%s.xml", fRunPrefix.Data());
1039 path = Form(format.Data(), workdir.Data(),irun);
1040 TString msg = "\n##### file: ";
1041 msg += path;
1042 msg += " type: xml_collection;";
1043 if (useTags) msg += " using_tags: Yes";
1044 else msg += " using_tags: No";
1045 Info("CheckDataType", "%s", msg.Data());
1046 if (fNrunsPerMaster<2) {
1047 format = Form("%s.xml", fRunPrefix.Data());
1048 AddDataFile(Form(format.Data(),irun));
1049 } else {
1050 nruns++;
1051 if (((nruns-1)%fNrunsPerMaster) == 0) {
1052 schunk = Form(fRunPrefix.Data(),irun);
1053 }
1054 format = Form("_%s.xml", fRunPrefix.Data());
1055 schunk2 = Form(format.Data(), irun);
1056 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
1057 schunk += schunk2;
1058 AddDataFile(schunk);
1059 }
1060 }
1061 if (!fInputFiles) {
1062 schunk += schunk2;
1063 AddDataFile(schunk);
1064 }
1065 }
1066 return kTRUE;
1067}
1068
1069//______________________________________________________________________________
1070Int_t AliAnalysisAlien::CopyLocalDataset(const char *griddir, const char *pattern, Int_t nfiles, const char *output, const char *archivefile, const char *outputdir)
1071{
1072// Copy data from the given grid directory according a pattern and make a local
1073// dataset.
1074// archivefile (optional) results in that the archive containing the file <pattern> is copied. archivefile can contain a list of files (semicolon-separated) which are all copied
1075 if (!Connect()) {
1076 Error("CopyLocalDataset", "Cannot copy local dataset with no grid connection");
1077 return 0;
1078 }
1079 if (!DirectoryExists(griddir)) {
1080 Error("CopyLocalDataset", "Data directory %s not existing.", griddir);
1081 return 0;
1082 }
1083 TString command = Form("find -z -l %d %s %s", nfiles, griddir, pattern);
1084 printf("Running command: %s\n", command.Data());
1085 TGridResult *res = gGrid->Command(command);
1086 Int_t nfound = res->GetEntries();
1087 if (!nfound) {
1088 Error("CopyLocalDataset", "No file found in <%s> having pattern <%s>", griddir, pattern);
1089 return 0;
1090 }
1091 printf("... found %d files. Copying locally ...\n", nfound);
1092
1093 // archives
1094 TObjArray* additionalArchives = 0;
1095 if (strlen(archivefile) > 0 && TString(archivefile).Contains(";")) {
1096 additionalArchives = TString(archivefile).Tokenize(";");
1097 archivefile = additionalArchives->At(0)->GetName();
1098 additionalArchives->RemoveAt(0);
1099 additionalArchives->Compress();
1100 }
1101
1102 // Copy files locally
1103 ofstream out;
1104 out.open(output, ios::out);
1105 TMap *map;
1106 TString turl, dirname, filename, temp;
1107 TString cdir = gSystem->WorkingDirectory();
1108 gSystem->MakeDirectory(outputdir);
1109 gSystem->ChangeDirectory(outputdir);
1110 Int_t ncopied = 0;
1111 for (Int_t i=0; i<nfound; i++) {
1112 map = (TMap*)res->At(i);
1113 turl = map->GetValue("turl")->GetName();
1114 filename = gSystem->BaseName(turl.Data());
1115 dirname = gSystem->DirName(turl.Data());
1116 dirname = gSystem->BaseName(dirname.Data());
1117 gSystem->MakeDirectory(dirname);
1118
1119 TString source(turl);
1120 TString targetFileName(filename);
1121
1122 if (strlen(archivefile) > 0) {
1123// TODO here the archive in which the file resides should be determined
1124// however whereis returns only a guid, and guid2lfn does not work
1125// Therefore we use the one provided as argument for now
1126 source = Form("%s/%s", gSystem->DirName(source.Data()), archivefile);
1127 targetFileName = archivefile;
1128 }
1129 if (TFile::Cp(source, Form("file:./%s/%s", dirname.Data(), targetFileName.Data()))) {
1130 Bool_t success = kTRUE;
1131 if (additionalArchives) {
1132 for (Int_t j=0; j<additionalArchives->GetEntriesFast(); j++) {
1133 TString target;
1134 target.Form("./%s/%s", dirname.Data(), additionalArchives->At(j)->GetName());
1135 gSystem->MakeDirectory(gSystem->DirName(target));
1136 success &= TFile::Cp(Form("%s/%s", gSystem->DirName(source.Data()), additionalArchives->At(j)->GetName()), Form("file:%s", target.Data()));
1137 }
1138 }
1139
1140 if (success) {
1141 if (strlen(archivefile) > 0) targetFileName = Form("%s#%s", targetFileName.Data(), gSystem->BaseName(turl.Data()));
1142 out << cdir << Form("/%s/%s/%s", outputdir, dirname.Data(), targetFileName.Data()) << endl;
1143 ncopied++;
1144 }
1145 }
1146 }
1147 gSystem->ChangeDirectory(cdir);
1148 delete res;
1149 delete additionalArchives;
1150 return ncopied;
1151}
1152
1153//______________________________________________________________________________
1154Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
1155{
1156// Create dataset for the grid data directory + run number.
1157 const Int_t gMaxEntries = 15000;
1158 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
1159 if (!Connect()) {
1160 Error("CreateDataset", "Cannot create dataset with no grid connection");
1161 return kFALSE;
1162 }
1163
1164 // Cd workspace
1165 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
1166 TString workdir = gGrid->GetHomeDirectory();
1167 workdir += fGridWorkingDir;
1168
1169 // Compose the 'find' command arguments
1170 TString format;
1171 TString command;
1172 TString options = "-x collection ";
1173 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
1174 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
1175 TString conditions = "";
1176 Int_t nstart = 0;
1177 Int_t ncount = 0;
1178 Int_t stage = 0;
1179 TString file;
1180 TString path;
1181 Int_t nruns = 0;
1182 TString schunk, schunk2;
1183 TGridCollection *cbase=0, *cadd=0;
1184 if (!fRunNumbers.Length() && !fRunRange[0]) {
1185 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
1186 // Make a single data collection from data directory.
1187 path = fGridDataDir;
1188 if (!DirectoryExists(path)) {
1189 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
1190 return kFALSE;
1191 }
1192// CdWork();
1193 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1194 else file = Form("%s.xml", gSystem->BaseName(path));
1195 while (1) {
1196 ncount = 0;
1197 stage++;
1198 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1199 command = "find ";
1200 command += Form("%s -o %d ",options.Data(), nstart);
1201 command += path;
1202 command += " ";
1203 command += pattern;
1204 command += conditions;
1205 printf("command: %s\n", command.Data());
1206 TGridResult *res = gGrid->Command(command);
1207 if (res) delete res;
1208 // Write standard output to file
1209 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
1210 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1211 Bool_t nullFile = kFALSE;
1212 if (!hasGrep) {
1213 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1214 } else {
1215 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1216 if (nullFile) {
1217 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
1218 gSystem->Exec("rm -f __tmp*");
1219 return kFALSE;
1220 }
1221 TString line;
1222 ifstream in;
1223 in.open("__tmp__");
1224 in >> line;
1225 in.close();
1226 gSystem->Exec("rm -f __tmp__");
1227 ncount = line.Atoi();
1228 }
1229 }
1230 if (ncount == gMaxEntries) {
1231 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1232 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1233 if (!cbase) cbase = cadd;
1234 else {
1235 cbase->Add(cadd);
1236 delete cadd;
1237 }
1238 nstart += ncount;
1239 } else {
1240 if (cbase) {
1241 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1242 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1243 cbase->Add(cadd);
1244 delete cadd;
1245 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1246 delete cbase; cbase = 0;
1247 } else {
1248 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1249 }
1250 gSystem->Exec("rm -f __tmp*");
1251 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1252 break;
1253 }
1254 }
1255 Bool_t fileExists = FileExists(file);
1256 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
1257 // Copy xml file to alien space
1258 if (fileExists) gGrid->Rm(file);
1259 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1260 if (!FileExists(file)) {
1261 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1262 return kFALSE;
1263 }
1264 // Update list of files to be processed.
1265 }
1266 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
1267 return kTRUE;
1268 }
1269 // Several runs
1270 Bool_t nullResult = kTRUE;
1271 if (fRunNumbers.Length()) {
1272 TObjArray *arr = fRunNumbers.Tokenize(" ");
1273 TObjString *os;
1274 TIter next(arr);
1275 while ((os=(TObjString*)next())) {
1276 nstart = 0;
1277 stage = 0;
1278 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
1279 if (!DirectoryExists(path)) continue;
1280// CdWork();
1281 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1282 else file = Form("%s.xml", os->GetString().Data());
1283 // If local collection file does not exist, create it via 'find' command.
1284 while (1) {
1285 ncount = 0;
1286 stage++;
1287 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1288 command = "find ";
1289 command += Form("%s -o %d ",options.Data(), nstart);
1290 command += path;
1291 command += pattern;
1292 command += conditions;
1293 TGridResult *res = gGrid->Command(command);
1294 if (res) delete res;
1295 // Write standard output to file
1296 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1297 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1298 Bool_t nullFile = kFALSE;
1299 if (!hasGrep) {
1300 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1301 } else {
1302 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1303 if (nullFile) {
1304 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1305 gSystem->Exec("rm -f __tmp*");
1306 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
1307 break;
1308 }
1309 TString line;
1310 ifstream in;
1311 in.open("__tmp__");
1312 in >> line;
1313 in.close();
1314 gSystem->Exec("rm -f __tmp__");
1315 ncount = line.Atoi();
1316 }
1317 nullResult = kFALSE;
1318 }
1319 if (ncount == gMaxEntries) {
1320 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1321 if (fNrunsPerMaster > 1) {
1322 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1323 file.Data(),gMaxEntries);
1324 return kFALSE;
1325 }
1326 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1327 if (!cbase) cbase = cadd;
1328 else {
1329 cbase->Add(cadd);
1330 delete cadd;
1331 }
1332 nstart += ncount;
1333 } else {
1334 if (cbase && fNrunsPerMaster<2) {
1335 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1336 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1337 cbase->Add(cadd);
1338 delete cadd;
1339 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1340 delete cbase; cbase = 0;
1341 } else {
1342 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1343 }
1344 gSystem->Exec("rm -f __tmp*");
1345 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1346 break;
1347 }
1348 }
1349 if (TestBit(AliAnalysisGrid::kTest)) break;
1350 // Check if there is one run per master job.
1351 if (fNrunsPerMaster<2) {
1352 if (FileExists(file)) {
1353 if (fOverwriteMode) gGrid->Rm(file);
1354 else {
1355 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1356 continue;
1357 }
1358 }
1359 // Copy xml file to alien space
1360 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1361 if (!FileExists(file)) {
1362 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1363 delete arr;
1364 return kFALSE;
1365 }
1366 } else {
1367 nruns++;
1368 if (((nruns-1)%fNrunsPerMaster) == 0) {
1369 schunk = os->GetString();
1370 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1371 } else {
1372 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1373 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
1374 cbase->Add(cadd);
1375 delete cadd;
1376 }
1377 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
1378 continue;
1379 }
1380 schunk += Form("_%s.xml", os->GetString().Data());
1381 if (FileExists(schunk)) {
1382 if (fOverwriteMode) gGrid->Rm(file);
1383 else {
1384 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1385 continue;
1386 }
1387 }
1388 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
1389 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1390 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1391 if (!FileExists(schunk)) {
1392 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1393 delete arr;
1394 return kFALSE;
1395 }
1396 }
1397 }
1398 delete arr;
1399 if (nullResult) {
1400 Error("CreateDataset", "No valid dataset corresponding to the query!");
1401 return kFALSE;
1402 }
1403 } else {
1404 // Process a full run range.
1405 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
1406 format = Form("%%s/%s ", fRunPrefix.Data());
1407 nstart = 0;
1408 stage = 0;
1409 path = Form(format.Data(), fGridDataDir.Data(), irun);
1410 if (!DirectoryExists(path)) continue;
1411// CdWork();
1412 format = Form("%s.xml", fRunPrefix.Data());
1413 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
1414 else file = Form(format.Data(), irun);
1415 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
1416 if (fOverwriteMode) gGrid->Rm(file);
1417 else {
1418 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1419 continue;
1420 }
1421 }
1422 // If local collection file does not exist, create it via 'find' command.
1423 while (1) {
1424 ncount = 0;
1425 stage++;
1426 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
1427 command = "find ";
1428 command += Form("%s -o %d ",options.Data(), nstart);
1429 command += path;
1430 command += pattern;
1431 command += conditions;
1432 TGridResult *res = gGrid->Command(command);
1433 if (res) delete res;
1434 // Write standard output to file
1435 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
1436 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
1437 Bool_t nullFile = kFALSE;
1438 if (!hasGrep) {
1439 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
1440 } else {
1441 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
1442 if (nullFile) {
1443 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1444 gSystem->Exec("rm -f __tmp*");
1445 break;
1446 }
1447 TString line;
1448 ifstream in;
1449 in.open("__tmp__");
1450 in >> line;
1451 in.close();
1452 gSystem->Exec("rm -f __tmp__");
1453 ncount = line.Atoi();
1454 }
1455 nullResult = kFALSE;
1456 }
1457 if (ncount == gMaxEntries) {
1458 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1459 if (fNrunsPerMaster > 1) {
1460 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1461 file.Data(),gMaxEntries);
1462 return kFALSE;
1463 }
1464 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1465 if (!cbase) cbase = cadd;
1466 else {
1467 cbase->Add(cadd);
1468 delete cadd;
1469 }
1470 nstart += ncount;
1471 } else {
1472 if (cbase && fNrunsPerMaster<2) {
1473 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1474 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1475 cbase->Add(cadd);
1476 delete cadd;
1477 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1478 delete cbase; cbase = 0;
1479 } else {
1480 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1481 }
1482 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1483 break;
1484 }
1485 }
1486 if (TestBit(AliAnalysisGrid::kTest)) break;
1487 // Check if there is one run per master job.
1488 if (fNrunsPerMaster<2) {
1489 if (FileExists(file)) {
1490 if (fOverwriteMode) gGrid->Rm(file);
1491 else {
1492 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1493 continue;
1494 }
1495 }
1496 // Copy xml file to alien space
1497 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1498 if (!FileExists(file)) {
1499 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1500 return kFALSE;
1501 }
1502 } else {
1503 nruns++;
1504 // Check if the collection for the chunk exist locally.
1505 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1506 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1507 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1508 else continue;
1509 }
1510 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1511 if (((nruns-1)%fNrunsPerMaster) == 0) {
1512 schunk = Form(fRunPrefix.Data(), irun);
1513 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1514 } else {
1515 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1516 cbase->Add(cadd);
1517 delete cadd;
1518 }
1519 format = Form("%%s_%s.xml", fRunPrefix.Data());
1520 schunk2 = Form(format.Data(), schunk.Data(), irun);
1521 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1522 continue;
1523 }
1524 schunk = schunk2;
1525 if (FileExists(schunk)) {
1526 if (fOverwriteMode) gGrid->Rm(schunk);
1527 else {
1528 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1529 continue;
1530 }
1531 }
1532 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1533 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1534 if (FileExists(schunk)) {
1535 if (fOverwriteMode) gGrid->Rm(schunk);
1536 else {
1537 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1538 continue;
1539 }
1540 }
1541 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1542 if (!FileExists(schunk)) {
1543 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1544 return kFALSE;
1545 }
1546 }
1547 }
1548 if (nullResult) {
1549 Error("CreateDataset", "No valid dataset corresponding to the query!");
1550 return kFALSE;
1551 }
1552 }
1553 return kTRUE;
1554}
1555
1556//______________________________________________________________________________
1557Bool_t AliAnalysisAlien::CreateJDL()
1558{
1559// Generate a JDL file according to current settings. The name of the file is
1560// specified by fJDLName.
1561 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1562 Bool_t error = kFALSE;
1563 TObjArray *arr = 0;
1564 Bool_t copy = kTRUE;
1565 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1566 Bool_t generate = kTRUE;
1567 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1568 if (!Connect()) {
1569 Error("CreateJDL", "Alien connection required");
1570 return kFALSE;
1571 }
1572 // Check validity of alien workspace
1573 TString workdir;
1574 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1575 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1576 workdir += fGridWorkingDir;
1577 if (generate) {
1578 TObjString *os;
1579 if (!fInputFiles) {
1580 Error("CreateJDL()", "Define some input files for your analysis.");
1581 error = kTRUE;
1582 }
1583 // Compose list of input files
1584 // Check if output files were defined
1585 if (!fOutputFiles.Length()) {
1586 Error("CreateJDL", "You must define at least one output file");
1587 error = kTRUE;
1588 }
1589 // Check if an output directory was defined and valid
1590 if (!fGridOutputDir.Length()) {
1591 Error("CreateJDL", "You must define AliEn output directory");
1592 error = kTRUE;
1593 } else {
1594 if (!fProductionMode) {
1595 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1596 if (!DirectoryExists(fGridOutputDir)) {
1597 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1598 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1599 } else {
1600 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1601 // error = kTRUE;
1602 }
1603 } else {
1604 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1605 }
1606 gGrid->Cd(workdir);
1607 }
1608 }
1609 // Exit if any error up to now
1610 if (error) return kFALSE;
1611 // Set JDL fields
1612 if (!fUser.IsNull()) {
1613 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1614 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1615 }
1616 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
1617 TString mergeExec = fExecutable;
1618 mergeExec.ReplaceAll(".sh", "_merge.sh");
1619 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1620 mergeExec.ReplaceAll(".sh", ".C");
1621 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1622 if (!fArguments.IsNull())
1623 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1624 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1625 else {
1626 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1627 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
1628 }
1629
1630 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1631 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1632 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1633 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1634
1635 if (fMaxInitFailed > 0) {
1636 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1637 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1638 }
1639 if (fSplitMaxInputFileNumber > 0) {
1640 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1641 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1642 }
1643 if (!IsOneStageMerging()) {
1644 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1645 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1646 }
1647 if (fSplitMode.Length()) {
1648 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1649 fGridJDL->SetDescription("Split", "We split per SE or file");
1650 }
1651 fMergingJDL->SetValue("Split", "\"se\"");
1652 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
1653 if (!fAliROOTVersion.IsNull()) {
1654 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1655 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1656 }
1657 if (!fROOTVersion.IsNull()) {
1658 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1659 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1660 }
1661 if (!fAPIVersion.IsNull()) {
1662 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1663 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1664 }
1665 if (!fExternalPackages.IsNull()) {
1666 arr = fExternalPackages.Tokenize(" ");
1667 TIter next(arr);
1668 while ((os=(TObjString*)next())) {
1669 TString pkgname = os->GetString();
1670 Int_t index = pkgname.Index("::");
1671 TString pkgversion = pkgname(index+2, pkgname.Length());
1672 pkgname.Remove(index);
1673 fGridJDL->AddToPackages(pkgname, pkgversion);
1674 fMergingJDL->AddToPackages(pkgname, pkgversion);
1675 }
1676 delete arr;
1677 }
1678 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1679 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1680 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1681 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1682 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1683 TString analysisFile = fExecutable;
1684 analysisFile.ReplaceAll(".sh", ".root");
1685 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1686 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1687 if (fAdditionalLibs.Length()) {
1688 arr = fAdditionalLibs.Tokenize(" ");
1689 TIter next(arr);
1690 while ((os=(TObjString*)next())) {
1691 if (os->GetString().Contains(".so")) continue;
1692 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1693 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1694 }
1695 delete arr;
1696 }
1697 if (fPackages) {
1698 TIter next(fPackages);
1699 TObject *obj;
1700 while ((obj=next())) {
1701 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1702 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1703 }
1704 }
1705 const char *comment = "List of output files and archives";
1706 if (fOutputArchive.Length()) {
1707 TString outputArchive = fOutputArchive;
1708 if (!fRegisterExcludes.IsNull()) {
1709 arr = fRegisterExcludes.Tokenize(" ");
1710 TIter next1(arr);
1711 while ((os=(TObjString*)next1())) {
1712 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1713 outputArchive.ReplaceAll(os->GetString(),"");
1714 }
1715 delete arr;
1716 }
1717 arr = outputArchive.Tokenize(" ");
1718 TIter next(arr);
1719 Bool_t first = kTRUE;
1720 while ((os=(TObjString*)next())) {
1721 if (!os->GetString().Contains("@") && fCloseSE.Length())
1722 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1723 else
1724 fGridJDL->AddToSet("Output", os->GetString());
1725 if (first) fGridJDL->AddToSetDescription("Output", comment);
1726 first = kFALSE;
1727 }
1728 delete arr;
1729 // Output archive for the merging jdl
1730 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1731 outputArchive = "log_archive.zip:std*@disk=1 ";
1732 // Add normal output files, extra files + terminate files
1733 TString files = GetListOfFiles("outextter");
1734 // Do not register files in fRegisterExcludes
1735 if (!fRegisterExcludes.IsNull()) {
1736 arr = fRegisterExcludes.Tokenize(" ");
1737 TIter next1(arr);
1738 while ((os=(TObjString*)next1())) {
1739 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1740 files.ReplaceAll(os->GetString(),"");
1741 }
1742 delete arr;
1743 }
1744 files.ReplaceAll(".root", "*.root");
1745
1746 if (mgr->IsCollectThroughput())
1747 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
1748 else
1749 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1750 } else {
1751 TString files = fOutputArchive;
1752 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already atttached by use
1753 outputArchive = files;
1754 }
1755 arr = outputArchive.Tokenize(" ");
1756 TIter next2(arr);
1757 first = kTRUE;
1758 while ((os=(TObjString*)next2())) {
1759 TString currentfile = os->GetString();
1760 if (!currentfile.Contains("@") && fCloseSE.Length())
1761 fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
1762 else
1763 fMergingJDL->AddToSet("Output", currentfile);
1764 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1765 first = kFALSE;
1766 }
1767 delete arr;
1768 }
1769 arr = fOutputFiles.Tokenize(",");
1770 TIter next(arr);
1771 Bool_t first = kTRUE;
1772 while ((os=(TObjString*)next())) {
1773 // Ignore ouputs in jdl that are also in outputarchive
1774 TString sout = os->GetString();
1775 sout.ReplaceAll("*", "");
1776 sout.ReplaceAll(".root", "");
1777 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1778 if (fOutputArchive.Contains(sout)) continue;
1779 // Ignore fRegisterExcludes
1780 if (fRegisterExcludes.Contains(sout)) continue;
1781 if (!first) comment = NULL;
1782 if (!os->GetString().Contains("@") && fCloseSE.Length())
1783 fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1784 else
1785 fGridJDL->AddToSet("Output", os->GetString());
1786 if (first) fGridJDL->AddToSetDescription("Output", comment);
1787 if (fMergeExcludes.Contains(sout)) continue;
1788 if (!os->GetString().Contains("@") && fCloseSE.Length())
1789 fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
1790 else
1791 fMergingJDL->AddToSet("Output", os->GetString());
1792 if (first) fMergingJDL->AddToSetDescription("Output", comment);
1793 first = kFALSE;
1794 }
1795 delete arr;
1796 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1797 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1798 TString validationScript = fValidationScript;
1799 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1800 validationScript.ReplaceAll(".sh", "_merge.sh");
1801 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1802 if (fMasterResubmitThreshold) {
1803 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1804 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1805 }
1806 // Write a jdl with 2 input parameters: collection name and output dir name.
1807 WriteJDL(copy);
1808 }
1809 // Copy jdl to grid workspace
1810 if (copy) {
1811 // Check if an output directory was defined and valid
1812 if (!fGridOutputDir.Length()) {
1813 Error("CreateJDL", "You must define AliEn output directory");
1814 return kFALSE;
1815 } else {
1816 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1817 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1818 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1819 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1820 } else {
1821 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1822 return kFALSE;
1823 }
1824 }
1825 gGrid->Cd(workdir);
1826 }
1827 if (TestBit(AliAnalysisGrid::kSubmit)) {
1828 TString mergeJDLName = fExecutable;
1829 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1830 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1831 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1832 if (fProductionMode) {
1833 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1834 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1835 }
1836 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1837 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1838 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1839 if (!copyLocal2Alien("CreateJDL", fJDLName, locjdl))
1840 Fatal("","Terminating");
1841// TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1842 if (fMergeViaJDL) {
1843 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1844// TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1845 if (!copyLocal2Alien("CreateJDL", mergeJDLName.Data(), locjdl1))
1846 Fatal("","Terminating");
1847 }
1848 }
1849 if (fAdditionalLibs.Length()) {
1850 arr = fAdditionalLibs.Tokenize(" ");
1851 TObjString *os;
1852 TIter next(arr);
1853 while ((os=(TObjString*)next())) {
1854 if (os->GetString().Contains(".so")) continue;
1855 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1856 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1857// TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1858 if (!copyLocal2Alien("CreateJDL", os->GetString().Data(),
1859 Form("%s/%s", workdir.Data(), os->GetString().Data())))
1860 Fatal("","Terminating");
1861 }
1862 delete arr;
1863 }
1864 if (fPackages) {
1865 TIter next(fPackages);
1866 TObject *obj;
1867 while ((obj=next())) {
1868 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1869 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1870// TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1871 if (!copyLocal2Alien("CreateJDL",obj->GetName(),
1872 Form("%s/%s", workdir.Data(), obj->GetName())))
1873 Fatal("","Terminating");
1874 }
1875 }
1876 }
1877 return kTRUE;
1878}
1879
1880//______________________________________________________________________________
1881Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1882{
1883// Writes one or more JDL's corresponding to findex. If findex is negative,
1884// all run numbers are considered in one go (jdl). For non-negative indices
1885// they correspond to the indices in the array fInputFiles.
1886 if (!fInputFiles) return kFALSE;
1887 TObject *os;
1888 TString workdir;
1889 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1890 workdir += fGridWorkingDir;
1891 TString stageName = "$2";
1892 if (fProductionMode) stageName = "$4";
1893 if (!fMergeDirName.IsNull()) {
1894 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1895 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1896 } else {
1897 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1898 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1899 }
1900 if (fProductionMode) {
1901 TIter next(fInputFiles);
1902 while ((os=next())) {
1903 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1904 }
1905 if (!fOutputToRunNo)
1906 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1907 else
1908 fGridJDL->SetOutputDirectory(fGridOutputDir);
1909 } else {
1910 if (!fRunNumbers.Length() && !fRunRange[0]) {
1911 // One jdl with no parameters in case input data is specified by name.
1912 TIter next(fInputFiles);
1913 while ((os=next()))
1914 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1915 if (!fOutputSingle.IsNull())
1916 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1917 else {
1918 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1919 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1920 }
1921 } else {
1922 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1923 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1924 if (!fOutputSingle.IsNull()) {
1925 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1926 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1927 } else {
1928 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1929 }
1930 }
1931 }
1932
1933 // Generate the JDL as a string
1934 TString sjdl = fGridJDL->Generate();
1935 TString sjdl1 = fMergingJDL->Generate();
1936 // Final merge jdl
1937 if (!fMergeDirName.IsNull()) {
1938 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1939 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1940 } else {
1941 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1942 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1943 }
1944 TString sjdl2 = fMergingJDL->Generate();
1945 Int_t index, index1;
1946 sjdl.ReplaceAll("\",\"", "\",\n \"");
1947 sjdl.ReplaceAll("(member", "\n (member");
1948 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1949 sjdl.ReplaceAll("{", "{\n ");
1950 sjdl.ReplaceAll("};", "\n};");
1951 sjdl.ReplaceAll("{\n \n", "{\n");
1952 sjdl.ReplaceAll("\n\n", "\n");
1953 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1954 sjdl1.ReplaceAll("\",\"", "\",\n \"");
1955 sjdl1.ReplaceAll("(member", "\n (member");
1956 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1957 sjdl1.ReplaceAll("{", "{\n ");
1958 sjdl1.ReplaceAll("};", "\n};");
1959 sjdl1.ReplaceAll("{\n \n", "{\n");
1960 sjdl1.ReplaceAll("\n\n", "\n");
1961 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1962 sjdl2.ReplaceAll("\",\"", "\",\n \"");
1963 sjdl2.ReplaceAll("(member", "\n (member");
1964 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1965 sjdl2.ReplaceAll("{", "{\n ");
1966 sjdl2.ReplaceAll("};", "\n};");
1967 sjdl2.ReplaceAll("{\n \n", "{\n");
1968 sjdl2.ReplaceAll("\n\n", "\n");
1969 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
1970 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1971 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1972 index = sjdl.Index("JDLVariables");
1973 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1974 sjdl += "Workdirectorysize = {\"5000MB\"};";
1975 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1976 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1977 index = fJobTag.Index(":");
1978 if (index < 0) index = fJobTag.Length();
1979 TString jobTag = fJobTag;
1980 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1981 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1982 if (fProductionMode) {
1983 sjdl1.Prepend("# Generated merging jdl (production mode) \
1984 \n# $1 = full alien path to output directory to be merged \
1985 \n# $2 = train number \
1986 \n# $3 = production (like LHC10b) \
1987 \n# $4 = merging stage \
1988 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1989 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1990 sjdl2.Prepend("# Generated merging jdl \
1991 \n# $1 = full alien path to output directory to be merged \
1992 \n# $2 = train number \
1993 \n# $3 = production (like LHC10b) \
1994 \n# $4 = merging stage \
1995 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1996 } else {
1997 sjdl1.Prepend("# Generated merging jdl \
1998 \n# $1 = full alien path to output directory to be merged \
1999 \n# $2 = merging stage \
2000 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2001 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
2002 sjdl2.Prepend("# Generated merging jdl \
2003 \n# $1 = full alien path to output directory to be merged \
2004 \n# $2 = merging stage \
2005 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
2006 }
2007 index = sjdl1.Index("JDLVariables");
2008 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
2009 index = sjdl2.Index("JDLVariables");
2010 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
2011 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
2012 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
2013 index = sjdl2.Index("Split =");
2014 if (index>=0) {
2015 index1 = sjdl2.Index("\n", index);
2016 sjdl2.Remove(index, index1-index+1);
2017 }
2018 index = sjdl2.Index("SplitMaxInputFileNumber");
2019 if (index>=0) {
2020 index1 = sjdl2.Index("\n", index);
2021 sjdl2.Remove(index, index1-index+1);
2022 }
2023 index = sjdl2.Index("InputDataCollection");
2024 if (index>=0) {
2025 index1 = sjdl2.Index(";", index);
2026 sjdl2.Remove(index, index1-index+1);
2027 }
2028 index = sjdl2.Index("InputDataListFormat");
2029 if (index>=0) {
2030 index1 = sjdl2.Index("\n", index);
2031 sjdl2.Remove(index, index1-index+1);
2032 }
2033 index = sjdl2.Index("InputDataList");
2034 if (index>=0) {
2035 index1 = sjdl2.Index("\n", index);
2036 sjdl2.Remove(index, index1-index+1);
2037 }
2038 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
2039 // Write jdl to file
2040 ofstream out;
2041 out.open(fJDLName.Data(), ios::out);
2042 if (out.bad()) {
2043 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
2044 return kFALSE;
2045 }
2046 out << sjdl << endl;
2047 out.close();
2048 TString mergeJDLName = fExecutable;
2049 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2050 if (fMergeViaJDL) {
2051 ofstream out1;
2052 out1.open(mergeJDLName.Data(), ios::out);
2053 if (out1.bad()) {
2054 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
2055 return kFALSE;
2056 }
2057 out1 << sjdl1 << endl;
2058 out1.close();
2059 ofstream out2;
2060 TString finalJDL = mergeJDLName;
2061 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2062 out2.open(finalJDL.Data(), ios::out);
2063 if (out2.bad()) {
2064 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
2065 return kFALSE;
2066 }
2067 out2 << sjdl2 << endl;
2068 out2.close();
2069 }
2070
2071 // Copy jdl to grid workspace
2072 if (!copy) {
2073 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
2074 } else {
2075 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
2076 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
2077 TString finalJDL = mergeJDLName;
2078 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2079 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
2080 if (fProductionMode) {
2081 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
2082 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
2083 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
2084 }
2085 if (FileExists(locjdl)) gGrid->Rm(locjdl);
2086 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
2087 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
2088 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
2089// TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
2090 if (!copyLocal2Alien("WriteJDL",fJDLName.Data(),locjdl.Data()))
2091 Fatal("","Terminating");
2092 if (fMergeViaJDL) {
2093 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
2094// TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
2095// TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
2096 if (!copyLocal2Alien("WriteJDL",mergeJDLName.Data(),locjdl1.Data()))
2097 Fatal("","Terminating");
2098 if (!copyLocal2Alien("WriteJDL",finalJDL.Data(),locjdl2.Data()))
2099 Fatal("","Terminating");
2100 }
2101 }
2102 return kTRUE;
2103}
2104
2105//______________________________________________________________________________
2106Bool_t AliAnalysisAlien::FileExists(const char *lfn)
2107{
2108// Returns true if file exists.
2109 if (!gGrid) return kFALSE;
2110 TString slfn = lfn;
2111 slfn.ReplaceAll("alien://","");
2112 TGridResult *res = gGrid->Ls(slfn);
2113 if (!res) return kFALSE;
2114 TMap *map = dynamic_cast<TMap*>(res->At(0));
2115 if (!map) {
2116 delete res;
2117 return kFALSE;
2118 }
2119 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
2120 if (!objs || !objs->GetString().Length()) {
2121 delete res;
2122 return kFALSE;
2123 }
2124 delete res;
2125 return kTRUE;
2126}
2127
2128//______________________________________________________________________________
2129Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
2130{
2131// Returns true if directory exists. Can be also a path.
2132 if (!gGrid) return kFALSE;
2133 // Check if dirname is a path
2134 TString dirstripped = dirname;
2135 dirstripped = dirstripped.Strip();
2136 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
2137 TString dir = gSystem->BaseName(dirstripped);
2138 dir += "/";
2139 TString path = gSystem->DirName(dirstripped);
2140 TGridResult *res = gGrid->Ls(path, "-F");
2141 if (!res) return kFALSE;
2142 TIter next(res);
2143 TMap *map;
2144 TObject *obj;
2145 while ((map=dynamic_cast<TMap*>(next()))) {
2146 obj = map->GetValue("name");
2147 if (!obj) break;
2148 if (dir == obj->GetName()) {
2149 delete res;
2150 return kTRUE;
2151 }
2152 }
2153 delete res;
2154 return kFALSE;
2155}
2156
2157//______________________________________________________________________________
2158void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
2159{
2160// Check input data type.
2161 isCollection = kFALSE;
2162 isXml = kFALSE;
2163 useTags = kFALSE;
2164 if (!gGrid) {
2165 Error("CheckDataType", "No connection to grid");
2166 return;
2167 }
2168 isCollection = IsCollection(lfn);
2169 TString msg = "\n##### file: ";
2170 msg += lfn;
2171 if (isCollection) {
2172 msg += " type: raw_collection;";
2173 // special treatment for collections
2174 isXml = kFALSE;
2175 // check for tag files in the collection
2176 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
2177 if (!res) {
2178 msg += " using_tags: No (unknown)";
2179 Info("CheckDataType", "%s", msg.Data());
2180 return;
2181 }
2182 const char* typeStr = res->GetKey(0, "origLFN");
2183 if (!typeStr || !strlen(typeStr)) {
2184 msg += " using_tags: No (unknown)";
2185 Info("CheckDataType", "%s", msg.Data());
2186 return;
2187 }
2188 TString file = typeStr;
2189 useTags = file.Contains(".tag");
2190 if (useTags) msg += " using_tags: Yes";
2191 else msg += " using_tags: No";
2192 Info("CheckDataType", "%s", msg.Data());
2193 return;
2194 }
2195 TString slfn(lfn);
2196 slfn.ToLower();
2197 isXml = slfn.Contains(".xml");
2198 if (isXml) {
2199 // Open xml collection and check if there are tag files inside
2200 msg += " type: xml_collection;";
2201 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
2202 if (!coll) {
2203 msg += " using_tags: No (unknown)";
2204 Info("CheckDataType", "%s", msg.Data());
2205 return;
2206 }
2207 TMap *map = coll->Next();
2208 if (!map) {
2209 msg += " using_tags: No (unknown)";
2210 Info("CheckDataType", "%s", msg.Data());
2211 return;
2212 }
2213 map = (TMap*)map->GetValue("");
2214 TString file;
2215 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
2216 useTags = file.Contains(".tag");
2217 delete coll;
2218 if (useTags) msg += " using_tags: Yes";
2219 else msg += " using_tags: No";
2220 Info("CheckDataType", "%s", msg.Data());
2221 return;
2222 }
2223 useTags = slfn.Contains(".tag");
2224 if (slfn.Contains(".root")) msg += " type: root file;";
2225 else msg += " type: unknown file;";
2226 if (useTags) msg += " using_tags: Yes";
2227 else msg += " using_tags: No";
2228 Info("CheckDataType", "%s", msg.Data());
2229}
2230
2231//______________________________________________________________________________
2232void AliAnalysisAlien::EnablePackage(const char *package)
2233{
2234// Enables a par file supposed to exist in the current directory.
2235 TString pkg(package);
2236 pkg.ReplaceAll(".par", "");
2237 pkg += ".par";
2238 if (gSystem->AccessPathName(pkg)) {
2239 Fatal("EnablePackage", "Package %s not found", pkg.Data());
2240 return;
2241 }
2242 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
2243 Info("EnablePackage", "AliEn plugin will use .par packages");
2244 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
2245 if (!fPackages) {
2246 fPackages = new TObjArray();
2247 fPackages->SetOwner();
2248 }
2249 fPackages->Add(new TObjString(pkg));
2250}
2251
2252//______________________________________________________________________________
2253TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
2254{
2255// Make a tree from files having the location specified in fFileForTestMode.
2256// Inspired from JF's CreateESDChain.
2257 if (fFileForTestMode.IsNull()) {
2258 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2259 return NULL;
2260 }
2261 if (gSystem->AccessPathName(fFileForTestMode)) {
2262 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2263 return NULL;
2264 }
2265 // Open the file
2266 ifstream in;
2267 in.open(fFileForTestMode);
2268 Int_t count = 0;
2269 // Read the input list of files and add them to the chain
2270 TString line;
2271 TString streeName(treeName);
2272 if (IsUseMCchain()) streeName = "TE";
2273 TChain *chain = new TChain(streeName);
2274 TChain *chainFriend = 0;
2275 if (!fFriendChainName.IsNull()) chainFriend = new TChain(streeName);
2276 while (in.good())
2277 {
2278 in >> line;
2279 if (line.IsNull() || line.BeginsWith("#")) continue;
2280 if (count++ == fNtestFiles) break;
2281 TString esdFile(line);
2282 TFile *file = TFile::Open(esdFile);
2283 if (file && !file->IsZombie()) {
2284 chain->Add(esdFile);
2285 file->Close();
2286 if (!fFriendChainName.IsNull()) {
2287 if (esdFile.Index("#") > -1)
2288 esdFile.Remove(esdFile.Index("#"));
2289 esdFile = gSystem->DirName(esdFile);
2290 esdFile += "/" + fFriendChainName;
2291 file = TFile::Open(esdFile);
2292 if (file && !file->IsZombie()) {
2293 file->Close();
2294 chainFriend->Add(esdFile);
2295 } else {
2296 Fatal("GetChainForTestMode", "Cannot open friend file: %s", esdFile.Data());
2297 return 0;
2298 }
2299 }
2300 } else {
2301 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
2302 }
2303 }
2304 in.close();
2305 if (!chain->GetListOfFiles()->GetEntries()) {
2306 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
2307 delete chain;
2308 delete chainFriend;
2309 return NULL;
2310 }
2311// chain->ls();
2312 if (!fFriendChainName.IsNull()) chain->AddFriend(chainFriend);
2313 return chain;
2314}
2315
2316//______________________________________________________________________________
2317const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
2318{
2319// Get job status for all jobs with jobid>jobidstart.
2320 static char mstatus[20];
2321 mstatus[0] = '\0';
2322 nrunning = 0;
2323 nwaiting = 0;
2324 nerror = 0;
2325 ndone = 0;
2326 TGridJobStatusList *list = gGrid->Ps("");
2327 if (!list) return mstatus;
2328 Int_t nentries = list->GetSize();
2329 TGridJobStatus *status;
2330 Int_t pid;
2331 for (Int_t ijob=0; ijob<nentries; ijob++) {
2332 status = (TGridJobStatus *)list->At(ijob);
2333 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
2334 if (pid<jobidstart) continue;
2335 if (pid == lastid) {
2336 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
2337 }
2338 switch (status->GetStatus()) {
2339 case TGridJobStatus::kWAITING:
2340 nwaiting++; break;
2341 case TGridJobStatus::kRUNNING:
2342 nrunning++; break;
2343 case TGridJobStatus::kABORTED:
2344 case TGridJobStatus::kFAIL:
2345 case TGridJobStatus::kUNKNOWN:
2346 nerror++; break;
2347 case TGridJobStatus::kDONE:
2348 ndone++;
2349 }
2350 }
2351 list->Delete();
2352 delete list;
2353 return mstatus;
2354}
2355
2356//______________________________________________________________________________
2357Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
2358{
2359// Returns true if file is a collection. Functionality duplicated from
2360// TAlien::Type() because we don't want to directly depend on TAlien.
2361 if (!gGrid) {
2362 Error("IsCollection", "No connection to grid");
2363 return kFALSE;
2364 }
2365 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
2366 if (!res) return kFALSE;
2367 const char* typeStr = res->GetKey(0, "type");
2368 if (!typeStr || !strlen(typeStr)) return kFALSE;
2369 if (!strcmp(typeStr, "collection")) return kTRUE;
2370 delete res;
2371 return kFALSE;
2372}
2373
2374//______________________________________________________________________________
2375Bool_t AliAnalysisAlien::IsSingleOutput() const
2376{
2377// Check if single-ouput option is on.
2378 return (!fOutputSingle.IsNull());
2379}
2380
2381//______________________________________________________________________________
2382void AliAnalysisAlien::Print(Option_t *) const
2383{
2384// Print current plugin settings.
2385 printf("### AliEn analysis plugin current settings ###\n");
2386 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2387 if (mgr && mgr->IsProofMode()) {
2388 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
2389 if (TestBit(AliAnalysisGrid::kTest))
2390 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
2391 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
2392 if (!fProofDataSet.IsNull())
2393 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
2394 if (fProofReset==1)
2395 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2396 if (fProofReset>1)
2397 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
2398 if (!fROOTVersion.IsNull())
2399 printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
2400 else
2401 printf("= ROOT version requested________________________ default\n");
2402 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
2403 if (!fAliRootMode.IsNull())
2404 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
2405 if (fNproofWorkers)
2406 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
2407 if (fNproofWorkersPerSlave)
2408 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
2409 if (TestSpecialBit(kClearPackages))
2410 printf("= ClearPackages requested...\n");
2411 if (fIncludePath.Data())
2412 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2413 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2414 if (fPackages && fPackages->GetEntries()) {
2415 TIter next(fPackages);
2416 TObject *obj;
2417 TString list;
2418 while ((obj=next())) list += obj->GetName();
2419 printf("= Par files to be used: ________________________ %s\n", list.Data());
2420 }
2421 if (TestSpecialBit(kProofConnectGrid))
2422 printf("= Requested PROOF connection to grid\n");
2423 return;
2424 }
2425 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
2426 if (fOverwriteMode) {
2427 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
2428 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
2429 }
2430 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
2431 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
2432 printf("= Production mode:______________________________ %d\n", fProductionMode);
2433 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
2434 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
2435 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
2436 if (fUser.Length())
2437 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
2438 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
2439 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
2440 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
2441 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
2442 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
2443 if (fRunNumbers.Length())
2444 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
2445 if (fRunRange[0])
2446 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
2447 if (!fRunRange[0] && !fRunNumbers.Length()) {
2448 TIter next(fInputFiles);
2449 TObject *obj;
2450 TString list;
2451 while ((obj=next())) list += obj->GetName();
2452 printf("= Input files to be processed: _________________ %s\n", list.Data());
2453 }
2454 if (TestBit(AliAnalysisGrid::kTest))
2455 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
2456 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
2457 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
2458 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
2459 printf("= List of outputs that should not be registered: %s\n", fRegisterExcludes.Data());
2460 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
2461 printf("=====================================================================\n");
2462 printf("= Job price: ___________________________________ %d\n", fPrice);
2463 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
2464 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
2465 if (fMaxInitFailed>0)
2466 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
2467 if (fMasterResubmitThreshold>0)
2468 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
2469 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
2470 if (fNrunsPerMaster>0)
2471 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
2472 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
2473 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
2474 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
2475 if (fArguments.Length())
2476 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
2477 if (fExecutableArgs.Length())
2478 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
2479 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
2480 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
2481 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
2482 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
2483 if (fDatasetName)
2484 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
2485 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
2486 if (fIncludePath.Data())
2487 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
2488 if (fCloseSE.Length())
2489 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
2490 if (fFriendChainName.Length())
2491 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
2492 if (fPackages && fPackages->GetEntries()) {
2493 TIter next(fPackages);
2494 TObject *obj;
2495 TString list;
2496 while ((obj=next())) list += obj->GetName();
2497 printf("= Par files to be used: ________________________ %s\n", list.Data());
2498 }
2499}
2500
2501//______________________________________________________________________________
2502void AliAnalysisAlien::SetDefaults()
2503{
2504// Set default values for everything. What cannot be filled will be left empty.
2505 if (fGridJDL) delete fGridJDL;
2506 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2507 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2508 fPrice = 1;
2509 fTTL = 30000;
2510 fSplitMaxInputFileNumber = 100;
2511 fMaxInitFailed = 0;
2512 fMasterResubmitThreshold = 0;
2513 fNtestFiles = 10;
2514 fNreplicas = 2;
2515 fRunRange[0] = 0;
2516 fRunRange[1] = 0;
2517 fRunPrefix = "%d";
2518 fNrunsPerMaster = 1;
2519 fMaxMergeFiles = 100;
2520 fRunNumbers = "";
2521 fExecutable = "analysis.sh";
2522 fExecutableCommand = "root -b -q -x";
2523 fArguments = "";
2524 fExecutableArgs = "";
2525 fAnalysisMacro = "myAnalysis.C";
2526 fAnalysisSource = "";
2527 fAdditionalLibs = "";
2528 fSplitMode = "se";
2529 fAPIVersion = "";
2530 fROOTVersion = "";
2531 fAliROOTVersion = "";
2532 fUser = ""; // Your alien user name
2533 fGridWorkingDir = "";
2534 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2535 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2536 fFriendChainName = "";
2537 fGridOutputDir = "output";
2538 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2539 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2540 fInputFormat = "xml-single";
2541 fJDLName = "analysis.jdl";
2542 fJobTag = "Automatically generated analysis JDL";
2543 fMergeExcludes = "";
2544 fMergeViaJDL = 0;
2545 SetUseCopy(kTRUE);
2546 SetCheckCopy(kTRUE);
2547 SetDefaultOutputs(kTRUE);
2548 fOverwriteMode = 1;
2549}
2550
2551//______________________________________________________________________________
2552void AliAnalysisAlien::SetRootVersionForProof(const char *version)
2553{
2554// Obsolete method. Use SetROOTVersion instead
2555 Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
2556 if (fROOTVersion.IsNull()) SetROOTVersion(version);
2557 else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
2558}
2559
2560//______________________________________________________________________________
2561Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2562{
2563// Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2564 // First check if the result is already in the output directory.
2565 if (FileExists(Form("%s/%s",aliendir,filename))) {
2566 printf("Final merged results found. Not merging again.\n");
2567 return kFALSE;
2568 }
2569 // Now check the last stage done.
2570 Int_t stage = 0;
2571 while (1) {
2572 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2573 stage++;
2574 }
2575 // Next stage of merging
2576 stage++;
2577 TString pattern = "*root_archive.zip";
2578 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2579 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2580 if (res) delete res;
2581 // Write standard output to file
2582 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2583 // Count the number of files inside
2584 ifstream ifile;
2585 ifile.open(Form("Stage_%d.xml",stage));
2586 if (!ifile.good()) {
2587 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2588 return kFALSE;
2589 }
2590 TString line;
2591 Int_t nfiles = 0;
2592 while (!ifile.eof()) {
2593 ifile >> line;
2594 if (line.Contains("/event")) nfiles++;
2595 }
2596 ifile.close();
2597 if (!nfiles) {
2598 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2599 return kFALSE;
2600 } else {
2601 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2602 }
2603 // Copy the file in the output directory
2604 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2605// TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2606 if (!copyLocal2Alien("CheckMergedFiles", Form("Stage_%d.xml",stage),
2607 Form("%s/Stage_%d.xml",aliendir,stage))) Fatal("","Terminating");
2608 // Check if this is the last stage to be done.
2609 Bool_t laststage = (nfiles<nperchunk);
2610 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2611 Int_t jobId = 0;
2612 if (laststage) {
2613 printf("### Submiting final merging stage %d\n", stage);
2614 TString finalJDL = jdl;
2615 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2616 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2617 jobId = SubmitSingleJob(query);
2618 } else {
2619 printf("### Submiting merging stage %d\n", stage);
2620 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2621 jobId = SubmitSingleJob(query);
2622 }
2623 if (!jobId) return kFALSE;
2624
2625 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
2626 fGridJobIDs.Append(Form("%d", jobId));
2627 if (!fGridStages.IsNull()) fGridStages.Append(" ");
2628 fGridStages.Append(Form("%s_merge_stage%d",
2629 laststage ? "final" : "partial", stage));
2630
2631 return kTRUE;
2632}
2633
2634//______________________________________________________________________________
2635AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2636{
2637// Loat the analysis manager from a file.
2638 TFile *file = TFile::Open(fname);
2639 if (!file) {
2640 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2641 return 0;
2642 }
2643 TIter nextkey(file->GetListOfKeys());
2644 AliAnalysisManager *mgr = 0;
2645 TKey *key;
2646 while ((key=(TKey*)nextkey())) {
2647 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2648 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2649 }
2650 if (!mgr)
2651 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2652 return mgr;
2653}
2654
2655//______________________________________________________________________________
2656Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2657{
2658// Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2659 if (!gGrid) return 0;
2660 printf("=> %s ------> ",query);
2661 TGridResult *res = gGrid->Command(query);
2662 if (!res) return 0;
2663 TString jobId = res->GetKey(0,"jobId");
2664 delete res;
2665 if (jobId.IsNull()) {
2666 printf("submission failed. Reason:\n");
2667 gGrid->Stdout();
2668 gGrid->Stderr();
2669 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2670 return 0;
2671 }
2672 Int_t ijobId = jobId.Atoi();
2673 printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
2674 return ijobId;
2675}
2676
2677//______________________________________________________________________________
2678Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
2679{
2680// Merges a collection of output files using concatenation.
2681 TString scoll(collection);
2682 if (!scoll.Contains(".xml")) return kFALSE;
2683 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
2684 if (!coll) {
2685 ::Error("MergeInfo", "Input XML %s collection empty.", collection);
2686 return kFALSE;
2687 }
2688 // Iterate grid collection
2689 TString outtmp;
2690 Bool_t merged = kFALSE;
2691 Int_t ifile = 0;
2692 while (coll->Next()) {
2693 TString fname = gSystem->DirName(coll->GetTURL());
2694 fname += "/";
2695 fname += output;
2696 outtmp = Form("%d_%s", ifile, output);
2697 if (!TFile::Cp(fname, outtmp)) {
2698 ::Error("MergeInfo", "Could not copy %s", fname.Data());
2699 continue;
2700 }
2701 ifile++;
2702 if (ifile<2) {
2703 gSystem->Exec(Form("cp %s lastmerged", outtmp.Data()));
2704 continue;
2705 }
2706 gSystem->Exec(Form("cat lastmerged %s > tempmerged", outtmp.Data()));
2707 gSystem->Exec("cp tempmerged lastmerged");
2708 }
2709 if (ifile) {
2710 gSystem->Exec(Form("cp lastmerged %s", output));
2711 gSystem->Exec(Form("rm tempmerged lastmerged *_%s", output));
2712 merged = kTRUE;
2713 }
2714 return merged;
2715}
2716
2717//______________________________________________________________________________
2718Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2719{
2720// Merge given output files from basedir. Basedir can be an alien output directory
2721// but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2722// files in a group (ignored for xml input). Merging can be done in stages:
2723// stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2724// stage=1 : works with an xml of all root_archive.zip in the output directory
2725// stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2726 TString outputFile = output;
2727 TString command;
2728 TString outputChunk;
2729 TString previousChunk = "";
2730 TObjArray *listoffiles = new TObjArray();
2731// listoffiles->SetOwner();
2732 Int_t countChunk = 0;
2733 Int_t countZero = nmaxmerge;
2734 Bool_t merged = kTRUE;
2735 Bool_t isGrid = kTRUE;
2736 Int_t index = outputFile.Index("@");
2737 if (index > 0) outputFile.Remove(index);
2738 TString inputFile = outputFile;
2739 TString sbasedir = basedir;
2740 if (sbasedir.Contains(".xml")) {
2741 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2742 nmaxmerge = 9999999;
2743 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2744 if (!coll) {
2745 ::Error("MergeOutput", "Input XML collection empty.");
2746 return kFALSE;
2747 }
2748 // Iterate grid collection
2749 while (coll->Next()) {
2750 TString fname = gSystem->DirName(coll->GetTURL());
2751 fname += "/";
2752 fname += inputFile;
2753 listoffiles->Add(new TNamed(fname.Data(),""));
2754 }
2755 } else if (sbasedir.Contains(".txt")) {
2756 // The file having the .txt extension is expected to contain a list of
2757 // folders where the output files will be looked. For alien folders,
2758 // the full folder LFN is expected (starting with alien://)
2759 // Assume lfn's on each line
2760 TString line;
2761 ifstream in;
2762 in.open(sbasedir);
2763 if (in.fail()) {
2764 ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
2765 return kTRUE;
2766 }
2767 Int_t nfiles = 0;
2768 while (in.good()) {
2769 in >> line;
2770 if (line.IsNull() || line.BeginsWith("#")) continue;
2771 line.Strip();
2772 if (!line.Contains("alien:")) isGrid = kFALSE;
2773 line += "/";
2774 line += outputFile;
2775 nfiles++;
2776 listoffiles->Add(new TNamed(line.Data(),""));
2777 }
2778 in.close();
2779 if (!nfiles) {
2780 ::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
2781 delete listoffiles;
2782 return kFALSE;
2783 }
2784 } else {
2785 command = Form("find %s/ *%s", basedir, inputFile.Data());
2786 printf("command: %s\n", command.Data());
2787 TGridResult *res = gGrid->Command(command);
2788 if (!res) {
2789 ::Error("MergeOutput","No result for the find command\n");
2790 delete listoffiles;
2791 return kFALSE;
2792 }
2793 TIter nextmap(res);
2794 TMap *map = 0;
2795 while ((map=(TMap*)nextmap())) {
2796 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2797 if (!objs || !objs->GetString().Length()) {
2798 // Nothing found - skip this output
2799 delete res;
2800 delete listoffiles;
2801 return kFALSE;
2802 }
2803 listoffiles->Add(new TNamed(objs->GetName(),""));
2804 }
2805 delete res;
2806 }
2807 if (!listoffiles->GetEntries()) {
2808 ::Error("MergeOutput","No result for the find command\n");
2809 delete listoffiles;
2810 return kFALSE;
2811 }
2812
2813 TFileMerger *fm = 0;
2814 TIter next0(listoffiles);
2815 TObjArray *listoffilestmp = new TObjArray();
2816 listoffilestmp->SetOwner();
2817 TObject *nextfile;
2818 TString snextfile;
2819 // Keep only the files at upper level
2820 Int_t countChar = 0;
2821 while ((nextfile=next0())) {
2822 snextfile = nextfile->GetName();
2823 Int_t crtCount = snextfile.CountChar('/');
2824 if (nextfile == listoffiles->First()) countChar = crtCount;
2825 if (crtCount < countChar) countChar = crtCount;
2826 }
2827 next0.Reset();
2828 while ((nextfile=next0())) {
2829 snextfile = nextfile->GetName();
2830 Int_t crtCount = snextfile.CountChar('/');
2831 if (crtCount > countChar) {
2832 delete nextfile;
2833 continue;
2834 }
2835 listoffilestmp->Add(nextfile);
2836 }
2837 delete listoffiles;
2838 listoffiles = listoffilestmp; // Now contains 'good' files
2839 listoffiles->Print();
2840 TIter next(listoffiles);
2841 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2842 outputChunk = outputFile;
2843 outputChunk.ReplaceAll(".root", "_*.root");
2844 // Check for existent temporary merge files
2845 // Check overwrite mode and remove previous partial results if needed
2846 // Preserve old merging functionality for stage 0.
2847 if (stage==0) {
2848 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2849 while (1) {
2850 // Skip as many input files as in a chunk
2851 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2852 nextfile = next();
2853 if (!nextfile) {
2854 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2855 delete listoffiles;
2856 return kFALSE;
2857 }
2858 snextfile = nextfile->GetName();
2859 }
2860 outputChunk = outputFile;
2861 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2862 countChunk++;
2863 if (gSystem->AccessPathName(outputChunk)) continue;
2864 // Merged file with chunks up to <countChunk> found
2865 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2866 previousChunk = outputChunk;
2867 break;
2868 }
2869 }
2870 countZero = nmaxmerge;
2871
2872 while ((nextfile=next())) {
2873 snextfile = nextfile->GetName();
2874 // Loop 'find' results and get next LFN
2875 if (countZero == nmaxmerge) {
2876 // First file in chunk - create file merger and add previous chunk if any.
2877 fm = new TFileMerger(isGrid);
2878 fm->SetFastMethod(kTRUE);
2879 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2880 outputChunk = outputFile;
2881 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2882 }
2883 // If last file found, put merged results in the output file
2884 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2885 // Add file to be merged and decrement chunk counter.
2886 fm->AddFile(snextfile);
2887 countZero--;
2888 if (countZero==0 || nextfile == listoffiles->Last()) {
2889 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2890 // Nothing found - skip this output
2891 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2892 merged = kFALSE;
2893 break;
2894 }
2895 fm->OutputFile(outputChunk);
2896 // Merge the outputs, then go to next chunk
2897 if (!fm->Merge()) {
2898 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2899 merged = kFALSE;
2900 break;
2901 } else {
2902 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2903 gSystem->Unlink(previousChunk);
2904 }
2905 if (nextfile == listoffiles->Last()) break;
2906 countChunk++;
2907 countZero = nmaxmerge;
2908 previousChunk = outputChunk;
2909 }
2910 }
2911 delete listoffiles;
2912 delete fm;
2913 return merged;
2914 }
2915 // Merging stage different than 0.
2916 // Move to the begining of the requested chunk.
2917 fm = new TFileMerger(isGrid);
2918 fm->SetFastMethod(kTRUE);
2919 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2920 delete listoffiles;
2921 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2922 // Nothing found - skip this output
2923 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2924 delete fm;
2925 return kFALSE;
2926 }
2927 fm->OutputFile(outputFile);
2928 // Merge the outputs
2929 if (!fm->Merge()) {
2930 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2931 delete fm;
2932 return kFALSE;
2933 } else {
2934 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2935 }
2936 delete fm;
2937 return kTRUE;
2938}
2939
2940//______________________________________________________________________________
2941Bool_t AliAnalysisAlien::MergeOutputs()
2942{
2943// Merge analysis outputs existing in the AliEn space.
2944 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2945 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2946 if (!Connect()) {
2947 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2948 return kFALSE;
2949 }
2950 if (fMergeViaJDL) {
2951 if (!TestBit(AliAnalysisGrid::kMerge)) {
2952 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2953 return kFALSE;
2954 }
2955 if (fProductionMode) {
2956 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2957 return kFALSE;
2958 }
2959 Info("MergeOutputs", "Submitting merging JDL");
2960 if (!SubmitMerging()) return kFALSE;
2961 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2962 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2963 return kFALSE;
2964 }
2965 // Get the output path
2966 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2967 if (!DirectoryExists(fGridOutputDir)) {
2968 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2969 return kFALSE;
2970 }
2971 if (!fOutputFiles.Length()) {
2972 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2973 return kFALSE;
2974 }
2975 // Check if fast read option was requested
2976 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2977 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2978 if (fFastReadOption) {
2979 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2980 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2981 gEnv->SetValue("XNet.ConnectTimeout",50);
2982 gEnv->SetValue("XNet.RequestTimeout",50);
2983 gEnv->SetValue("XNet.MaxRedirectCount",2);
2984 gEnv->SetValue("XNet.ReconnectTimeout",50);
2985 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2986 }
2987 // Make sure we change the temporary directory
2988 gSystem->Setenv("TMPDIR", gSystem->pwd());
2989 // Set temporary compilation directory to current one
2990 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
2991 TObjArray *list = fOutputFiles.Tokenize(",");
2992 TIter next(list);
2993 TObjString *str;
2994 TString outputFile;
2995 Bool_t merged = kTRUE;
2996 while((str=(TObjString*)next())) {
2997 outputFile = str->GetString();
2998 Int_t index = outputFile.Index("@");
2999 if (index > 0) outputFile.Remove(index);
3000 TString outputChunk = outputFile;
3001 outputChunk.ReplaceAll(".root", "_*.root");
3002 // Skip already merged outputs
3003 if (!gSystem->AccessPathName(outputFile)) {
3004 if (fOverwriteMode) {
3005 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
3006 gSystem->Unlink(outputFile);
3007 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3008 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3009 outputChunk.Data());
3010 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3011 }
3012 } else {
3013 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
3014 continue;
3015 }
3016 } else {
3017 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
3018 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
3019 outputChunk.Data());
3020 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
3021 }
3022 }
3023 if (fMergeExcludes.Contains(outputFile.Data()) ||
3024 fRegisterExcludes.Contains(outputFile.Data())) continue;
3025 // Perform a 'find' command in the output directory, looking for registered outputs
3026 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
3027 if (!merged) {
3028 Error("MergeOutputs", "Terminate() will NOT be executed");
3029 delete list;
3030 return kFALSE;
3031 }
3032 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
3033 if (fileOpened) fileOpened->Close();
3034 }
3035 delete list;
3036 return kTRUE;
3037}
3038
3039//______________________________________________________________________________
3040void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
3041{
3042// Use the output files connected to output containers from the analysis manager
3043// rather than the files defined by SetOutputFiles
3044 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
3045 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
3046 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
3047}
3048
3049//______________________________________________________________________________
3050void AliAnalysisAlien::SetOutputFiles(const char *list)
3051{
3052// Manually set the output files list.
3053// Removes duplicates. Not allowed if default outputs are not disabled.
3054 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3055 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
3056 return;
3057 }
3058 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
3059 fOutputFiles = "";
3060 TString slist = list;
3061 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
3062 TObjArray *arr = slist.Tokenize(" ");
3063 TObjString *os;
3064 TIter next(arr);
3065 TString sout;
3066 while ((os=(TObjString*)next())) {
3067 sout = os->GetString();
3068 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
3069 if (fOutputFiles.Contains(sout)) continue;
3070 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3071 fOutputFiles += sout;
3072 }
3073 delete arr;
3074}
3075
3076//______________________________________________________________________________
3077void AliAnalysisAlien::SetOutputArchive(const char *list)
3078{
3079// Manually set the output archive list. Free text - you are on your own...
3080// Not allowed if default outputs are not disabled.
3081 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3082 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
3083 return;
3084 }
3085 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
3086 fOutputArchive = list;
3087}
3088
3089//______________________________________________________________________________
3090void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
3091{
3092// Setting a prefered output SE is not allowed anymore.
3093 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
3094}
3095
3096//______________________________________________________________________________
3097void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
3098{
3099// Set some PROOF special parameter.
3100 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3101 if (pair) {
3102 TObject *old = pair->Key();
3103 TObject *val = pair->Value();
3104 fProofParam.Remove(old);
3105 delete old;
3106 delete val;
3107 }
3108 fProofParam.Add(new TObjString(pname), new TObjString(value));
3109}
3110
3111//______________________________________________________________________________
3112const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
3113{
3114// Returns a special PROOF parameter.
3115 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
3116 if (!pair) return 0;
3117 return pair->Value()->GetName();
3118}
3119
3120//______________________________________________________________________________
3121Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
3122{
3123// Start remote grid analysis.
3124 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3125 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
3126 if (!mgr || !mgr->IsInitialized()) {
3127 Error("StartAnalysis", "You need an initialized analysis manager for this");
3128 return kFALSE;
3129 }
3130 // Are we in PROOF mode ?
3131 if (mgr->IsProofMode()) {
3132 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
3133 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
3134 if (fProofCluster.IsNull()) {
3135 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
3136 return kFALSE;
3137 }
3138 if (fProofDataSet.IsNull() && !testMode) {
3139 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
3140 return kFALSE;
3141 }
3142 // Set the needed environment
3143 gEnv->SetValue("XSec.GSI.DelegProxy","2");
3144 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
3145 if (fProofReset && !testMode) {
3146 if (fProofReset==1) {
3147 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
3148 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
3149 } else {
3150 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
3151 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
3152 }
3153 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
3154 return kFALSE;
3155 }
3156
3157 if (!testMode) {
3158 // Check if there is an old active session
3159 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
3160 if (nsessions) {
3161 Error("StartAnalysis","You have to reset your old session first\n");
3162 return kFALSE;
3163 }
3164 }
3165 // Do we need to change the ROOT version ? The success of this cannot be checked.
3166 if (!fROOTVersion.IsNull() && !testMode) {
3167 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
3168 fProofCluster.Data(), fROOTVersion.Data()));
3169 }
3170 // Connect to PROOF and check the status
3171 Long_t proof = 0;
3172 TString sworkers;
3173 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
3174 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
3175 if (!testMode) {
3176 if (!sworkers.IsNull())
3177 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
3178 else
3179 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
3180 } else {
3181 proof = gROOT->ProcessLine("TProof::Open(\"\");");
3182 if (!proof) {
3183 Error("StartAnalysis", "Could not start PROOF in test mode");
3184 return kFALSE;
3185 }
3186 }
3187 if (!proof) {
3188 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
3189 return kFALSE;
3190 }
3191 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
3192 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
3193 // Set proof special parameters if any
3194 TIter nextpp(&fProofParam);
3195 TObject *proofparam;
3196 while ((proofparam=nextpp())) {
3197 TString svalue = GetProofParameter(proofparam->GetName());
3198 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
3199 }
3200 // Is dataset existing ?
3201 if (!testMode) {
3202 TString dataset = fProofDataSet;
3203 Int_t index = dataset.Index("#");
3204 if (index>=0) dataset.Remove(index);
3205// if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
3206// Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
3207// return kFALSE;
3208// }
3209// Info("StartAnalysis", "Dataset %s found", dataset.Data());
3210 }
3211 // Is ClearPackages() needed ?
3212 if (TestSpecialBit(kClearPackages)) {
3213 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
3214 gROOT->ProcessLine("gProof->ClearPackages();");
3215 }
3216 // Is a given aliroot mode requested ?
3217 TList optionsList;
3218 TString parLibs;
3219 if (!fAliRootMode.IsNull()) {
3220 TString alirootMode = fAliRootMode;
3221 if (alirootMode == "default") alirootMode = "";
3222 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
3223 optionsList.SetOwner();
3224 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
3225 // Check the additional libs to be loaded
3226 TString extraLibs;
3227 Bool_t parMode = kFALSE;
3228 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
3229 // Parse the extra libs for .so
3230 if (fAdditionalLibs.Length()) {
3231 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3232 TIter next(list);
3233 TObjString *str;
3234 while((str=(TObjString*)next())) {
3235 if (str->GetString().Contains(".so")) {
3236 if (parMode) {
3237 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
3238 break;
3239 }
3240 TString stmp = str->GetName();
3241 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
3242 stmp.ReplaceAll(".so","");
3243 if (!extraLibs.IsNull()) extraLibs += ":";
3244 extraLibs += stmp;
3245 continue;
3246 }
3247 if (str->GetString().Contains(".par")) {
3248 // The first par file found in the list will not allow any further .so
3249 parMode = kTRUE;
3250 if (!parLibs.IsNull()) parLibs += ":";
3251 parLibs += str->GetName();
3252 continue;
3253 }
3254 }
3255 if (list) delete list;
3256 }
3257 if (!extraLibs.IsNull()) {
3258 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
3259 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
3260 }
3261 // Check extra includes
3262 if (!fIncludePath.IsNull()) {
3263 TString includePath = fIncludePath;
3264 includePath.ReplaceAll(" ",":");
3265 includePath.ReplaceAll("$ALICE_ROOT/","");
3266 includePath.ReplaceAll("${ALICE_ROOT}/","");
3267 includePath.ReplaceAll("-I","");
3268 includePath.Remove(TString::kTrailing, ':');
3269 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
3270 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
3271 }
3272 // Check if connection to grid is requested
3273 if (TestSpecialBit(kProofConnectGrid))
3274 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
3275 // Enable AliRoot par
3276 if (testMode) {
3277 // Enable proof lite package
3278 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
3279 for (Int_t i=0; i<optionsList.GetSize(); i++) {
3280 TNamed *obj = (TNamed*)optionsList.At(i);
3281 printf("%s %s\n", obj->GetName(), obj->GetTitle());
3282 }
3283 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
3284 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
3285 Info("StartAnalysis", "AliRootProofLite enabled");
3286 } else {
3287 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
3288 return kFALSE;
3289 }
3290 } else {
3291 if ( ! fAliROOTVersion.IsNull() ) {
3292 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
3293 fAliROOTVersion.Data(), &optionsList))) {
3294 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
3295 return kFALSE;
3296 }
3297 }
3298 }
3299 // Enable first par files from fAdditionalLibs
3300 if (!parLibs.IsNull()) {
3301 TObjArray *list = parLibs.Tokenize(":");
3302 TIter next(list);
3303 TObjString *package;
3304 while((package=(TObjString*)next())) {
3305 TString spkg = package->GetName();
3306 spkg.ReplaceAll(".par", "");
3307 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3308 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3309 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
3310 if (gROOT->ProcessLine(enablePackage)) {
3311 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3312 return kFALSE;
3313 }
3314 } else {
3315 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3316 return kFALSE;
3317 }
3318 }
3319 if (list) delete list;
3320 }
3321 } else {
3322 if (fAdditionalLibs.Contains(".so") && !testMode) {
3323 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
3324 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
3325 return kFALSE;
3326 }
3327 }
3328 // Enable par files if requested
3329 if (fPackages && fPackages->GetEntries()) {
3330 TIter next(fPackages);
3331 TObject *package;
3332 while ((package=next())) {
3333 // Skip packages already enabled
3334 if (parLibs.Contains(package->GetName())) continue;
3335 TString spkg = package->GetName();
3336 spkg.ReplaceAll(".par", "");
3337 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
3338 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
3339 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
3340 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
3341 return kFALSE;
3342 }
3343 } else {
3344 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
3345 return kFALSE;
3346 }
3347 }
3348 }
3349 // Do we need to load analysis source files ?
3350 // NOTE: don't load on client since this is anyway done by the user to attach his task.
3351 if (fAnalysisSource.Length()) {
3352 TObjArray *list = fAnalysisSource.Tokenize(" ");
3353 TIter next(list);
3354 TObjString *str;
3355 while((str=(TObjString*)next())) {
3356 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
3357 }
3358 if (list) delete list;
3359 }
3360 if (testMode) {
3361 // Register dataset to proof lite.
3362 if (fFileForTestMode.IsNull()) {
3363 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
3364 return kFALSE;
3365 }
3366 if (gSystem->AccessPathName(fFileForTestMode)) {
3367 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
3368 return kFALSE;
3369 }
3370 TFileCollection *coll = new TFileCollection();
3371 coll->AddFromFile(fFileForTestMode);
3372 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
3373 gROOT->ProcessLine("gProof->ShowDataSets()");
3374 }
3375 return kTRUE;
3376 }
3377
3378 // Check if output files have to be taken from the analysis manager
3379 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
3380 // Add output files and AOD files
3381 fOutputFiles = GetListOfFiles("outaod");
3382 // Add extra files registered to the analysis manager
3383 TString extra = GetListOfFiles("ext");
3384 if (!extra.IsNull()) {
3385 extra.ReplaceAll(".root", "*.root");
3386 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
3387 fOutputFiles += extra;
3388 }
3389 // Compose the output archive.
3390 fOutputArchive = "log_archive.zip:std*@disk=1 ";
3391 if (mgr->IsCollectThroughput())
3392 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
3393 else
3394 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
3395 }
3396// if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
3397 if (TestBit(AliAnalysisGrid::kOffline)) {
3398 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
3399 \n there nor any job run. You can revise the JDL and analysis \
3400 \n macro then run the same in \"submit\" mode.");
3401 } else if (TestBit(AliAnalysisGrid::kTest)) {
3402 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
3403 \n dataset.");
3404 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
3405 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
3406 \n space and job submitted.");
3407 } else if (TestBit(AliAnalysisGrid::kMerge)) {
3408 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
3409 if (fMergeViaJDL) CheckInputData();
3410 return kTRUE;
3411 } else {
3412 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
3413 }
3414
3415 Print();
3416 if (!Connect()) {
3417 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
3418 return kFALSE;
3419 }
3420 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
3421 if (!CheckInputData()) {
3422 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
3423 return kFALSE;
3424 }
3425 if (!CreateDataset(fDataPattern)) {
3426 TString serror;
3427 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
3428 if (fRunNumbers.Length()) serror = "run numbers";
3429 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
3430 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
3431 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
3432 return kFALSE;
3433 }
3434 WriteAnalysisFile();
3435 WriteAnalysisMacro();
3436 WriteExecutable();
3437 WriteValidationScript();
3438 if (fMergeViaJDL) {
3439 WriteMergingMacro();
3440 WriteMergeExecutable();
3441 WriteValidationScript(kTRUE);
3442 }
3443 if (!CreateJDL()) return kFALSE;
3444 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
3445 if (testMode) {
3446 // Locally testing the analysis
3447 Info("StartAnalysis", "\n_______________________________________________________________________ \
3448 \n Running analysis script in a daughter shell as on a worker node \
3449 \n_______________________________________________________________________");
3450 TObjArray *list = fOutputFiles.Tokenize(",");
3451 TIter next(list);
3452 TObjString *str;
3453 TString outputFile;
3454 while((str=(TObjString*)next())) {
3455 outputFile = str->GetString();
3456 Int_t index = outputFile.Index("@");
3457 if (index > 0) outputFile.Remove(index);
3458 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
3459 }
3460 delete list;
3461 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
3462 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
3463// gSystem->Exec("cat stdout");
3464 return kFALSE;
3465 }
3466 // Check if submitting is managed by LPM manager
3467 if (fProductionMode) {
3468 //TString prodfile = fJDLName;
3469 //prodfile.ReplaceAll(".jdl", ".prod");
3470 //WriteProductionFile(prodfile);
3471 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
3472 return kFALSE;
3473 }
3474 // Submit AliEn job(s)
3475 gGrid->Cd(fGridOutputDir);
3476 TGridResult *res;
3477 TString jobID = "";
3478 fGridJobIDs = "";
3479 fGridStages = "";
3480 if (!fRunNumbers.Length() && !fRunRange[0]) {
3481 // Submit a given xml or a set of runs
3482 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
3483 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
3484 if (res) {
3485 const char *cjobId = res->GetKey(0,"jobId");
3486 if (!cjobId) {
3487 gGrid->Stdout();
3488 gGrid->Stderr();
3489 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
3490 return kFALSE;
3491 } else {
3492 Info("StartAnalysis", "\n_______________________________________________________________________ \
3493 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
3494 \n_______________________________________________________________________",
3495 fJDLName.Data(), cjobId);
3496 jobID = cjobId;
3497 if (jobID.Atoi()) {
3498 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3499 fGridJobIDs.Append(jobID);
3500 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3501 fGridStages.Append("full");
3502 }
3503 }
3504 delete res;
3505 } else {
3506 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3507 return kFALSE;
3508 }
3509 } else {
3510 // Submit for a range of enumeration of runs.
3511 if (!Submit()) return kFALSE;
3512 jobID = fGridJobIDs;
3513 }
3514
3515 if (fDropToShell) {
3516 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
3517 \n You may exit at any time and terminate the job later using the option <terminate> \
3518 \n ##################################################################################", jobID.Data());
3519 gSystem->Exec("aliensh");
3520 } else {
3521 Info("StartAnalysis", "\n#### SUBMITTED JOB %s TO ALIEN QUEUE #### \
3522 \n Remember to terminate the job later using the option <terminate> \
3523 \n ##################################################################################", jobID.Data());
3524 }
3525 return kTRUE;
3526}
3527
3528//______________________________________________________________________________
3529const char *AliAnalysisAlien::GetListOfFiles(const char *type)
3530{
3531// Get a comma-separated list of output files of the requested type.
3532// Type can be (case unsensitive):
3533// aod - list of aod files (std, extensions and filters)
3534// out - list of output files connected to containers (but not aod's or extras)
3535// ext - list of extra files registered to the manager
3536// ter - list of files produced in terminate
3537 static TString files;
3538 files = "";
3539 TString stype = type;
3540 stype.ToLower();
3541 TString aodfiles, extra;
3542 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3543 if (!mgr) {
3544 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
3545 return files.Data();
3546 }
3547 if (mgr->GetOutputEventHandler()) {
3548 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
3549 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
3550 if (!extraaod.IsNull()) {
3551 aodfiles += ",";
3552 aodfiles += extraaod;
3553 }
3554 }
3555 if (stype.Contains("aod")) {
3556 files = aodfiles;
3557 if (stype == "aod") return files.Data();
3558 }
3559 // Add output files that are not in the list of AOD files
3560 TString outputfiles = "";
3561 TIter next(mgr->GetOutputs());
3562 AliAnalysisDataContainer *output;
3563 const char *filename = 0;
3564 while ((output=(AliAnalysisDataContainer*)next())) {
3565 filename = output->GetFileName();
3566 if (!(strcmp(filename, "default"))) continue;
3567 if (outputfiles.Contains(filename)) continue;
3568 if (aodfiles.Contains(filename)) continue;
3569 if (!outputfiles.IsNull()) outputfiles += ",";
3570 outputfiles += filename;
3571 }
3572 if (stype.Contains("out")) {
3573 if (!files.IsNull()) files += ",";
3574 files += outputfiles;
3575 if (stype == "out") return files.Data();
3576 }
3577 // Add extra files registered to the analysis manager
3578 TString sextra;
3579 extra = mgr->GetExtraFiles();
3580 if (!extra.IsNull()) {
3581 extra.Strip();
3582 extra.ReplaceAll(" ", ",");
3583 TObjArray *fextra = extra.Tokenize(",");
3584 TIter nextx(fextra);
3585 TObject *obj;
3586 while ((obj=nextx())) {
3587 if (aodfiles.Contains(obj->GetName())) continue;
3588 if (outputfiles.Contains(obj->GetName())) continue;
3589 if (sextra.Contains(obj->GetName())) continue;
3590 if (!sextra.IsNull()) sextra += ",";
3591 sextra += obj->GetName();
3592 }
3593 delete fextra;
3594 if (stype.Contains("ext")) {
3595 if (!files.IsNull()) files += ",";
3596 files += sextra;
3597 }
3598 }
3599 if (stype == "ext") return files.Data();
3600 TString termfiles;
3601 if (!fTerminateFiles.IsNull()) {
3602 fTerminateFiles.Strip();
3603 fTerminateFiles.ReplaceAll(" ",",");
3604 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3605 TIter nextx(fextra);
3606 TObject *obj;
3607 while ((obj=nextx())) {
3608 if (aodfiles.Contains(obj->GetName())) continue;
3609 if (outputfiles.Contains(obj->GetName())) continue;
3610 if (termfiles.Contains(obj->GetName())) continue;
3611 if (sextra.Contains(obj->GetName())) continue;
3612 if (!termfiles.IsNull()) termfiles += ",";
3613 termfiles += obj->GetName();
3614 }
3615 delete fextra;
3616 }
3617 if (stype.Contains("ter")) {
3618 if (!files.IsNull() && !termfiles.IsNull()) {
3619 files += ",";
3620 files += termfiles;
3621 }
3622 }
3623 return files.Data();
3624}
3625
3626//______________________________________________________________________________
3627Bool_t AliAnalysisAlien::Submit()
3628{
3629// Submit all master jobs.
3630 Int_t nmasterjobs = fInputFiles->GetEntries();
3631 Long_t tshoot = gSystem->Now();
3632 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3633 while (fNsubmitted < nmasterjobs) {
3634 Long_t now = gSystem->Now();
3635 if ((now-tshoot)>30000) {
3636 tshoot = now;
3637 if (!SubmitNext()) return kFALSE;
3638 }
3639 }
3640 return kTRUE;
3641}
3642
3643//______________________________________________________________________________
3644Bool_t AliAnalysisAlien::SubmitMerging()
3645{
3646// Submit all merging jobs.
3647 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3648 gGrid->Cd(fGridOutputDir);
3649 TString mergeJDLName = fExecutable;
3650 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3651 if (!fInputFiles) {
3652 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3653 return kFALSE;
3654 }
3655 Int_t ntosubmit = fInputFiles->GetEntries();
3656 for (Int_t i=0; i<ntosubmit; i++) {
3657 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3658 runOutDir.ReplaceAll(".xml", "");
3659 if (fOutputToRunNo) {
3660 // The output directory is the run number
3661 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3662 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3663 } else {
3664 if (!fRunNumbers.Length() && !fRunRange[0]) {
3665 // The output directory is the grid outdir
3666 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3667 runOutDir = fGridOutputDir;
3668 } else {
3669 // The output directory is the master number in 3 digits format
3670 printf("### Submitting merging job for master <%03d>\n", i);
3671 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3672 }
3673 }
3674 // Check now the number of merging stages.
3675 TObjArray *list = fOutputFiles.Tokenize(",");
3676 TIter next(list);
3677 TObjString *str;
3678 TString outputFile;
3679 while((str=(TObjString*)next())) {
3680 outputFile = str->GetString();
3681 Int_t index = outputFile.Index("@");
3682 if (index > 0) outputFile.Remove(index);
3683 if (!fMergeExcludes.Contains(outputFile) &&
3684 !fRegisterExcludes.Contains(outputFile)) break;
3685 }
3686 delete list;
3687 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
3688 if (!done && (i==ntosubmit-1)) return kFALSE;
3689 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3690 }
3691 if (!ntosubmit) return kTRUE;
3692 if (fDropToShell) {
3693 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3694 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3695 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3696 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3697 \n ################################################################################################################");
3698 gSystem->Exec("aliensh");
3699 } else {
3700 Info("StartAnalysis", "\n #### STARTED MERGING JOBS FOR YOU #### \
3701 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL') \
3702 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3703 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3704 \n ################################################################################################################");
3705 }
3706 return kTRUE;
3707}
3708
3709//______________________________________________________________________________
3710Bool_t AliAnalysisAlien::SubmitNext()
3711{
3712// Submit next bunch of master jobs if the queue is free. The first master job is
3713// submitted right away, while the next will not be unless the previous was split.
3714// The plugin will not submit new master jobs if there are more that 500 jobs in
3715// waiting phase.
3716 static Bool_t iscalled = kFALSE;
3717 static Int_t firstmaster = 0;
3718 static Int_t lastmaster = 0;
3719 static Int_t npermaster = 0;
3720 if (iscalled) return kTRUE;
3721 iscalled = kTRUE;
3722 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3723 Int_t ntosubmit = 0;
3724 TGridResult *res;
3725 TString jobID = "";
3726 Int_t nmasterjobs = fInputFiles->GetEntries();
3727 if (!fNsubmitted) {
3728 ntosubmit = 1;
3729 if (!IsUseSubmitPolicy()) {
3730 if (nmasterjobs>5)
3731 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3732 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3733 ntosubmit = nmasterjobs;
3734 }
3735 } else {
3736 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3737 printf("=== master %d: %s\n", lastmaster, status.Data());
3738 // If last master not split, just return
3739 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3740 // No more than 100 waiting jobs
3741 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
3742 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3743 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3744 if (!ntosubmit) ntosubmit = 1;
3745 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3746 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3747 }
3748 for (Int_t i=0; i<ntosubmit; i++) {
3749 // Submit for a range of enumeration of runs.
3750 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
3751 TString query;
3752 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3753 runOutDir.ReplaceAll(".xml", "");
3754 if (fOutputToRunNo)
3755 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3756 else
3757 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3758 printf("********* %s\n",query.Data());
3759 res = gGrid->Command(query);
3760 if (res) {
3761 TString cjobId1 = res->GetKey(0,"jobId");
3762 if (!cjobId1.Length()) {
3763 iscalled = kFALSE;
3764 gGrid->Stdout();
3765 gGrid->Stderr();
3766 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3767 return kFALSE;
3768 } else {
3769 Info("StartAnalysis", "\n_______________________________________________________________________ \
3770 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3771 \n_______________________________________________________________________",
3772 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
3773 if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
3774 fGridJobIDs.Append(cjobId1);
3775 if (!fGridStages.IsNull()) fGridStages.Append(" ");
3776 fGridStages.Append("full");
3777 jobID += cjobId1;
3778 jobID += " ";
3779 lastmaster = cjobId1.Atoi();
3780 if (!firstmaster) firstmaster = lastmaster;
3781 fNsubmitted++;
3782 }
3783 delete res;
3784 } else {
3785 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3786 return kFALSE;
3787 }
3788 }
3789 iscalled = kFALSE;
3790 return kTRUE;
3791}
3792
3793//______________________________________________________________________________
3794void AliAnalysisAlien::WriteAnalysisFile()
3795{
3796// Write current analysis manager into the file <analysisFile>
3797 TString analysisFile = fExecutable;
3798 analysisFile.ReplaceAll(".sh", ".root");
3799 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3800 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3801 if (!mgr || !mgr->IsInitialized()) {
3802 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3803 return;
3804 }
3805 // Check analysis type
3806 TObject *handler;
3807 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3808 handler = (TObject*)mgr->GetInputEventHandler();
3809 if (handler) {
3810 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3811 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3812 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3813 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3814 } else {
3815 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3816 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3817 }
3818 }
3819 TDirectory *cdir = gDirectory;
3820 TFile *file = TFile::Open(analysisFile, "RECREATE");
3821 if (file) {
3822 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3823 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3824 // Unless merging makes no sense
3825 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3826 mgr->Write();
3827 delete file;
3828 // Enable termination for local jobs
3829 mgr->SetSkipTerminate(kFALSE);
3830 }
3831 if (cdir) cdir->cd();
3832 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
3833 }
3834 Bool_t copy = kTRUE;
3835 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3836 if (copy) {
3837 CdWork();
3838 TString workdir = gGrid->GetHomeDirectory();
3839 workdir += fGridWorkingDir;
3840 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
3841 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3842 if (!copyLocal2Alien("WriteAnalysisFile",analysisFile.Data(),
3843 Form("%s/%s", workdir.Data(),analysisFile.Data()))) Fatal("","Terminating");
3844 }
3845}
3846
3847//______________________________________________________________________________
3848void AliAnalysisAlien::WriteAnalysisMacro()
3849{
3850// Write the analysis macro that will steer the analysis in grid mode.
3851 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3852 ofstream out;
3853 out.open(fAnalysisMacro.Data(), ios::out);
3854 if (!out.good()) {
3855 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3856 return;
3857 }
3858 Bool_t hasSTEERBase = kFALSE;
3859 Bool_t hasESD = kFALSE;
3860 Bool_t hasAOD = kFALSE;
3861 Bool_t hasANALYSIS = kFALSE;
3862 Bool_t hasOADB = kFALSE;
3863 Bool_t hasANALYSISalice = kFALSE;
3864 Bool_t hasCORRFW = kFALSE;
3865 TString func = fAnalysisMacro;
3866 TString type = "ESD";
3867 TString comment = "// Analysis using ";
3868 if (IsUseMCchain()) {
3869 type = "MC";
3870 comment += "MC";
3871 } else {
3872 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3873 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3874 type = "AOD";
3875 comment += "AOD";
3876 }
3877 }
3878 if (type!="AOD" && fFriendChainName!="") {
3879 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3880 return;
3881 }
3882 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3883 else comment += " data";
3884 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3885 func.ReplaceAll(".C", "");
3886 out << "void " << func.Data() << "()" << endl;
3887 out << "{" << endl;
3888 out << comment.Data() << endl;
3889 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3890 out << " TStopwatch timer;" << endl;
3891 out << " timer.Start();" << endl << endl;
3892 // Change temp directory to current one
3893 if (!IsLocalTest()) {
3894 out << "// connect to AliEn and make the chain" << endl;
3895 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3896 }
3897 out << "// Set temporary merging directory to current one" << endl;
3898 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3899 out << "// Set temporary compilation directory to current one" << endl;
3900 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3901 // Reset existing include path
3902 out << "// Reset existing include path and add current directory first in the search" << endl;
3903 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3904 if (!fExecutableCommand.Contains("aliroot")) {
3905 out << "// load base root libraries" << endl;
3906 out << " gSystem->Load(\"libTree\");" << endl;
3907 out << " gSystem->Load(\"libGeom\");" << endl;
3908 out << " gSystem->Load(\"libVMC\");" << endl;
3909 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3910 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3911 }
3912 if (fAdditionalRootLibs.Length()) {
3913 // in principle libtree /lib geom libvmc etc. can go into this list, too
3914 out << "// Add aditional libraries" << endl;
3915 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3916 TIter next(list);
3917 TObjString *str;
3918 while((str=(TObjString*)next())) {
3919 if (str->GetString().Contains(".so"))
3920 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3921 }
3922 if (list) delete list;
3923 }
3924 out << "// Load analysis framework libraries" << endl;
3925 TString setupPar = "AliAnalysisAlien::SetupPar";
3926 if (!fPackages) {
3927 if (!fExecutableCommand.Contains("aliroot")) {
3928 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3929 out << " gSystem->Load(\"libESD\");" << endl;
3930 out << " gSystem->Load(\"libAOD\");" << endl;
3931 }
3932 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3933 out << " gSystem->Load(\"libOADB\");" << endl;
3934 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3935 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3936 } else {
3937 TIter next(fPackages);
3938 TObject *obj;
3939 TString pkgname;
3940 while ((obj=next())) {
3941 pkgname = obj->GetName();
3942 if (pkgname == "STEERBase" ||
3943 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3944 if (pkgname == "ESD" ||
3945 pkgname == "ESD.par") hasESD = kTRUE;
3946 if (pkgname == "AOD" ||
3947 pkgname == "AOD.par") hasAOD = kTRUE;
3948 if (pkgname == "ANALYSIS" ||
3949 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3950 if (pkgname == "OADB" ||
3951 pkgname == "OADB.par") hasOADB = kTRUE;
3952 if (pkgname == "ANALYSISalice" ||
3953 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3954 if (pkgname == "CORRFW" ||
3955 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3956 }
3957 if (hasANALYSISalice) setupPar = "SetupPar";
3958 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3959 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3960 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3961 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3962 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3963 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3964 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3965 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3966 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3967 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3968 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3969 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3970 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3971 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3972 out << "// Compile other par packages" << endl;
3973 next.Reset();
3974 while ((obj=next())) {
3975 pkgname = obj->GetName();
3976 if (pkgname == "STEERBase" ||
3977 pkgname == "STEERBase.par" ||
3978 pkgname == "ESD" ||
3979 pkgname == "ESD.par" ||
3980 pkgname == "AOD" ||
3981 pkgname == "AOD.par" ||
3982 pkgname == "ANALYSIS" ||
3983 pkgname == "ANALYSIS.par" ||
3984 pkgname == "OADB" ||
3985 pkgname == "OADB.par" ||
3986 pkgname == "ANALYSISalice" ||
3987 pkgname == "ANALYSISalice.par" ||
3988 pkgname == "CORRFW" ||
3989 pkgname == "CORRFW.par") continue;
3990 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3991 }
3992 }
3993 out << "// include path" << endl;
3994 // Get the include path from the interpreter and remove entries pointing to AliRoot
3995 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3996 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3997 out << " TIter nextpath(listpaths);" << endl;
3998 out << " TObjString *pname;" << endl;
3999 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4000 out << " TString current = pname->GetName();" << endl;
4001 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4002 out << " gSystem->AddIncludePath(current);" << endl;
4003 out << " }" << endl;
4004 out << " if (listpaths) delete listpaths;" << endl;
4005 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4006 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4007 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4008 if (fAdditionalLibs.Length()) {
4009 out << "// Add aditional AliRoot libraries" << endl;
4010 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4011 TIter next(list);
4012 TObjString *str;
4013 while((str=(TObjString*)next())) {
4014 if (str->GetString().Contains(".so"))
4015 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4016 if (str->GetString().Contains(".par"))
4017 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
4018 }
4019 if (list) delete list;
4020 }
4021 out << endl;
4022 out << "// analysis source to be compiled at runtime (if any)" << endl;
4023 if (fAnalysisSource.Length()) {
4024 TObjArray *list = fAnalysisSource.Tokenize(" ");
4025 TIter next(list);
4026 TObjString *str;
4027 while((str=(TObjString*)next())) {
4028 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4029 }
4030 if (list) delete list;
4031 }
4032 out << endl;
4033// out << " printf(\"Currently load libraries:\\n\");" << endl;
4034// out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
4035 if (fFastReadOption) {
4036 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
4037 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
4038 out << "// fast xrootd reading enabled" << endl;
4039 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4040 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4041 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4042 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4043 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4044 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4045 }
4046 out << "// read the analysis manager from file" << endl;
4047 TString analysisFile = fExecutable;
4048 analysisFile.ReplaceAll(".sh", ".root");
4049 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4050 << analysisFile << "\");" << endl;
4051 out << " if (!mgr) return;" << endl;
4052 if (IsLocalTest()) {
4053 out << " AliAnalysisAlien *plugin = new AliAnalysisAlien();" << endl;
4054 out << " plugin->SetRunMode(\"test\");" << endl;
4055 if (fFileForTestMode.IsNull())
4056 out << " plugin->SetFileForTestMode(\"data.txt\");" << endl;
4057 else
4058 out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
4059 out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
4060 if (!fFriendChainName.IsNull())
4061 out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\");" << endl;
4062 if (IsUseMCchain())
4063 out << " plugin->SetUseMCchain();" << endl;
4064 out << " mgr->SetGridHandler(plugin);" << endl;
4065 if (AliAnalysisManager::GetAnalysisManager()) {
4066 out << " mgr->SetDebugLevel(" << AliAnalysisManager::GetAnalysisManager()->GetDebugLevel() << ");" << endl;
4067 out << " mgr->SetNSysInfo(" << AliAnalysisManager::GetAnalysisManager()->GetNsysInfo() << ");" << endl;
4068 } else {
4069 out << " mgr->SetDebugLevel(10);" << endl;
4070 out << " mgr->SetNSysInfo(100);" << endl;
4071 }
4072 }
4073 out << " mgr->PrintStatus();" << endl;
4074 if (AliAnalysisManager::GetAnalysisManager()) {
4075 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
4076 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4077 } else {
4078 if (TestBit(AliAnalysisGrid::kTest))
4079 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4080 else
4081 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4082 }
4083 }
4084 if (!IsLocalTest()) {
4085 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
4086 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
4087 } else {
4088 out << " mgr->StartAnalysis(\"localfile\");" << endl;
4089 }
4090 out << " timer.Stop();" << endl;
4091 out << " timer.Print();" << endl;
4092 out << "}" << endl << endl;
4093 if (!IsLocalTest()) {
4094 out <<"//________________________________________________________________________________" << endl;
4095 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
4096 out << "{" << endl;
4097 out << "// Create a chain using url's from xml file" << endl;
4098 out << " TString filename;" << endl;
4099 out << " Int_t run = 0;" << endl;
4100 if (IsUseMCchain()) {
4101 out << " TString treename = \"TE\";" << endl;
4102 } else {
4103 out << " TString treename = type;" << endl;
4104 out << " treename.ToLower();" << endl;
4105 out << " treename += \"Tree\";" << endl;
4106 }
4107 out << " printf(\"***************************************\\n\");" << endl;
4108 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
4109 out << " printf(\"***************************************\\n\");" << endl;
4110 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
4111 out << " if (!coll) {" << endl;
4112 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
4113 out << " return NULL;" << endl;
4114 out << " }" << endl;
4115 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
4116 out << " TChain *chain = new TChain(treename);" << endl;
4117 if(fFriendChainName!="") {
4118 out << " TChain *chainFriend = new TChain(treename);" << endl;
4119 }
4120 out << " coll->Reset();" << endl;
4121 out << " while (coll->Next()) {" << endl;
4122 out << " filename = coll->GetTURL("");" << endl;
4123 out << " if (mgr) {" << endl;
4124 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
4125 out << " if (nrun && nrun != run) {" << endl;
4126 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
4127 out << " mgr->SetRunFromPath(nrun);" << endl;
4128 out << " run = nrun;" << endl;
4129 out << " }" << endl;
4130 out << " }" << endl;
4131 out << " chain->Add(filename);" << endl;
4132 if(fFriendChainName!="") {
4133 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
4134 out << " if (fileFriend.Index(\"#\") > -1) fileFriend.Remove(fileFriend.Index(\"#\"));" << endl;
4135 out << " fileFriend = gSystem->DirName(fileFriend);" << endl;
4136 out << " fileFriend += \"/\";" << endl;
4137 out << " fileFriend += \"" << fFriendChainName << "\";";
4138 out << " TFile *file = TFile::Open(fileFriend);" << endl;
4139 out << " if (file) {" << endl;
4140 out << " file->Close();" << endl;
4141 out << " chainFriend->Add(fileFriend.Data());" << endl;
4142 out << " } else {" << endl;
4143 out << " ::Fatal(\"CreateChain\", \"Cannot open friend file: %s\", fileFriend.Data());" << endl;
4144 out << " return 0;" << endl;
4145 out << " }" << endl;
4146 }
4147 out << " }" << endl;
4148 out << " if (!chain->GetNtrees()) {" << endl;
4149 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
4150 out << " return NULL;" << endl;
4151 out << " }" << endl;
4152 if(fFriendChainName!="") {
4153 out << " chain->AddFriend(chainFriend);" << endl;
4154 }
4155 out << " return chain;" << endl;
4156 out << "}" << endl << endl;
4157 }
4158 if (hasANALYSISalice) {
4159 out <<"//________________________________________________________________________________" << endl;
4160 out << "Bool_t SetupPar(const char *package) {" << endl;
4161 out << "// Compile the package and set it up." << endl;
4162 out << " TString pkgdir = package;" << endl;
4163 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4164 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4165 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4166 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4167 out << " // Check for BUILD.sh and execute" << endl;
4168 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4169 out << " printf(\"*******************************\\n\");" << endl;
4170 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4171 out << " printf(\"*******************************\\n\");" << endl;
4172 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4173 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4174 out << " gSystem->ChangeDirectory(cdir);" << endl;
4175 out << " return kFALSE;" << endl;
4176 out << " }" << endl;
4177 out << " } else {" << endl;
4178 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4179 out << " gSystem->ChangeDirectory(cdir);" << endl;
4180 out << " return kFALSE;" << endl;
4181 out << " }" << endl;
4182 out << " // Check for SETUP.C and execute" << endl;
4183 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4184 out << " printf(\"*******************************\\n\");" << endl;
4185 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4186 out << " printf(\"*******************************\\n\");" << endl;
4187 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4188 out << " } else {" << endl;
4189 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4190 out << " gSystem->ChangeDirectory(cdir);" << endl;
4191 out << " return kFALSE;" << endl;
4192 out << " }" << endl;
4193 out << " // Restore original workdir" << endl;
4194 out << " gSystem->ChangeDirectory(cdir);" << endl;
4195 out << " return kTRUE;" << endl;
4196 out << "}" << endl;
4197 }
4198 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
4199 }
4200 Bool_t copy = kTRUE;
4201 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4202 if (copy) {
4203 CdWork();
4204 TString workdir = gGrid->GetHomeDirectory();
4205 workdir += fGridWorkingDir;
4206 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
4207 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
4208// TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
4209 if (!copyLocal2Alien("WriteAnalysisMacro",fAnalysisMacro.Data(),
4210 Form("alien://%s/%s", workdir.Data(),
4211 fAnalysisMacro.Data()))) Fatal("","Terminating");
4212 }
4213}
4214
4215//______________________________________________________________________________
4216void AliAnalysisAlien::WriteMergingMacro()
4217{
4218// Write a macro to merge the outputs per master job.
4219 if (!fMergeViaJDL) return;
4220 if (!fOutputFiles.Length()) {
4221 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
4222 return;
4223 }
4224 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
4225 TString mergingMacro = fExecutable;
4226 mergingMacro.ReplaceAll(".sh","_merge.C");
4227 if (gGrid && !fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
4228 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4229 ofstream out;
4230 out.open(mergingMacro.Data(), ios::out);
4231 if (!out.good()) {
4232 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
4233 return;
4234 }
4235 Bool_t hasSTEERBase = kFALSE;
4236 Bool_t hasESD = kFALSE;
4237 Bool_t hasAOD = kFALSE;
4238 Bool_t hasANALYSIS = kFALSE;
4239 Bool_t hasOADB = kFALSE;
4240 Bool_t hasANALYSISalice = kFALSE;
4241 Bool_t hasCORRFW = kFALSE;
4242 TString func = mergingMacro;
4243 TString comment;
4244 func.ReplaceAll(".C", "");
4245 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
4246 out << "{" << endl;
4247 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
4248 out << " TStopwatch timer;" << endl;
4249 out << " timer.Start();" << endl << endl;
4250 // Reset existing include path
4251 out << "// Reset existing include path and add current directory first in the search" << endl;
4252 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
4253 if (!fExecutableCommand.Contains("aliroot")) {
4254 out << "// load base root libraries" << endl;
4255 out << " gSystem->Load(\"libTree\");" << endl;
4256 out << " gSystem->Load(\"libGeom\");" << endl;
4257 out << " gSystem->Load(\"libVMC\");" << endl;
4258 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
4259 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
4260 }
4261 if (fAdditionalRootLibs.Length()) {
4262 // in principle libtree /lib geom libvmc etc. can go into this list, too
4263 out << "// Add aditional libraries" << endl;
4264 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
4265 TIter next(list);
4266 TObjString *str;
4267 while((str=(TObjString*)next())) {
4268 if (str->GetString().Contains(".so"))
4269 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4270 }
4271 if (list) delete list;
4272 }
4273 out << "// Load analysis framework libraries" << endl;
4274 if (!fPackages) {
4275 if (!fExecutableCommand.Contains("aliroot")) {
4276 out << " gSystem->Load(\"libSTEERBase\");" << endl;
4277 out << " gSystem->Load(\"libESD\");" << endl;
4278 out << " gSystem->Load(\"libAOD\");" << endl;
4279 }
4280 out << " gSystem->Load(\"libANALYSIS\");" << endl;
4281 out << " gSystem->Load(\"libOADB\");" << endl;
4282 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4283 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4284 } else {
4285 TIter next(fPackages);
4286 TObject *obj;
4287 TString pkgname;
4288 TString setupPar = "AliAnalysisAlien::SetupPar";
4289 while ((obj=next())) {
4290 pkgname = obj->GetName();
4291 if (pkgname == "STEERBase" ||
4292 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
4293 if (pkgname == "ESD" ||
4294 pkgname == "ESD.par") hasESD = kTRUE;
4295 if (pkgname == "AOD" ||
4296 pkgname == "AOD.par") hasAOD = kTRUE;
4297 if (pkgname == "ANALYSIS" ||
4298 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
4299 if (pkgname == "OADB" ||
4300 pkgname == "OADB.par") hasOADB = kTRUE;
4301 if (pkgname == "ANALYSISalice" ||
4302 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
4303 if (pkgname == "CORRFW" ||
4304 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
4305 }
4306 if (hasANALYSISalice) setupPar = "SetupPar";
4307 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
4308 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
4309 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
4310 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
4311 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
4312 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
4313 out << " gSystem->Load(\"libOADB\");" << endl;
4314 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
4315 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
4316 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
4317 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
4318 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
4319 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
4320 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
4321 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
4322 out << "// Compile other par packages" << endl;
4323 next.Reset();
4324 while ((obj=next())) {
4325 pkgname = obj->GetName();
4326 if (pkgname == "STEERBase" ||
4327 pkgname == "STEERBase.par" ||
4328 pkgname == "ESD" ||
4329 pkgname == "ESD.par" ||
4330 pkgname == "AOD" ||
4331 pkgname == "AOD.par" ||
4332 pkgname == "ANALYSIS" ||
4333 pkgname == "ANALYSIS.par" ||
4334 pkgname == "OADB" ||
4335 pkgname == "OADB.par" ||
4336 pkgname == "ANALYSISalice" ||
4337 pkgname == "ANALYSISalice.par" ||
4338 pkgname == "CORRFW" ||
4339 pkgname == "CORRFW.par") continue;
4340 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
4341 }
4342 }
4343 out << "// include path" << endl;
4344 // Get the include path from the interpreter and remove entries pointing to AliRoot
4345 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
4346 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
4347 out << " TIter nextpath(listpaths);" << endl;
4348 out << " TObjString *pname;" << endl;
4349 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
4350 out << " TString current = pname->GetName();" << endl;
4351 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
4352 out << " gSystem->AddIncludePath(current);" << endl;
4353 out << " }" << endl;
4354 out << " if (listpaths) delete listpaths;" << endl;
4355 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
4356 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
4357 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
4358 if (fAdditionalLibs.Length()) {
4359 out << "// Add aditional AliRoot libraries" << endl;
4360 TObjArray *list = fAdditionalLibs.Tokenize(" ");
4361 TIter next(list);
4362 TObjString *str;
4363 while((str=(TObjString*)next())) {
4364 if (str->GetString().Contains(".so"))
4365 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
4366 }
4367 if (list) delete list;
4368 }
4369 out << endl;
4370 out << "// Analysis source to be compiled at runtime (if any)" << endl;
4371 if (fAnalysisSource.Length()) {
4372 TObjArray *list = fAnalysisSource.Tokenize(" ");
4373 TIter next(list);
4374 TObjString *str;
4375 while((str=(TObjString*)next())) {
4376 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
4377 }
4378 if (list) delete list;
4379 }
4380 out << endl;
4381
4382 if (fFastReadOption) {
4383 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
4384 out << "// fast xrootd reading enabled" << endl;
4385 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
4386 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
4387 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
4388 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
4389 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
4390 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
4391 }
4392 // Change temp directory to current one
4393 out << "// Connect to AliEn" << endl;
4394 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
4395 out << "// Set temporary merging directory to current one" << endl;
4396 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
4397 out << "// Set temporary compilation directory to current one" << endl;
4398 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
4399 out << " TString outputDir = dir;" << endl;
4400 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
4401 out << " TString mergeExcludes = \"" << fMergeExcludes << " " << fRegisterExcludes << "\";" << endl;
4402 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
4403 out << " TIter *iter = new TIter(list);" << endl;
4404 out << " TObjString *str;" << endl;
4405 out << " TString outputFile;" << endl;
4406 out << " Bool_t merged = kTRUE;" << endl;
4407 TString analysisFile = fExecutable;
4408 analysisFile.ReplaceAll(".sh", ".root");
4409 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
4410 << analysisFile << "\");" << endl;
4411 out << " if (!mgr) {" << endl;
4412 out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
4413 out << " return;" << endl;
4414 out << " }" << endl;
4415 if (IsLocalTest()) {
4416 out << " printf(\"===================================\n\");" << endl;
4417 out << " printf(\"Testing merging...\\n\");" << endl;
4418 out << " printf(\"===================================\n\");" << endl;
4419 }
4420 out << " while((str=(TObjString*)iter->Next())) {" << endl;
4421 out << " outputFile = str->GetString();" << endl;
4422 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
4423 out << " Int_t index = outputFile.Index(\"@\");" << endl;
4424 out << " if (index > 0) outputFile.Remove(index);" << endl;
4425 out << " // Skip already merged outputs" << endl;
4426 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
4427 out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
4428 out << " continue;" << endl;
4429 out << " }" << endl;
4430 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
4431 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
4432 out << " if (!merged) {" << endl;
4433 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
4434 out << " return;" << endl;
4435 out << " }" << endl;
4436 out << " }" << endl;
4437 if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
4438 out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
4439 out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
4440 }
4441 out << " // all outputs merged, validate" << endl;
4442 out << " ofstream out;" << endl;
4443 out << " out.open(\"outputs_valid\", ios::out);" << endl;
4444 out << " out.close();" << endl;
4445 out << " // read the analysis manager from file" << endl;
4446 if (IsLocalTest()) {
4447 out << " printf(\"===================================\n\");" << endl;
4448 out << " printf(\"Testing Terminate()...\\n\");" << endl;
4449 out << " printf(\"===================================\n\");" << endl;
4450 } else {
4451 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
4452 }
4453 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
4454 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
4455 out << " mgr->PrintStatus();" << endl;
4456 if (mgr) {
4457 if (mgr->GetDebugLevel()>3) {
4458 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
4459 } else {
4460 if (TestBit(AliAnalysisGrid::kTest))
4461 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
4462 else
4463 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
4464 }
4465 }
4466 out << " TTree *tree = NULL;" << endl;
4467 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
4468 out << "}" << endl << endl;
4469 if (hasANALYSISalice) {
4470 out <<"//________________________________________________________________________________" << endl;
4471 out << "Bool_t SetupPar(const char *package) {" << endl;
4472 out << "// Compile the package and set it up." << endl;
4473 out << " TString pkgdir = package;" << endl;
4474 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
4475 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
4476 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
4477 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
4478 out << " // Check for BUILD.sh and execute" << endl;
4479 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
4480 out << " printf(\"*******************************\\n\");" << endl;
4481 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
4482 out << " printf(\"*******************************\\n\");" << endl;
4483 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
4484 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
4485 out << " gSystem->ChangeDirectory(cdir);" << endl;
4486 out << " return kFALSE;" << endl;
4487 out << " }" << endl;
4488 out << " } else {" << endl;
4489 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
4490 out << " gSystem->ChangeDirectory(cdir);" << endl;
4491 out << " return kFALSE;" << endl;
4492 out << " }" << endl;
4493 out << " // Check for SETUP.C and execute" << endl;
4494 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
4495 out << " printf(\"*******************************\\n\");" << endl;
4496 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
4497 out << " printf(\"*******************************\\n\");" << endl;
4498 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
4499 out << " } else {" << endl;
4500 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
4501 out << " gSystem->ChangeDirectory(cdir);" << endl;
4502 out << " return kFALSE;" << endl;
4503 out << " }" << endl;
4504 out << " // Restore original workdir" << endl;
4505 out << " gSystem->ChangeDirectory(cdir);" << endl;
4506 out << " return kTRUE;" << endl;
4507 out << "}" << endl;
4508 }
4509 }
4510 Bool_t copy = kTRUE;
4511 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4512 if (copy) {
4513 CdWork();
4514 TString workdir = gGrid->GetHomeDirectory();
4515 workdir += fGridWorkingDir;
4516 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
4517 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
4518// TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
4519 if (!copyLocal2Alien("WriteMergeMacro",mergingMacro.Data(),
4520 Form("%s/%s", workdir.Data(), mergingMacro.Data()))) Fatal("","Terminating");
4521 }
4522}
4523
4524//______________________________________________________________________________
4525Bool_t AliAnalysisAlien::SetupPar(const char *package)
4526{
4527// Compile the par file archive pointed by <package>. This must be present in the current directory.
4528// Note that for loading the compiled library. The current directory should have precedence in
4529// LD_LIBRARY_PATH
4530 TString pkgdir = package;
4531 pkgdir.ReplaceAll(".par","");
4532 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
4533 TString cdir = gSystem->WorkingDirectory();
4534 gSystem->ChangeDirectory(pkgdir);
4535 // Check for BUILD.sh and execute
4536 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
4537 printf("**************************************************\n");
4538 printf("*** Building PAR archive %s\n", package);
4539 printf("**************************************************\n");
4540 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
4541 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
4542 gSystem->ChangeDirectory(cdir);
4543 return kFALSE;
4544 }
4545 } else {
4546 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
4547 gSystem->ChangeDirectory(cdir);
4548 return kFALSE;
4549 }
4550 // Check for SETUP.C and execute
4551 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
4552 printf("**************************************************\n");
4553 printf("*** Setup PAR archive %s\n", package);
4554 printf("**************************************************\n");
4555 gROOT->Macro("PROOF-INF/SETUP.C");
4556 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
4557 } else {
4558 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
4559 gSystem->ChangeDirectory(cdir);
4560 return kFALSE;
4561 }
4562 // Restore original workdir
4563 gSystem->ChangeDirectory(cdir);
4564 return kTRUE;
4565}
4566
4567//______________________________________________________________________________
4568void AliAnalysisAlien::WriteExecutable()
4569{
4570// Generate the alien executable script.
4571 // Patch executable with -x to catch error code
4572 if (fExecutableCommand.Contains("root") &&
4573 fExecutableCommand.Contains("-q") &&
4574 !fExecutableCommand.Contains("-x")) fExecutableCommand += " -x";
4575 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4576 ofstream out;
4577 out.open(fExecutable.Data(), ios::out);
4578 if (out.bad()) {
4579 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
4580 return;
4581 }
4582 out << "#!/bin/bash" << endl;
4583 // Make sure we can properly compile par files
4584 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4585 out << "echo \"=========================================\"" << endl;
4586 out << "echo \"############## PATH : ##############\"" << endl;
4587 out << "echo $PATH" << endl;
4588 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4589 out << "echo $LD_LIBRARY_PATH" << endl;
4590 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4591 out << "echo $ROOTSYS" << endl;
4592 out << "echo \"############## which root : ##############\"" << endl;
4593 out << "which root" << endl;
4594 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4595 out << "echo $ALICE_ROOT" << endl;
4596 out << "echo \"############## which aliroot : ##############\"" << endl;
4597 out << "which aliroot" << endl;
4598 out << "echo \"############## system limits : ##############\"" << endl;
4599 out << "ulimit -a" << endl;
4600 out << "echo \"############## memory : ##############\"" << endl;
4601 out << "free -m" << endl;
4602 out << "echo \"=========================================\"" << endl << endl;
4603 out << fExecutableCommand << " ";
4604 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl;
4605 out << "RET=$?" << endl;
4606 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4607 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4608 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4609 out << " let sig=\"$RET - 128\""<<endl;
4610 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4611 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4612 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4613 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4614 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4615 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4616 out << " fi"<<endl;
4617 out << " exit $RET"<< endl;
4618 out << "fi" << endl << endl ;
4619 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $RET ========\"" << endl;
4620 out << "echo \"############## memory after: ##############\"" << endl;
4621 out << "free -m" << endl;
4622 }
4623 Bool_t copy = kTRUE;
4624 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4625 if (copy) {
4626 CdWork();
4627 TString workdir = gGrid->GetHomeDirectory();
4628 TString bindir = Form("%s/bin", workdir.Data());
4629 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4630 workdir += fGridWorkingDir;
4631 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
4632 if (FileExists(executable)) gGrid->Rm(executable);
4633 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
4634// TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
4635 if (!copyLocal2Alien("WriteExecutable",fExecutable.Data(),
4636 executable.Data())) Fatal("","Terminating");
4637 }
4638}
4639
4640//______________________________________________________________________________
4641void AliAnalysisAlien::WriteMergeExecutable()
4642{
4643// Generate the alien executable script for the merging job.
4644 if (!fMergeViaJDL) return;
4645 TString mergeExec = fExecutable;
4646 mergeExec.ReplaceAll(".sh", "_merge.sh");
4647 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4648 ofstream out;
4649 out.open(mergeExec.Data(), ios::out);
4650 if (out.bad()) {
4651 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4652 return;
4653 }
4654 out << "#!/bin/bash" << endl;
4655 // Make sure we can properly compile par files
4656 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4657 out << "echo \"=========================================\"" << endl;
4658 out << "echo \"############## PATH : ##############\"" << endl;
4659 out << "echo $PATH" << endl;
4660 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4661 out << "echo $LD_LIBRARY_PATH" << endl;
4662 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4663 out << "echo $ROOTSYS" << endl;
4664 out << "echo \"############## which root : ##############\"" << endl;
4665 out << "which root" << endl;
4666 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4667 out << "echo $ALICE_ROOT" << endl;
4668 out << "echo \"############## which aliroot : ##############\"" << endl;
4669 out << "which aliroot" << endl;
4670 out << "echo \"############## system limits : ##############\"" << endl;
4671 out << "ulimit -a" << endl;
4672 out << "echo \"############## memory : ##############\"" << endl;
4673 out << "free -m" << endl;
4674 out << "echo \"=========================================\"" << endl << endl;
4675 TString mergeMacro = fExecutable;
4676 mergeMacro.ReplaceAll(".sh", "_merge.C");
4677 if (IsOneStageMerging())
4678 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4679 else
4680 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4681 out << fExecutableCommand << " " << "$ARG" << endl;
4682 out << "RET=$?" << endl;
4683 out << "if [ \"$RET\" != \"0\" ];then" << endl;
4684 out << " echo \"======== ERROR : " << fAnalysisMacro.Data() << " finished with NON zero code: $RET ========\"" << endl;
4685 out << " if [ \"$RET\" -gt 128 ] && [ \"$RET\" -lt 160 ]; then"<<endl;
4686 out << " let sig=\"$RET - 128\""<<endl;
4687 out << " sigs='HUP INT QUIT ILL TRAP ABRT BUS FPE"<<endl;
4688 out << " KILL USR1 SEGV USR2 PIPE ALRM TERM STKFLT"<<endl;
4689 out << " CHLD CONT STOP TSTP TTIN TTOU URG XCPU"<<endl;
4690 out << " XFSZ VTALRM PROF WINCH IO PWR SYS'"<<endl;
4691 out << " sig=SIG`echo $sigs | awk '{ print $'\"$sig\"' }'`"<<endl;
4692 out << " echo \"======== it appears to have been killed with signal: $sig ========\""<<endl;
4693 out << " fi"<<endl;
4694 out << " exit $RET"<< endl;
4695 out << "fi" << endl << endl ;
4696 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4697 out << "echo \"############## memory after: ##############\"" << endl;
4698 out << "free -m" << endl;
4699 }
4700 Bool_t copy = kTRUE;
4701 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4702 if (copy) {
4703 CdWork();
4704 TString workdir = gGrid->GetHomeDirectory();
4705 TString bindir = Form("%s/bin", workdir.Data());
4706 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4707 workdir += fGridWorkingDir;
4708 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4709 if (FileExists(executable)) gGrid->Rm(executable);
4710 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4711// TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4712 if (!copyLocal2Alien("WriteMergeExecutable",
4713 mergeExec.Data(), executable.Data())) Fatal("","Terminating");
4714 }
4715}
4716
4717//______________________________________________________________________________
4718void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4719{
4720// Write the production file to be submitted by LPM manager. The format is:
4721// First line: full_path_to_jdl estimated_no_subjobs_per_master
4722// Next lines: full_path_to_dataset XXX (XXX is a string)
4723// To submit, one has to: submit jdl XXX for all lines
4724 ofstream out;
4725 out.open(filename, ios::out);
4726 if (out.bad()) {
4727 Error("WriteProductionFile", "Bad file name: %s", filename);
4728 return;
4729 }
4730 TString workdir;
4731 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4732 workdir = gGrid->GetHomeDirectory();
4733 workdir += fGridWorkingDir;
4734 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4735 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4736 out << locjdl << " " << njobspermaster << endl;
4737 Int_t nmasterjobs = fInputFiles->GetEntries();
4738 for (Int_t i=0; i<nmasterjobs; i++) {
4739 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4740 runOutDir.ReplaceAll(".xml", "");
4741 if (fOutputToRunNo)
4742 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4743 else
4744 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4745 }
4746 if (gGrid) {
4747 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4748 if (FileExists(filename)) gGrid->Rm(filename);
4749// TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4750 if (!copyLocal2Alien("WriteProductionFile", filename,
4751 Form("%s/%s", workdir.Data(),filename))) Fatal("","Terminating");
4752 }
4753}
4754
4755//______________________________________________________________________________
4756void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4757{
4758// Generate the alien validation script.
4759 // Generate the validation script
4760 TObjString *os;
4761 if (fValidationScript.IsNull()) {
4762 fValidationScript = fExecutable;
4763 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4764 }
4765 TString validationScript = fValidationScript;
4766 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4767 if (!Connect()) {
4768 Error("WriteValidationScript", "Alien connection required");
4769 return;
4770 }
4771 if (!fTerminateFiles.IsNull()) {
4772 fTerminateFiles.Strip();
4773 fTerminateFiles.ReplaceAll(" ",",");
4774 }
4775 TString outStream = "";
4776 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4777 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4778 ofstream out;
4779 out.open(validationScript, ios::out);
4780 out << "#!/bin/bash" << endl;
4781 out << "##################################################" << endl;
4782 out << "validateout=`dirname $0`" << endl;
4783 out << "validatetime=`date`" << endl;
4784 out << "validated=\"0\";" << endl;
4785 out << "error=0" << endl;
4786 out << "if [ -z $validateout ]" << endl;
4787 out << "then" << endl;
4788 out << " validateout=\".\"" << endl;
4789 out << "fi" << endl << endl;
4790 out << "cd $validateout;" << endl;
4791 out << "validateworkdir=`pwd`;" << endl << endl;
4792 out << "echo \"*******************************************************\"" << outStream << endl;
4793 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4794 out << "" << endl;
4795 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4796 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4797 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4798 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4799 out << "ls -la ./" << outStream << endl;
4800 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4801 out << "##################################################" << endl;
4802 out << "" << endl;
4803
4804 out << "if [ ! -f stderr ] ; then" << endl;
4805 out << " error=1" << endl;
4806 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4807 out << " echo \"Error = $error\" " << outStream << endl;
4808 out << "fi" << endl;
4809
4810 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4811 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4812 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4813 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4814 out << "" << endl;
4815
4816 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4817 out << " error=1" << endl;
4818 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4819 out << " echo \"$parArch\" " << outStream << endl;
4820 out << " echo \"Error = $error\" " << outStream << endl;
4821 out << "fi" << endl;
4822
4823 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4824 out << " error=1" << endl;
4825 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4826 out << " echo \"$segViol\" " << outStream << endl;
4827 out << " echo \"Error = $error\" " << outStream << endl;
4828 out << "fi" << endl;
4829
4830 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4831 out << " error=1" << endl;
4832 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4833 out << " echo \"$segFault\" " << outStream << endl;
4834 out << " echo \"Error = $error\" " << outStream << endl;
4835 out << "fi" << endl;
4836
4837 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4838 out << " error=1" << endl;
4839 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4840 out << " echo \"$glibcErr\" " << outStream << endl;
4841 out << " echo \"Error = $error\" " << outStream << endl;
4842 out << "fi" << endl;
4843
4844 // Part dedicated to the specific analyses running into the train
4845
4846 TString outputFiles = fOutputFiles;
4847 if (merge && !fTerminateFiles.IsNull()) {
4848 outputFiles += ",";
4849 outputFiles += fTerminateFiles;
4850 }
4851 TObjArray *arr = outputFiles.Tokenize(",");
4852 TIter next1(arr);
4853 TString outputFile;
4854 while (!merge && (os=(TObjString*)next1())) {
4855 // No need to validate outputs produced by merging since the merging macro does this
4856 outputFile = os->GetString();
4857 Int_t index = outputFile.Index("@");
4858 if (index > 0) outputFile.Remove(index);
4859 if (fTerminateFiles.Contains(outputFile)) continue;
4860 if (outputFile.Contains("*")) continue;
4861 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4862 out << " error=1" << endl;
4863 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4864 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4865 out << "fi" << endl;
4866 }
4867 delete arr;
4868 out << "if ! [ -f outputs_valid ] ; then" << endl;
4869 out << " error=1" << endl;
4870 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4871 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4872 out << "fi" << endl;
4873
4874 out << "if [ $error = 0 ] ; then" << endl;
4875 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4876 if (!IsKeepLogs()) {
4877 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4878 outStream = "";
4879 out << " rm -f std*" << endl;
4880 }
4881 out << "fi" << endl;
4882
4883 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4884 out << "echo \"*******************************************************\"" << outStream << endl;
4885 out << "cd -" << endl;
4886 out << "exit $error" << endl;
4887 }
4888 Bool_t copy = kTRUE;
4889 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4890 if (copy) {
4891 CdWork();
4892 TString workdir = gGrid->GetHomeDirectory();
4893 workdir += fGridWorkingDir;
4894 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4895 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4896// TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));
4897 if (!copyLocal2Alien("WriteValidationScript", validationScript.Data(),
4898 Form("%s/%s",workdir.Data(), validationScript.Data()))) Fatal("","Terminating");
4899 }
4900}