1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
32 #include "TFileCollection.h"
34 #include "TObjString.h"
35 #include "TObjArray.h"
37 #include "TGridResult.h"
38 #include "TGridCollection.h"
40 #include "TGridJobStatusList.h"
41 #include "TGridJobStatus.h"
42 #include "TFileMerger.h"
43 #include "AliAnalysisManager.h"
44 #include "AliVEventHandler.h"
45 #include "AliAnalysisDataContainer.h"
46 #include "AliMultiInputEventHandler.h"
48 ClassImp(AliAnalysisAlien)
50 //______________________________________________________________________________
51 AliAnalysisAlien::AliAnalysisAlien()
57 fSplitMaxInputFileNumber(0),
59 fMasterResubmitThreshold(0),
72 fNproofWorkersPerSlave(0),
82 fAdditionalRootLibs(),
110 fRootVersionForProof(),
121 //______________________________________________________________________________
122 AliAnalysisAlien::AliAnalysisAlien(const char *name)
123 :AliAnalysisGrid(name),
128 fSplitMaxInputFileNumber(0),
130 fMasterResubmitThreshold(0),
143 fNproofWorkersPerSlave(0),
147 fExecutableCommand(),
153 fAdditionalRootLibs(),
181 fRootVersionForProof(),
192 //______________________________________________________________________________
193 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
194 :AliAnalysisGrid(other),
197 fPrice(other.fPrice),
199 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
200 fMaxInitFailed(other.fMaxInitFailed),
201 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
202 fNtestFiles(other.fNtestFiles),
203 fNrunsPerMaster(other.fNrunsPerMaster),
204 fMaxMergeFiles(other.fMaxMergeFiles),
205 fMaxMergeStages(other.fMaxMergeStages),
206 fNsubmitted(other.fNsubmitted),
207 fProductionMode(other.fProductionMode),
208 fOutputToRunNo(other.fOutputToRunNo),
209 fMergeViaJDL(other.fMergeViaJDL),
210 fFastReadOption(other.fFastReadOption),
211 fOverwriteMode(other.fOverwriteMode),
212 fNreplicas(other.fNreplicas),
213 fNproofWorkers(other.fNproofWorkers),
214 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
215 fProofReset(other.fProofReset),
216 fRunNumbers(other.fRunNumbers),
217 fExecutable(other.fExecutable),
218 fExecutableCommand(other.fExecutableCommand),
219 fArguments(other.fArguments),
220 fExecutableArgs(other.fExecutableArgs),
221 fAnalysisMacro(other.fAnalysisMacro),
222 fAnalysisSource(other.fAnalysisSource),
223 fValidationScript(other.fValidationScript),
224 fAdditionalRootLibs(other.fAdditionalRootLibs),
225 fAdditionalLibs(other.fAdditionalLibs),
226 fSplitMode(other.fSplitMode),
227 fAPIVersion(other.fAPIVersion),
228 fROOTVersion(other.fROOTVersion),
229 fAliROOTVersion(other.fAliROOTVersion),
230 fExternalPackages(other.fExternalPackages),
232 fGridWorkingDir(other.fGridWorkingDir),
233 fGridDataDir(other.fGridDataDir),
234 fDataPattern(other.fDataPattern),
235 fGridOutputDir(other.fGridOutputDir),
236 fOutputArchive(other.fOutputArchive),
237 fOutputFiles(other.fOutputFiles),
238 fInputFormat(other.fInputFormat),
239 fDatasetName(other.fDatasetName),
240 fJDLName(other.fJDLName),
241 fTerminateFiles(other.fTerminateFiles),
242 fMergeExcludes(other.fMergeExcludes),
243 fIncludePath(other.fIncludePath),
244 fCloseSE(other.fCloseSE),
245 fFriendChainName(other.fFriendChainName),
246 fJobTag(other.fJobTag),
247 fOutputSingle(other.fOutputSingle),
248 fRunPrefix(other.fRunPrefix),
249 fProofCluster(other.fProofCluster),
250 fProofDataSet(other.fProofDataSet),
251 fFileForTestMode(other.fFileForTestMode),
252 fRootVersionForProof(other.fRootVersionForProof),
253 fAliRootMode(other.fAliRootMode),
254 fMergeDirName(other.fMergeDirName),
260 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
261 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
262 fRunRange[0] = other.fRunRange[0];
263 fRunRange[1] = other.fRunRange[1];
264 if (other.fInputFiles) {
265 fInputFiles = new TObjArray();
266 TIter next(other.fInputFiles);
268 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
269 fInputFiles->SetOwner();
271 if (other.fPackages) {
272 fPackages = new TObjArray();
273 TIter next(other.fPackages);
275 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
276 fPackages->SetOwner();
280 //______________________________________________________________________________
281 AliAnalysisAlien::~AliAnalysisAlien()
284 if (fGridJDL) delete fGridJDL;
285 if (fMergingJDL) delete fMergingJDL;
286 if (fInputFiles) delete fInputFiles;
287 if (fPackages) delete fPackages;
288 fProofParam.DeleteAll();
291 //______________________________________________________________________________
292 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
295 if (this != &other) {
296 AliAnalysisGrid::operator=(other);
297 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
298 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fPrice = other.fPrice;
301 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
302 fMaxInitFailed = other.fMaxInitFailed;
303 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
304 fNtestFiles = other.fNtestFiles;
305 fNrunsPerMaster = other.fNrunsPerMaster;
306 fMaxMergeFiles = other.fMaxMergeFiles;
307 fMaxMergeStages = other.fMaxMergeStages;
308 fNsubmitted = other.fNsubmitted;
309 fProductionMode = other.fProductionMode;
310 fOutputToRunNo = other.fOutputToRunNo;
311 fMergeViaJDL = other.fMergeViaJDL;
312 fFastReadOption = other.fFastReadOption;
313 fOverwriteMode = other.fOverwriteMode;
314 fNreplicas = other.fNreplicas;
315 fNproofWorkers = other.fNproofWorkers;
316 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
317 fProofReset = other.fProofReset;
318 fRunNumbers = other.fRunNumbers;
319 fExecutable = other.fExecutable;
320 fExecutableCommand = other.fExecutableCommand;
321 fArguments = other.fArguments;
322 fExecutableArgs = other.fExecutableArgs;
323 fAnalysisMacro = other.fAnalysisMacro;
324 fAnalysisSource = other.fAnalysisSource;
325 fValidationScript = other.fValidationScript;
326 fAdditionalRootLibs = other.fAdditionalRootLibs;
327 fAdditionalLibs = other.fAdditionalLibs;
328 fSplitMode = other.fSplitMode;
329 fAPIVersion = other.fAPIVersion;
330 fROOTVersion = other.fROOTVersion;
331 fAliROOTVersion = other.fAliROOTVersion;
332 fExternalPackages = other.fExternalPackages;
334 fGridWorkingDir = other.fGridWorkingDir;
335 fGridDataDir = other.fGridDataDir;
336 fDataPattern = other.fDataPattern;
337 fGridOutputDir = other.fGridOutputDir;
338 fOutputArchive = other.fOutputArchive;
339 fOutputFiles = other.fOutputFiles;
340 fInputFormat = other.fInputFormat;
341 fDatasetName = other.fDatasetName;
342 fJDLName = other.fJDLName;
343 fTerminateFiles = other.fTerminateFiles;
344 fMergeExcludes = other.fMergeExcludes;
345 fIncludePath = other.fIncludePath;
346 fCloseSE = other.fCloseSE;
347 fFriendChainName = other.fFriendChainName;
348 fJobTag = other.fJobTag;
349 fOutputSingle = other.fOutputSingle;
350 fRunPrefix = other.fRunPrefix;
351 fProofCluster = other.fProofCluster;
352 fProofDataSet = other.fProofDataSet;
353 fFileForTestMode = other.fFileForTestMode;
354 fRootVersionForProof = other.fRootVersionForProof;
355 fAliRootMode = other.fAliRootMode;
356 fMergeDirName = other.fMergeDirName;
357 if (other.fInputFiles) {
358 fInputFiles = new TObjArray();
359 TIter next(other.fInputFiles);
361 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
362 fInputFiles->SetOwner();
364 if (other.fPackages) {
365 fPackages = new TObjArray();
366 TIter next(other.fPackages);
368 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
369 fPackages->SetOwner();
375 //______________________________________________________________________________
376 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
378 // Set the run number format. Can be a prefix or a format like "%09d"
380 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
383 //______________________________________________________________________________
384 void AliAnalysisAlien::AddIncludePath(const char *path)
386 // Add include path in the remote analysis macro.
388 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
389 else fIncludePath += Form("-I%s ", path);
392 //______________________________________________________________________________
393 void AliAnalysisAlien::AddRunNumber(Int_t run)
395 // Add a run number to the list of runs to be processed.
396 if (fRunNumbers.Length()) fRunNumbers += " ";
397 fRunNumbers += Form(fRunPrefix.Data(), run);
400 //______________________________________________________________________________
401 void AliAnalysisAlien::AddRunList(const char* runList)
403 // Add several runs into the list of runs; they are expected to be separated by a blank character.
404 TString sList = runList;
405 TObjArray *list = sList.Tokenize(" ");
406 Int_t n = list->GetEntries();
407 for (Int_t i = 0; i < n; i++) {
408 TObjString *os = (TObjString*)list->At(i);
409 AddRunNumber(os->GetString().Atoi());
414 //______________________________________________________________________________
415 void AliAnalysisAlien::AddRunNumber(const char* run)
417 // Add a run number to the list of runs to be processed.
420 TObjArray *arr = runs.Tokenize(" ");
423 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
424 while ((os=(TObjString*)next())){
425 if (fRunNumbers.Length()) fRunNumbers += " ";
426 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
431 //______________________________________________________________________________
432 void AliAnalysisAlien::AddDataFile(const char *lfn)
434 // Adds a data file to the input to be analysed. The file should be a valid LFN
435 // or point to an existing file in the alien workdir.
436 if (!fInputFiles) fInputFiles = new TObjArray();
437 fInputFiles->Add(new TObjString(lfn));
440 //______________________________________________________________________________
441 void AliAnalysisAlien::AddExternalPackage(const char *package)
443 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
444 if (fExternalPackages) fExternalPackages += " ";
445 fExternalPackages += package;
448 //______________________________________________________________________________
449 Bool_t AliAnalysisAlien::Connect()
451 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
452 if (gGrid && gGrid->IsConnected()) return kTRUE;
453 if (fProductionMode) return kTRUE;
455 Info("Connect", "Trying to connect to AliEn ...");
456 TGrid::Connect("alien://");
458 if (!gGrid || !gGrid->IsConnected()) {
459 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
462 fUser = gGrid->GetUser();
463 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
467 //______________________________________________________________________________
468 void AliAnalysisAlien::CdWork()
470 // Check validity of alien workspace. Create directory if possible.
472 Error("CdWork", "Alien connection required");
475 TString homedir = gGrid->GetHomeDirectory();
476 TString workdir = homedir + fGridWorkingDir;
477 if (DirectoryExists(workdir)) {
481 // Work directory not existing - create it
483 if (gGrid->Mkdir(workdir, "-p")) {
484 gGrid->Cd(fGridWorkingDir);
485 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
487 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
488 workdir.Data(), homedir.Data());
489 fGridWorkingDir = "";
493 //______________________________________________________________________________
494 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
496 // Check if file copying is possible.
497 if (fProductionMode) return kTRUE;
499 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
502 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
503 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
504 // Check if alien_CLOSE_SE is defined
505 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
506 if (!closeSE.IsNull()) {
507 Info("CheckFileCopy", "Your current close storage is pointing to: \
508 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
510 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
512 // Check if grid directory exists.
513 if (!DirectoryExists(alienpath)) {
514 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
517 TFile f("plugin_test_copy", "RECREATE");
518 // User may not have write permissions to current directory
520 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
521 gSystem->WorkingDirectory());
525 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
526 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
527 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
528 \n# 1. Make sure you have write permissions there. If this is the case: \
529 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
530 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
531 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
532 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
533 gSystem->Unlink(f.GetName());
536 gSystem->Unlink(f.GetName());
537 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
538 Info("CheckFileCopy", "### ...SUCCESS ###");
542 //______________________________________________________________________________
543 Bool_t AliAnalysisAlien::CheckInputData()
545 // Check validity of input data. If necessary, create xml files.
546 if (fProductionMode) return kTRUE;
547 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
548 if (!fGridDataDir.Length()) {
549 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
553 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
556 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
557 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
558 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
561 // Process declared files
562 Bool_t isCollection = kFALSE;
563 Bool_t isXml = kFALSE;
564 Bool_t useTags = kFALSE;
565 Bool_t checked = kFALSE;
566 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
568 TString workdir = gGrid->GetHomeDirectory();
569 workdir += fGridWorkingDir;
572 TIter next(fInputFiles);
573 while ((objstr=(TObjString*)next())) {
576 file += objstr->GetString();
577 // Store full lfn path
578 if (FileExists(file)) objstr->SetString(file);
580 file = objstr->GetName();
581 if (!FileExists(objstr->GetName())) {
582 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
583 objstr->GetName(), workdir.Data());
587 Bool_t iscoll, isxml, usetags;
588 CheckDataType(file, iscoll, isxml, usetags);
591 isCollection = iscoll;
594 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
596 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
597 Error("CheckInputData", "Some conflict was found in the types of inputs");
603 // Process requested run numbers
604 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
605 // Check validity of alien data directory
606 if (!fGridDataDir.Length()) {
607 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
610 if (!DirectoryExists(fGridDataDir)) {
611 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
615 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
619 if (checked && !isXml) {
620 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
623 // Check validity of run number(s)
628 TString schunk, schunk2;
632 useTags = fDataPattern.Contains("tag");
633 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
635 if (useTags != fDataPattern.Contains("tag")) {
636 Error("CheckInputData", "Cannot mix input files using/not using tags");
639 if (fRunNumbers.Length()) {
640 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
641 arr = fRunNumbers.Tokenize(" ");
643 while ((os=(TObjString*)next())) {
644 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
645 if (!DirectoryExists(path)) {
646 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
649 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
650 TString msg = "\n##### file: ";
652 msg += " type: xml_collection;";
653 if (useTags) msg += " using_tags: Yes";
654 else msg += " using_tags: No";
655 Info("CheckDataType", "%s", msg.Data());
656 if (fNrunsPerMaster<2) {
657 AddDataFile(Form("%s.xml", os->GetString().Data()));
660 if (((nruns-1)%fNrunsPerMaster) == 0) {
661 schunk = os->GetString();
663 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
664 schunk += Form("_%s.xml", os->GetString().Data());
670 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
671 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
672 format = Form("%%s/%s ", fRunPrefix.Data());
673 path = Form(format.Data(), fGridDataDir.Data(), irun);
674 if (!DirectoryExists(path)) {
677 format = Form("%%s/%s.xml", fRunPrefix.Data());
678 path = Form(format.Data(), workdir.Data(),irun);
679 TString msg = "\n##### file: ";
681 msg += " type: xml_collection;";
682 if (useTags) msg += " using_tags: Yes";
683 else msg += " using_tags: No";
684 Info("CheckDataType", "%s", msg.Data());
685 if (fNrunsPerMaster<2) {
686 format = Form("%s.xml", fRunPrefix.Data());
687 AddDataFile(Form(format.Data(),irun));
690 if (((nruns-1)%fNrunsPerMaster) == 0) {
691 schunk = Form(fRunPrefix.Data(),irun);
693 format = Form("_%s.xml", fRunPrefix.Data());
694 schunk2 = Form(format.Data(), irun);
695 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
708 //______________________________________________________________________________
709 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
711 // Create dataset for the grid data directory + run number.
712 const Int_t gMaxEntries = 15000;
713 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
715 Error("CreateDataset", "Cannot create dataset with no grid connection");
720 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
721 TString workdir = gGrid->GetHomeDirectory();
722 workdir += fGridWorkingDir;
724 // Compose the 'find' command arguments
727 TString options = "-x collection ";
728 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
729 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
730 TString conditions = "";
737 TString schunk, schunk2;
738 TGridCollection *cbase=0, *cadd=0;
739 if (!fRunNumbers.Length() && !fRunRange[0]) {
740 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
741 // Make a single data collection from data directory.
743 if (!DirectoryExists(path)) {
744 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
748 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
749 else file = Form("%s.xml", gSystem->BaseName(path));
753 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
755 command += Form("%s -o %d ",options.Data(), nstart);
759 command += conditions;
760 printf("command: %s\n", command.Data());
761 TGridResult *res = gGrid->Command(command);
763 // Write standard output to file
764 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
765 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
766 Bool_t nullFile = kFALSE;
768 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
770 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
772 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
773 gSystem->Exec("rm -f __tmp*");
781 gSystem->Exec("rm -f __tmp__");
782 ncount = line.Atoi();
785 if (ncount == gMaxEntries) {
786 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
787 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
788 if (!cbase) cbase = cadd;
796 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
797 printf("... please wait - TAlienCollection::Add() scales badly...\n");
800 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
801 delete cbase; cbase = 0;
803 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
805 gSystem->Exec("rm -f __tmp*");
806 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
810 Bool_t fileExists = FileExists(file);
811 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
812 // Copy xml file to alien space
813 if (fileExists) gGrid->Rm(file);
814 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
815 if (!FileExists(file)) {
816 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
819 // Update list of files to be processed.
821 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
825 Bool_t nullResult = kTRUE;
826 if (fRunNumbers.Length()) {
827 TObjArray *arr = fRunNumbers.Tokenize(" ");
830 while ((os=(TObjString*)next())) {
833 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
834 if (!DirectoryExists(path)) continue;
836 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
837 else file = Form("%s.xml", os->GetString().Data());
838 // If local collection file does not exist, create it via 'find' command.
842 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
844 command += Form("%s -o %d ",options.Data(), nstart);
847 command += conditions;
848 TGridResult *res = gGrid->Command(command);
850 // Write standard output to file
851 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
852 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
853 Bool_t nullFile = kFALSE;
855 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
857 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
859 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
860 gSystem->Exec("rm -f __tmp*");
861 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
869 gSystem->Exec("rm -f __tmp__");
870 ncount = line.Atoi();
874 if (ncount == gMaxEntries) {
875 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
876 if (fNrunsPerMaster > 1) {
877 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
878 file.Data(),gMaxEntries);
881 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
882 if (!cbase) cbase = cadd;
889 if (cbase && fNrunsPerMaster<2) {
890 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
891 printf("... please wait - TAlienCollection::Add() scales badly...\n");
894 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
895 delete cbase; cbase = 0;
897 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
899 gSystem->Exec("rm -f __tmp*");
900 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
904 if (TestBit(AliAnalysisGrid::kTest)) break;
905 // Check if there is one run per master job.
906 if (fNrunsPerMaster<2) {
907 if (FileExists(file)) {
908 if (fOverwriteMode) gGrid->Rm(file);
910 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
914 // Copy xml file to alien space
915 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
916 if (!FileExists(file)) {
917 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
923 if (((nruns-1)%fNrunsPerMaster) == 0) {
924 schunk = os->GetString();
925 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
927 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
928 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
932 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
935 schunk += Form("_%s.xml", os->GetString().Data());
936 if (FileExists(schunk)) {
937 if (fOverwriteMode) gGrid->Rm(file);
939 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
943 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
944 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
945 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
946 if (!FileExists(schunk)) {
947 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
955 Error("CreateDataset", "No valid dataset corresponding to the query!");
959 // Process a full run range.
960 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
961 format = Form("%%s/%s ", fRunPrefix.Data());
964 path = Form(format.Data(), fGridDataDir.Data(), irun);
965 if (!DirectoryExists(path)) continue;
967 format = Form("%s.xml", fRunPrefix.Data());
968 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
969 else file = Form(format.Data(), irun);
970 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
971 if (fOverwriteMode) gGrid->Rm(file);
973 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
977 // If local collection file does not exist, create it via 'find' command.
981 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
983 command += Form("%s -o %d ",options.Data(), nstart);
986 command += conditions;
987 TGridResult *res = gGrid->Command(command);
989 // Write standard output to file
990 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
991 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
992 Bool_t nullFile = kFALSE;
994 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
996 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
998 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
999 gSystem->Exec("rm -f __tmp*");
1007 gSystem->Exec("rm -f __tmp__");
1008 ncount = line.Atoi();
1010 nullResult = kFALSE;
1012 if (ncount == gMaxEntries) {
1013 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1014 if (fNrunsPerMaster > 1) {
1015 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1016 file.Data(),gMaxEntries);
1019 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1020 if (!cbase) cbase = cadd;
1027 if (cbase && fNrunsPerMaster<2) {
1028 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1029 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1032 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1033 delete cbase; cbase = 0;
1035 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1037 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1041 if (TestBit(AliAnalysisGrid::kTest)) break;
1042 // Check if there is one run per master job.
1043 if (fNrunsPerMaster<2) {
1044 if (FileExists(file)) {
1045 if (fOverwriteMode) gGrid->Rm(file);
1047 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1051 // Copy xml file to alien space
1052 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1053 if (!FileExists(file)) {
1054 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1059 // Check if the collection for the chunk exist locally.
1060 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1061 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1062 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1065 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1066 if (((nruns-1)%fNrunsPerMaster) == 0) {
1067 schunk = Form(fRunPrefix.Data(), irun);
1068 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1070 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1074 format = Form("%%s_%s.xml", fRunPrefix.Data());
1075 schunk2 = Form(format.Data(), schunk.Data(), irun);
1076 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1080 if (FileExists(schunk)) {
1081 if (fOverwriteMode) gGrid->Rm(schunk);
1083 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1087 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1088 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1089 if (FileExists(schunk)) {
1090 if (fOverwriteMode) gGrid->Rm(schunk);
1092 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1096 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1097 if (!FileExists(schunk)) {
1098 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1104 Error("CreateDataset", "No valid dataset corresponding to the query!");
1111 //______________________________________________________________________________
1112 Bool_t AliAnalysisAlien::CreateJDL()
1114 // Generate a JDL file according to current settings. The name of the file is
1115 // specified by fJDLName.
1116 Bool_t error = kFALSE;
1118 Bool_t copy = kTRUE;
1119 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1120 Bool_t generate = kTRUE;
1121 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1123 Error("CreateJDL", "Alien connection required");
1126 // Check validity of alien workspace
1128 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1129 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1130 workdir += fGridWorkingDir;
1134 Error("CreateJDL()", "Define some input files for your analysis.");
1137 // Compose list of input files
1138 // Check if output files were defined
1139 if (!fOutputFiles.Length()) {
1140 Error("CreateJDL", "You must define at least one output file");
1143 // Check if an output directory was defined and valid
1144 if (!fGridOutputDir.Length()) {
1145 Error("CreateJDL", "You must define AliEn output directory");
1148 if (!fProductionMode) {
1149 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1150 if (!DirectoryExists(fGridOutputDir)) {
1151 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1152 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1154 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1158 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1163 // Exit if any error up to now
1164 if (error) return kFALSE;
1166 if (!fUser.IsNull()) {
1167 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1168 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1170 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
1171 TString mergeExec = fExecutable;
1172 mergeExec.ReplaceAll(".sh", "_merge.sh");
1173 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1174 mergeExec.ReplaceAll(".sh", ".C");
1175 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1176 if (!fArguments.IsNull())
1177 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
1178 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1180 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1181 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
1184 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1185 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1186 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1187 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1189 if (fMaxInitFailed > 0) {
1190 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1191 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1193 if (fSplitMaxInputFileNumber > 0) {
1194 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1195 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1197 if (!IsOneStageMerging()) {
1198 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1199 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1201 if (fSplitMode.Length()) {
1202 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1203 fGridJDL->SetDescription("Split", "We split per SE or file");
1205 fMergingJDL->SetValue("Split", "\"se\"");
1206 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
1207 if (!fAliROOTVersion.IsNull()) {
1208 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1209 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1211 if (!fROOTVersion.IsNull()) {
1212 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1213 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1215 if (!fAPIVersion.IsNull()) {
1216 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1217 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1219 if (!fExternalPackages.IsNull()) {
1220 arr = fExternalPackages.Tokenize(" ");
1222 while ((os=(TObjString*)next())) {
1223 TString pkgname = os->GetString();
1224 Int_t index = pkgname.Index("::");
1225 TString pkgversion = pkgname(index+2, pkgname.Length());
1226 pkgname.Remove(index);
1227 fGridJDL->AddToPackages(pkgname, pkgversion);
1228 fMergingJDL->AddToPackages(pkgname, pkgversion);
1232 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1233 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1234 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1235 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1236 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1237 TString analysisFile = fExecutable;
1238 analysisFile.ReplaceAll(".sh", ".root");
1239 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1240 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1241 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1242 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
1243 if (fAdditionalLibs.Length()) {
1244 arr = fAdditionalLibs.Tokenize(" ");
1246 while ((os=(TObjString*)next())) {
1247 if (os->GetString().Contains(".so")) continue;
1248 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1249 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1254 TIter next(fPackages);
1256 while ((obj=next())) {
1257 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1258 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1261 if (fOutputArchive.Length()) {
1262 arr = fOutputArchive.Tokenize(" ");
1264 Bool_t first = kTRUE;
1265 const char *comment = "Files to be archived";
1266 const char *comment1 = comment;
1267 while ((os=(TObjString*)next())) {
1268 if (!first) comment = NULL;
1269 if (!os->GetString().Contains("@") && fCloseSE.Length())
1270 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1272 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1276 // Output archive for the merging jdl
1277 TString outputArchive;
1278 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1279 outputArchive = "log_archive.zip:std*@disk=1 ";
1280 // Add normal output files, extra files + terminate files
1281 TString files = GetListOfFiles("outextter");
1282 // Do not register merge excludes
1283 if (!fMergeExcludes.IsNull()) {
1284 arr = fMergeExcludes.Tokenize(" ");
1286 while ((os=(TObjString*)next1())) {
1287 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1288 files.ReplaceAll(os->GetString(),"");
1292 files.ReplaceAll(".root", "*.root");
1293 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1295 TString files = fOutputArchive;
1296 files.ReplaceAll(".root", "*.root"); // nreplicas etc should be already atttached by use
1297 outputArchive = files;
1299 arr = outputArchive.Tokenize(" ");
1303 while ((os=(TObjString*)next2())) {
1304 if (!first) comment = NULL;
1305 TString currentfile = os->GetString();
1306 if (!currentfile.Contains("@") && fCloseSE.Length())
1307 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1309 fMergingJDL->AddToOutputArchive(currentfile, comment);
1314 arr = fOutputFiles.Tokenize(",");
1316 Bool_t first = kTRUE;
1317 const char *comment = "Files to be saved";
1318 while ((os=(TObjString*)next())) {
1319 // Ignore ouputs in jdl that are also in outputarchive
1320 TString sout = os->GetString();
1321 sout.ReplaceAll("*", "");
1322 sout.ReplaceAll(".root", "");
1323 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1324 if (fOutputArchive.Contains(sout)) continue;
1325 if (!first) comment = NULL;
1326 if (!os->GetString().Contains("@") && fCloseSE.Length())
1327 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1329 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1331 if (fMergeExcludes.Contains(sout)) continue;
1332 if (!os->GetString().Contains("@") && fCloseSE.Length())
1333 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1335 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1338 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1339 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1340 TString validationScript = fValidationScript;
1341 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1342 validationScript.ReplaceAll(".sh", "_merge.sh");
1343 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1344 if (fMasterResubmitThreshold) {
1345 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1346 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1348 // Write a jdl with 2 input parameters: collection name and output dir name.
1351 // Copy jdl to grid workspace
1353 // Check if an output directory was defined and valid
1354 if (!fGridOutputDir.Length()) {
1355 Error("CreateJDL", "You must define AliEn output directory");
1358 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1359 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1360 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1361 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1363 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1369 if (TestBit(AliAnalysisGrid::kSubmit)) {
1370 TString mergeJDLName = fExecutable;
1371 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1372 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1373 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1374 if (fProductionMode) {
1375 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1376 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1378 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1379 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1380 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1381 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1383 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1384 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1387 if (fAdditionalLibs.Length()) {
1388 arr = fAdditionalLibs.Tokenize(" ");
1391 while ((os=(TObjString*)next())) {
1392 if (os->GetString().Contains(".so")) continue;
1393 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1394 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1395 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1400 TIter next(fPackages);
1402 while ((obj=next())) {
1403 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1404 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1405 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1412 //______________________________________________________________________________
1413 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1415 // Writes one or more JDL's corresponding to findex. If findex is negative,
1416 // all run numbers are considered in one go (jdl). For non-negative indices
1417 // they correspond to the indices in the array fInputFiles.
1418 if (!fInputFiles) return kFALSE;
1421 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1422 workdir += fGridWorkingDir;
1423 TString stageName = "$2";
1424 if (fProductionMode) stageName = "$4";
1425 if (!fMergeDirName.IsNull()) {
1426 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1427 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1429 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1430 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
1432 if (fProductionMode) {
1433 TIter next(fInputFiles);
1434 while ((os=next())) {
1435 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1437 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1439 if (!fRunNumbers.Length() && !fRunRange[0]) {
1440 // One jdl with no parameters in case input data is specified by name.
1441 TIter next(fInputFiles);
1443 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1444 if (!fOutputSingle.IsNull())
1445 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1447 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1448 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1451 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1452 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1453 if (!fOutputSingle.IsNull()) {
1454 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1455 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1457 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1462 // Generate the JDL as a string
1463 TString sjdl = fGridJDL->Generate();
1464 TString sjdl1 = fMergingJDL->Generate();
1466 if (!fMergeDirName.IsNull()) {
1467 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1468 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1470 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1471 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1473 TString sjdl2 = fMergingJDL->Generate();
1474 Int_t index, index1;
1475 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1476 sjdl.ReplaceAll("(member", "\n (member");
1477 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1478 sjdl.ReplaceAll("{", "{\n ");
1479 sjdl.ReplaceAll("};", "\n};");
1480 sjdl.ReplaceAll("{\n \n", "{\n");
1481 sjdl.ReplaceAll("\n\n", "\n");
1482 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1483 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1484 sjdl1.ReplaceAll("(member", "\n (member");
1485 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1486 sjdl1.ReplaceAll("{", "{\n ");
1487 sjdl1.ReplaceAll("};", "\n};");
1488 sjdl1.ReplaceAll("{\n \n", "{\n");
1489 sjdl1.ReplaceAll("\n\n", "\n");
1490 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1491 sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
1492 sjdl2.ReplaceAll("(member", "\n (member");
1493 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1494 sjdl2.ReplaceAll("{", "{\n ");
1495 sjdl2.ReplaceAll("};", "\n};");
1496 sjdl2.ReplaceAll("{\n \n", "{\n");
1497 sjdl2.ReplaceAll("\n\n", "\n");
1498 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
1499 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1500 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1501 index = sjdl.Index("JDLVariables");
1502 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1503 sjdl += "Workdirectorysize = {\"5000MB\"};";
1504 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1505 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1506 index = fJobTag.Index(":");
1507 if (index < 0) index = fJobTag.Length();
1508 TString jobTag = fJobTag;
1509 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1510 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1511 if (fProductionMode) {
1512 sjdl1.Prepend("# Generated merging jdl (production mode) \
1513 \n# $1 = full alien path to output directory to be merged \
1514 \n# $2 = train number \
1515 \n# $3 = production (like LHC10b) \
1516 \n# $4 = merging stage \
1517 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1518 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1519 sjdl2.Prepend("# Generated merging jdl \
1520 \n# $1 = full alien path to output directory to be merged \
1521 \n# $2 = train number \
1522 \n# $3 = production (like LHC10b) \
1523 \n# $4 = merging stage \
1524 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1526 sjdl1.Prepend("# Generated merging jdl \
1527 \n# $1 = full alien path to output directory to be merged \
1528 \n# $2 = merging stage \
1529 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1530 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1531 sjdl2.Prepend("# Generated merging jdl \
1532 \n# $1 = full alien path to output directory to be merged \
1533 \n# $2 = merging stage \
1534 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1536 index = sjdl1.Index("JDLVariables");
1537 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1538 index = sjdl2.Index("JDLVariables");
1539 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
1540 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1541 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
1542 index = sjdl2.Index("Split =");
1544 index1 = sjdl2.Index("\n", index);
1545 sjdl2.Remove(index, index1-index+1);
1547 index = sjdl2.Index("SplitMaxInputFileNumber");
1549 index1 = sjdl2.Index("\n", index);
1550 sjdl2.Remove(index, index1-index+1);
1552 index = sjdl2.Index("InputDataCollection");
1554 index1 = sjdl2.Index(";", index);
1555 sjdl2.Remove(index, index1-index+1);
1557 index = sjdl2.Index("InputDataListFormat");
1559 index1 = sjdl2.Index("\n", index);
1560 sjdl2.Remove(index, index1-index+1);
1562 index = sjdl2.Index("InputDataList");
1564 index1 = sjdl2.Index("\n", index);
1565 sjdl2.Remove(index, index1-index+1);
1567 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
1568 // Write jdl to file
1570 out.open(fJDLName.Data(), ios::out);
1572 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1575 out << sjdl << endl;
1577 TString mergeJDLName = fExecutable;
1578 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1581 out1.open(mergeJDLName.Data(), ios::out);
1583 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1586 out1 << sjdl1 << endl;
1589 TString finalJDL = mergeJDLName;
1590 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1591 out2.open(finalJDL.Data(), ios::out);
1593 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
1596 out2 << sjdl2 << endl;
1600 // Copy jdl to grid workspace
1602 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
1604 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1605 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1606 TString finalJDL = mergeJDLName;
1607 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1608 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
1609 if (fProductionMode) {
1610 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1611 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1612 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
1614 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1615 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1616 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
1617 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1618 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1620 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
1621 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1622 TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
1628 //______________________________________________________________________________
1629 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1631 // Returns true if file exists.
1632 if (!gGrid) return kFALSE;
1634 slfn.ReplaceAll("alien://","");
1635 TGridResult *res = gGrid->Ls(slfn);
1636 if (!res) return kFALSE;
1637 TMap *map = dynamic_cast<TMap*>(res->At(0));
1642 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1643 if (!objs || !objs->GetString().Length()) {
1651 //______________________________________________________________________________
1652 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1654 // Returns true if directory exists. Can be also a path.
1655 if (!gGrid) return kFALSE;
1656 // Check if dirname is a path
1657 TString dirstripped = dirname;
1658 dirstripped = dirstripped.Strip();
1659 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1660 TString dir = gSystem->BaseName(dirstripped);
1662 TString path = gSystem->DirName(dirstripped);
1663 TGridResult *res = gGrid->Ls(path, "-F");
1664 if (!res) return kFALSE;
1668 while ((map=dynamic_cast<TMap*>(next()))) {
1669 obj = map->GetValue("name");
1671 if (dir == obj->GetName()) {
1680 //______________________________________________________________________________
1681 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1683 // Check input data type.
1684 isCollection = kFALSE;
1688 Error("CheckDataType", "No connection to grid");
1691 isCollection = IsCollection(lfn);
1692 TString msg = "\n##### file: ";
1695 msg += " type: raw_collection;";
1696 // special treatment for collections
1698 // check for tag files in the collection
1699 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1701 msg += " using_tags: No (unknown)";
1702 Info("CheckDataType", "%s", msg.Data());
1705 const char* typeStr = res->GetKey(0, "origLFN");
1706 if (!typeStr || !strlen(typeStr)) {
1707 msg += " using_tags: No (unknown)";
1708 Info("CheckDataType", "%s", msg.Data());
1711 TString file = typeStr;
1712 useTags = file.Contains(".tag");
1713 if (useTags) msg += " using_tags: Yes";
1714 else msg += " using_tags: No";
1715 Info("CheckDataType", "%s", msg.Data());
1720 isXml = slfn.Contains(".xml");
1722 // Open xml collection and check if there are tag files inside
1723 msg += " type: xml_collection;";
1724 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1726 msg += " using_tags: No (unknown)";
1727 Info("CheckDataType", "%s", msg.Data());
1730 TMap *map = coll->Next();
1732 msg += " using_tags: No (unknown)";
1733 Info("CheckDataType", "%s", msg.Data());
1736 map = (TMap*)map->GetValue("");
1738 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1739 useTags = file.Contains(".tag");
1741 if (useTags) msg += " using_tags: Yes";
1742 else msg += " using_tags: No";
1743 Info("CheckDataType", "%s", msg.Data());
1746 useTags = slfn.Contains(".tag");
1747 if (slfn.Contains(".root")) msg += " type: root file;";
1748 else msg += " type: unknown file;";
1749 if (useTags) msg += " using_tags: Yes";
1750 else msg += " using_tags: No";
1751 Info("CheckDataType", "%s", msg.Data());
1754 //______________________________________________________________________________
1755 void AliAnalysisAlien::EnablePackage(const char *package)
1757 // Enables a par file supposed to exist in the current directory.
1758 TString pkg(package);
1759 pkg.ReplaceAll(".par", "");
1761 if (gSystem->AccessPathName(pkg)) {
1762 Fatal("EnablePackage", "Package %s not found", pkg.Data());
1765 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1766 Info("EnablePackage", "AliEn plugin will use .par packages");
1767 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
1769 fPackages = new TObjArray();
1770 fPackages->SetOwner();
1772 fPackages->Add(new TObjString(pkg));
1775 //______________________________________________________________________________
1776 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1778 // Make a tree from files having the location specified in fFileForTestMode.
1779 // Inspired from JF's CreateESDChain.
1780 if (fFileForTestMode.IsNull()) {
1781 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1784 if (gSystem->AccessPathName(fFileForTestMode)) {
1785 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1790 in.open(fFileForTestMode);
1792 // Read the input list of files and add them to the chain
1794 TChain *chain = new TChain(treeName);
1798 if (line.IsNull()) continue;
1799 if (count++ == fNtestFiles) break;
1800 TString esdFile(line);
1801 TFile *file = TFile::Open(esdFile);
1803 if (!file->IsZombie()) chain->Add(esdFile);
1806 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1810 if (!chain->GetListOfFiles()->GetEntries()) {
1811 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1819 //______________________________________________________________________________
1820 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1822 // Get job status for all jobs with jobid>jobidstart.
1823 static char mstatus[20];
1829 TGridJobStatusList *list = gGrid->Ps("");
1830 if (!list) return mstatus;
1831 Int_t nentries = list->GetSize();
1832 TGridJobStatus *status;
1834 for (Int_t ijob=0; ijob<nentries; ijob++) {
1835 status = (TGridJobStatus *)list->At(ijob);
1836 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
1837 if (pid<jobidstart) continue;
1838 if (pid == lastid) {
1839 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
1841 switch (status->GetStatus()) {
1842 case TGridJobStatus::kWAITING:
1844 case TGridJobStatus::kRUNNING:
1846 case TGridJobStatus::kABORTED:
1847 case TGridJobStatus::kFAIL:
1848 case TGridJobStatus::kUNKNOWN:
1850 case TGridJobStatus::kDONE:
1859 //______________________________________________________________________________
1860 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1862 // Returns true if file is a collection. Functionality duplicated from
1863 // TAlien::Type() because we don't want to directly depend on TAlien.
1865 Error("IsCollection", "No connection to grid");
1868 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1869 if (!res) return kFALSE;
1870 const char* typeStr = res->GetKey(0, "type");
1871 if (!typeStr || !strlen(typeStr)) return kFALSE;
1872 if (!strcmp(typeStr, "collection")) return kTRUE;
1877 //______________________________________________________________________________
1878 Bool_t AliAnalysisAlien::IsSingleOutput() const
1880 // Check if single-ouput option is on.
1881 return (!fOutputSingle.IsNull());
1884 //______________________________________________________________________________
1885 void AliAnalysisAlien::Print(Option_t *) const
1887 // Print current plugin settings.
1888 printf("### AliEn analysis plugin current settings ###\n");
1889 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1890 if (mgr && mgr->IsProofMode()) {
1891 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1892 if (TestBit(AliAnalysisGrid::kTest))
1893 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1894 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1895 if (!fProofDataSet.IsNull())
1896 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1898 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1900 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1901 if (!fRootVersionForProof.IsNull())
1902 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1904 printf("= ROOT version requested________________________ default\n");
1905 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1906 if (!fAliRootMode.IsNull())
1907 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1909 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1910 if (fNproofWorkersPerSlave)
1911 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1912 if (TestSpecialBit(kClearPackages))
1913 printf("= ClearPackages requested...\n");
1914 if (fIncludePath.Data())
1915 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1916 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1917 if (fPackages && fPackages->GetEntries()) {
1918 TIter next(fPackages);
1921 while ((obj=next())) list += obj->GetName();
1922 printf("= Par files to be used: ________________________ %s\n", list.Data());
1924 if (TestSpecialBit(kProofConnectGrid))
1925 printf("= Requested PROOF connection to grid\n");
1928 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1929 if (fOverwriteMode) {
1930 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1931 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1933 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1934 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1935 printf("= Production mode:______________________________ %d\n", fProductionMode);
1936 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1937 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1938 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1940 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1941 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1942 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1943 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1944 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1945 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1946 if (fRunNumbers.Length())
1947 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1949 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
1950 if (!fRunRange[0] && !fRunNumbers.Length()) {
1951 TIter next(fInputFiles);
1954 while ((obj=next())) list += obj->GetName();
1955 printf("= Input files to be processed: _________________ %s\n", list.Data());
1957 if (TestBit(AliAnalysisGrid::kTest))
1958 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1959 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1960 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1961 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1962 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1963 printf("=====================================================================\n");
1964 printf("= Job price: ___________________________________ %d\n", fPrice);
1965 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1966 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1967 if (fMaxInitFailed>0)
1968 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1969 if (fMasterResubmitThreshold>0)
1970 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1971 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1972 if (fNrunsPerMaster>0)
1973 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1974 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1975 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1976 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1977 if (fArguments.Length())
1978 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1979 if (fExecutableArgs.Length())
1980 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1981 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1982 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1983 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1984 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1986 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1987 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
1988 if (fIncludePath.Data())
1989 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1990 if (fCloseSE.Length())
1991 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1992 if (fFriendChainName.Length())
1993 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1994 if (fPackages && fPackages->GetEntries()) {
1995 TIter next(fPackages);
1998 while ((obj=next())) list += obj->GetName();
1999 printf("= Par files to be used: ________________________ %s\n", list.Data());
2003 //______________________________________________________________________________
2004 void AliAnalysisAlien::SetDefaults()
2006 // Set default values for everything. What cannot be filled will be left empty.
2007 if (fGridJDL) delete fGridJDL;
2008 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2009 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2012 fSplitMaxInputFileNumber = 100;
2014 fMasterResubmitThreshold = 0;
2020 fNrunsPerMaster = 1;
2021 fMaxMergeFiles = 100;
2023 fExecutable = "analysis.sh";
2024 fExecutableCommand = "root -b -q";
2026 fExecutableArgs = "";
2027 fAnalysisMacro = "myAnalysis.C";
2028 fAnalysisSource = "";
2029 fAdditionalLibs = "";
2033 fAliROOTVersion = "";
2034 fUser = ""; // Your alien user name
2035 fGridWorkingDir = "";
2036 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2037 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2038 fFriendChainName = "";
2039 fGridOutputDir = "output";
2040 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2041 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2042 fInputFormat = "xml-single";
2043 fJDLName = "analysis.jdl";
2044 fJobTag = "Automatically generated analysis JDL";
2045 fMergeExcludes = "";
2048 SetCheckCopy(kTRUE);
2049 SetDefaultOutputs(kTRUE);
2053 //______________________________________________________________________________
2054 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2056 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
2057 // First check if the result is already in the output directory.
2058 if (FileExists(Form("%s/%s",aliendir,filename))) {
2059 printf("Final merged results found. Not merging again.\n");
2062 // Now check the last stage done.
2065 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2068 // Next stage of merging
2070 TString pattern = "*root_archive.zip";
2071 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2072 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2073 if (res) delete res;
2074 // Write standard output to file
2075 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2076 // Count the number of files inside
2078 ifile.open(Form("Stage_%d.xml",stage));
2079 if (!ifile.good()) {
2080 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
2085 while (!ifile.eof()) {
2087 if (line.Contains("/event")) nfiles++;
2091 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2094 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2096 // Copy the file in the output directory
2097 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2098 TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2099 // Check if this is the last stage to be done.
2100 Bool_t laststage = (nfiles<nperchunk);
2101 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2103 printf("### Submiting final merging stage %d\n", stage);
2104 TString finalJDL = jdl;
2105 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2106 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2107 Int_t jobId = SubmitSingleJob(query);
2108 if (!jobId) return kFALSE;
2110 printf("### Submiting merging stage %d\n", stage);
2111 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2112 Int_t jobId = SubmitSingleJob(query);
2113 if (!jobId) return kFALSE;
2118 //______________________________________________________________________________
2119 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2121 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
2122 if (!gGrid) return 0;
2123 printf("=> %s ------> ",query);
2124 TGridResult *res = gGrid->Command(query);
2126 TString jobId = res->GetKey(0,"jobId");
2128 if (jobId.IsNull()) {
2129 printf("submission failed. Reason:\n");
2132 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2135 printf(" Job id: %s\n", jobId.Data());
2139 //______________________________________________________________________________
2140 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2142 // Merge given output files from basedir. Basedir can be an alien output directory
2143 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2144 // files in a group (ignored for xml input). Merging can be done in stages:
2145 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2146 // stage=1 : works with an xml of all root_archive.zip in the output directory
2147 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
2148 TString outputFile = output;
2150 TString outputChunk;
2151 TString previousChunk = "";
2152 TObjArray *listoffiles = new TObjArray();
2153 // listoffiles->SetOwner();
2154 Int_t countChunk = 0;
2155 Int_t countZero = nmaxmerge;
2156 Bool_t merged = kTRUE;
2157 Int_t index = outputFile.Index("@");
2158 if (index > 0) outputFile.Remove(index);
2159 TString inputFile = outputFile;
2160 TString sbasedir = basedir;
2161 if (sbasedir.Contains(".xml")) {
2162 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2163 nmaxmerge = 9999999;
2164 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2166 ::Error("MergeOutput", "Input XML collection empty.");
2169 // Iterate grid collection
2170 while (coll->Next()) {
2171 TString fname = gSystem->DirName(coll->GetTURL());
2174 listoffiles->Add(new TNamed(fname.Data(),""));
2177 command = Form("find %s/ *%s", basedir, inputFile.Data());
2178 printf("command: %s\n", command.Data());
2179 TGridResult *res = gGrid->Command(command);
2181 ::Error("MergeOutput","No result for the find command\n");
2187 while ((map=(TMap*)nextmap())) {
2188 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2189 if (!objs || !objs->GetString().Length()) {
2190 // Nothing found - skip this output
2195 listoffiles->Add(new TNamed(objs->GetName(),""));
2199 if (!listoffiles->GetEntries()) {
2200 ::Error("MergeOutput","No result for the find command\n");
2205 TFileMerger *fm = 0;
2206 TIter next0(listoffiles);
2207 TObjArray *listoffilestmp = new TObjArray();
2208 listoffilestmp->SetOwner();
2211 // Keep only the files at upper level
2212 Int_t countChar = 0;
2213 while ((nextfile=next0())) {
2214 snextfile = nextfile->GetName();
2215 Int_t crtCount = snextfile.CountChar('/');
2216 if (nextfile == listoffiles->First()) countChar = crtCount;
2217 if (crtCount < countChar) countChar = crtCount;
2220 while ((nextfile=next0())) {
2221 snextfile = nextfile->GetName();
2222 Int_t crtCount = snextfile.CountChar('/');
2223 if (crtCount > countChar) {
2227 listoffilestmp->Add(nextfile);
2230 listoffiles = listoffilestmp; // Now contains 'good' files
2231 listoffiles->Print();
2232 TIter next(listoffiles);
2233 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2234 outputChunk = outputFile;
2235 outputChunk.ReplaceAll(".root", "_*.root");
2236 // Check for existent temporary merge files
2237 // Check overwrite mode and remove previous partial results if needed
2238 // Preserve old merging functionality for stage 0.
2240 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2242 // Skip as many input files as in a chunk
2243 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2246 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2250 snextfile = nextfile->GetName();
2252 outputChunk = outputFile;
2253 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2255 if (gSystem->AccessPathName(outputChunk)) continue;
2256 // Merged file with chunks up to <countChunk> found
2257 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2258 previousChunk = outputChunk;
2262 countZero = nmaxmerge;
2264 while ((nextfile=next())) {
2265 snextfile = nextfile->GetName();
2266 // Loop 'find' results and get next LFN
2267 if (countZero == nmaxmerge) {
2268 // First file in chunk - create file merger and add previous chunk if any.
2269 fm = new TFileMerger(kFALSE);
2270 fm->SetFastMethod(kTRUE);
2271 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2272 outputChunk = outputFile;
2273 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2275 // If last file found, put merged results in the output file
2276 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2277 // Add file to be merged and decrement chunk counter.
2278 fm->AddFile(snextfile);
2280 if (countZero==0 || nextfile == listoffiles->Last()) {
2281 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2282 // Nothing found - skip this output
2283 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2287 fm->OutputFile(outputChunk);
2288 // Merge the outputs, then go to next chunk
2290 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2294 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2295 gSystem->Unlink(previousChunk);
2297 if (nextfile == listoffiles->Last()) break;
2299 countZero = nmaxmerge;
2300 previousChunk = outputChunk;
2307 // Merging stage different than 0.
2308 // Move to the begining of the requested chunk.
2309 fm = new TFileMerger(kFALSE);
2310 fm->SetFastMethod(kTRUE);
2311 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2313 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2314 // Nothing found - skip this output
2315 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2319 fm->OutputFile(outputFile);
2320 // Merge the outputs
2322 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2326 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2332 //______________________________________________________________________________
2333 Bool_t AliAnalysisAlien::MergeOutputs()
2335 // Merge analysis outputs existing in the AliEn space.
2336 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2337 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2339 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2343 if (!TestBit(AliAnalysisGrid::kMerge)) {
2344 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2347 if (fProductionMode) {
2348 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2351 Info("MergeOutputs", "Submitting merging JDL");
2352 if (!SubmitMerging()) return kFALSE;
2353 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2354 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2357 // Get the output path
2358 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2359 if (!DirectoryExists(fGridOutputDir)) {
2360 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2363 if (!fOutputFiles.Length()) {
2364 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2367 // Check if fast read option was requested
2368 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2369 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2370 if (fFastReadOption) {
2371 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2372 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2373 gEnv->SetValue("XNet.ConnectTimeout",50);
2374 gEnv->SetValue("XNet.RequestTimeout",50);
2375 gEnv->SetValue("XNet.MaxRedirectCount",2);
2376 gEnv->SetValue("XNet.ReconnectTimeout",50);
2377 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2379 // Make sure we change the temporary directory
2380 gSystem->Setenv("TMPDIR", gSystem->pwd());
2381 // Set temporary compilation directory to current one
2382 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
2383 TObjArray *list = fOutputFiles.Tokenize(",");
2387 Bool_t merged = kTRUE;
2388 while((str=(TObjString*)next())) {
2389 outputFile = str->GetString();
2390 Int_t index = outputFile.Index("@");
2391 if (index > 0) outputFile.Remove(index);
2392 TString outputChunk = outputFile;
2393 outputChunk.ReplaceAll(".root", "_*.root");
2394 // Skip already merged outputs
2395 if (!gSystem->AccessPathName(outputFile)) {
2396 if (fOverwriteMode) {
2397 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2398 gSystem->Unlink(outputFile);
2399 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2400 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2401 outputChunk.Data());
2402 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2405 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2409 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2410 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2411 outputChunk.Data());
2412 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2415 if (fMergeExcludes.Length() &&
2416 fMergeExcludes.Contains(outputFile.Data())) continue;
2417 // Perform a 'find' command in the output directory, looking for registered outputs
2418 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2420 Error("MergeOutputs", "Terminate() will NOT be executed");
2423 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2424 if (fileOpened) fileOpened->Close();
2429 //______________________________________________________________________________
2430 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2432 // Use the output files connected to output containers from the analysis manager
2433 // rather than the files defined by SetOutputFiles
2434 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2435 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2436 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2439 //______________________________________________________________________________
2440 void AliAnalysisAlien::SetOutputFiles(const char *list)
2442 // Manually set the output files list.
2443 // Removes duplicates. Not allowed if default outputs are not disabled.
2444 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2445 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2448 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2450 TString slist = list;
2451 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2452 TObjArray *arr = slist.Tokenize(" ");
2456 while ((os=(TObjString*)next())) {
2457 sout = os->GetString();
2458 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
2459 if (fOutputFiles.Contains(sout)) continue;
2460 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2461 fOutputFiles += sout;
2466 //______________________________________________________________________________
2467 void AliAnalysisAlien::SetOutputArchive(const char *list)
2469 // Manually set the output archive list. Free text - you are on your own...
2470 // Not allowed if default outputs are not disabled.
2471 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2472 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2475 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2476 fOutputArchive = list;
2479 //______________________________________________________________________________
2480 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2482 // Setting a prefered output SE is not allowed anymore.
2483 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2486 //______________________________________________________________________________
2487 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
2489 // Set some PROOF special parameter.
2490 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2492 TObject *old = pair->Key();
2493 TObject *val = pair->Value();
2494 fProofParam.Remove(old);
2498 fProofParam.Add(new TObjString(pname), new TObjString(value));
2501 //______________________________________________________________________________
2502 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
2504 // Returns a special PROOF parameter.
2505 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2506 if (!pair) return 0;
2507 return pair->Value()->GetName();
2510 //______________________________________________________________________________
2511 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2513 // Start remote grid analysis.
2514 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2515 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2516 if (!mgr || !mgr->IsInitialized()) {
2517 Error("StartAnalysis", "You need an initialized analysis manager for this");
2520 // Are we in PROOF mode ?
2521 if (mgr->IsProofMode()) {
2522 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
2523 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2524 if (fProofCluster.IsNull()) {
2525 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2528 if (fProofDataSet.IsNull() && !testMode) {
2529 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2532 // Set the needed environment
2533 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2534 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2535 if (fProofReset && !testMode) {
2536 if (fProofReset==1) {
2537 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2538 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2540 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2541 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2543 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2548 // Check if there is an old active session
2549 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
2551 Error("StartAnalysis","You have to reset your old session first\n");
2555 // Do we need to change the ROOT version ? The success of this cannot be checked.
2556 if (!fRootVersionForProof.IsNull() && !testMode) {
2557 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2558 fProofCluster.Data(), fRootVersionForProof.Data()));
2560 // Connect to PROOF and check the status
2563 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2564 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2566 if (!sworkers.IsNull())
2567 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2569 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2571 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2573 Error("StartAnalysis", "Could not start PROOF in test mode");
2578 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2581 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2582 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2583 // Set proof special parameters if any
2584 TIter nextpp(&fProofParam);
2585 TObject *proofparam;
2586 while ((proofparam=nextpp())) {
2587 TString svalue = GetProofParameter(proofparam->GetName());
2588 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
2590 // Is dataset existing ?
2592 TString dataset = fProofDataSet;
2593 Int_t index = dataset.Index("#");
2594 if (index>=0) dataset.Remove(index);
2595 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2596 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2599 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2601 // Is ClearPackages() needed ?
2602 if (TestSpecialBit(kClearPackages)) {
2603 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2604 gROOT->ProcessLine("gProof->ClearPackages();");
2606 // Is a given aliroot mode requested ?
2609 if (!fAliRootMode.IsNull()) {
2610 TString alirootMode = fAliRootMode;
2611 if (alirootMode == "default") alirootMode = "";
2612 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2613 optionsList.SetOwner();
2614 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2615 // Check the additional libs to be loaded
2617 Bool_t parMode = kFALSE;
2618 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2619 // Parse the extra libs for .so
2620 if (fAdditionalLibs.Length()) {
2621 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2624 while((str=(TObjString*)next())) {
2625 if (str->GetString().Contains(".so")) {
2627 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
2630 TString stmp = str->GetName();
2631 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2632 stmp.ReplaceAll(".so","");
2633 if (!extraLibs.IsNull()) extraLibs += ":";
2637 if (str->GetString().Contains(".par")) {
2638 // The first par file found in the list will not allow any further .so
2640 if (!parLibs.IsNull()) parLibs += ":";
2641 parLibs += str->GetName();
2645 if (list) delete list;
2647 if (!extraLibs.IsNull()) {
2648 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
2649 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2651 // Check extra includes
2652 if (!fIncludePath.IsNull()) {
2653 TString includePath = fIncludePath;
2654 includePath.ReplaceAll(" ",":");
2655 includePath.ReplaceAll("$ALICE_ROOT/","");
2656 includePath.ReplaceAll("${ALICE_ROOT}/","");
2657 includePath.ReplaceAll("-I","");
2658 includePath.Remove(TString::kTrailing, ':');
2659 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2660 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2662 // Check if connection to grid is requested
2663 if (TestSpecialBit(kProofConnectGrid))
2664 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2665 // Enable AliRoot par
2667 // Enable proof lite package
2668 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2669 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2670 TNamed *obj = (TNamed*)optionsList.At(i);
2671 printf("%s %s\n", obj->GetName(), obj->GetTitle());
2673 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2674 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
2675 Info("StartAnalysis", "AliRootProofLite enabled");
2677 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2681 if ( ! fAliROOTVersion.IsNull() ) {
2682 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
2683 fAliROOTVersion.Data(), &optionsList))) {
2684 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2689 // Enable first par files from fAdditionalLibs
2690 if (!parLibs.IsNull()) {
2691 TObjArray *list = parLibs.Tokenize(":");
2693 TObjString *package;
2694 while((package=(TObjString*)next())) {
2695 TString spkg = package->GetName();
2696 spkg.ReplaceAll(".par", "");
2697 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2698 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2699 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
2700 if (gROOT->ProcessLine(enablePackage)) {
2701 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2705 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2709 if (list) delete list;
2712 if (fAdditionalLibs.Contains(".so") && !testMode) {
2713 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2714 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2718 // Enable par files if requested
2719 if (fPackages && fPackages->GetEntries()) {
2720 TIter next(fPackages);
2722 while ((package=next())) {
2723 // Skip packages already enabled
2724 if (parLibs.Contains(package->GetName())) continue;
2725 TString spkg = package->GetName();
2726 spkg.ReplaceAll(".par", "");
2727 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2728 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2729 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2730 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2734 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2739 // Do we need to load analysis source files ?
2740 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2741 if (fAnalysisSource.Length()) {
2742 TObjArray *list = fAnalysisSource.Tokenize(" ");
2745 while((str=(TObjString*)next())) {
2746 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2748 if (list) delete list;
2751 // Register dataset to proof lite.
2752 if (fFileForTestMode.IsNull()) {
2753 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2756 if (gSystem->AccessPathName(fFileForTestMode)) {
2757 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2760 TFileCollection *coll = new TFileCollection();
2761 coll->AddFromFile(fFileForTestMode);
2762 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
2763 gROOT->ProcessLine("gProof->ShowDataSets()");
2768 // Check if output files have to be taken from the analysis manager
2769 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2770 // Add output files and AOD files
2771 fOutputFiles = GetListOfFiles("outaod");
2772 // Add extra files registered to the analysis manager
2773 TString extra = GetListOfFiles("ext");
2774 if (!extra.IsNull()) {
2775 extra.ReplaceAll(".root", "*.root");
2776 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2777 fOutputFiles += extra;
2779 // Compose the output archive.
2780 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2781 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
2783 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2784 if (TestBit(AliAnalysisGrid::kOffline)) {
2785 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2786 \n there nor any job run. You can revise the JDL and analysis \
2787 \n macro then run the same in \"submit\" mode.");
2788 } else if (TestBit(AliAnalysisGrid::kTest)) {
2789 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2791 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2792 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2793 \n space and job submitted.");
2794 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2795 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2796 if (fMergeViaJDL) CheckInputData();
2799 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2804 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2807 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2808 if (!CheckInputData()) {
2809 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2812 if (!CreateDataset(fDataPattern)) {
2814 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2815 if (fRunNumbers.Length()) serror = "run numbers";
2816 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2817 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2818 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2821 WriteAnalysisFile();
2822 WriteAnalysisMacro();
2824 WriteValidationScript();
2826 WriteMergingMacro();
2827 WriteMergeExecutable();
2828 WriteValidationScript(kTRUE);
2830 if (!CreateJDL()) return kFALSE;
2831 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2833 // Locally testing the analysis
2834 Info("StartAnalysis", "\n_______________________________________________________________________ \
2835 \n Running analysis script in a daughter shell as on a worker node \
2836 \n_______________________________________________________________________");
2837 TObjArray *list = fOutputFiles.Tokenize(",");
2841 while((str=(TObjString*)next())) {
2842 outputFile = str->GetString();
2843 Int_t index = outputFile.Index("@");
2844 if (index > 0) outputFile.Remove(index);
2845 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2848 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2849 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2850 // gSystem->Exec("cat stdout");
2853 // Check if submitting is managed by LPM manager
2854 if (fProductionMode) {
2855 TString prodfile = fJDLName;
2856 prodfile.ReplaceAll(".jdl", ".prod");
2857 WriteProductionFile(prodfile);
2858 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2861 // Submit AliEn job(s)
2862 gGrid->Cd(fGridOutputDir);
2865 if (!fRunNumbers.Length() && !fRunRange[0]) {
2866 // Submit a given xml or a set of runs
2867 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2868 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2870 const char *cjobId = res->GetKey(0,"jobId");
2874 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2877 Info("StartAnalysis", "\n_______________________________________________________________________ \
2878 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2879 \n_______________________________________________________________________",
2880 fJDLName.Data(), cjobId);
2885 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2889 // Submit for a range of enumeration of runs.
2890 if (!Submit()) return kFALSE;
2893 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2894 \n You may exit at any time and terminate the job later using the option <terminate> \
2895 \n ##################################################################################", jobID.Data());
2896 gSystem->Exec("aliensh");
2900 //______________________________________________________________________________
2901 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2903 // Get a comma-separated list of output files of the requested type.
2904 // Type can be (case unsensitive):
2905 // aod - list of aod files (std, extensions and filters)
2906 // out - list of output files connected to containers (but not aod's or extras)
2907 // ext - list of extra files registered to the manager
2908 // ter - list of files produced in terminate
2909 static TString files;
2911 TString stype = type;
2913 TString aodfiles, extra;
2914 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2916 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2917 return files.Data();
2919 if (mgr->GetOutputEventHandler()) {
2920 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2921 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2922 if (!extraaod.IsNull()) {
2924 aodfiles += extraaod;
2927 if (stype.Contains("aod")) {
2929 if (stype == "aod") return files.Data();
2931 // Add output files that are not in the list of AOD files
2932 TString outputfiles = "";
2933 TIter next(mgr->GetOutputs());
2934 AliAnalysisDataContainer *output;
2935 const char *filename = 0;
2936 while ((output=(AliAnalysisDataContainer*)next())) {
2937 filename = output->GetFileName();
2938 if (!(strcmp(filename, "default"))) continue;
2939 if (outputfiles.Contains(filename)) continue;
2940 if (aodfiles.Contains(filename)) continue;
2941 if (!outputfiles.IsNull()) outputfiles += ",";
2942 outputfiles += filename;
2944 if (stype.Contains("out")) {
2945 if (!files.IsNull()) files += ",";
2946 files += outputfiles;
2947 if (stype == "out") return files.Data();
2949 // Add extra files registered to the analysis manager
2951 extra = mgr->GetExtraFiles();
2952 if (!extra.IsNull()) {
2954 extra.ReplaceAll(" ", ",");
2955 TObjArray *fextra = extra.Tokenize(",");
2956 TIter nextx(fextra);
2958 while ((obj=nextx())) {
2959 if (aodfiles.Contains(obj->GetName())) continue;
2960 if (outputfiles.Contains(obj->GetName())) continue;
2961 if (sextra.Contains(obj->GetName())) continue;
2962 if (!sextra.IsNull()) sextra += ",";
2963 sextra += obj->GetName();
2966 if (stype.Contains("ext")) {
2967 if (!files.IsNull()) files += ",";
2971 if (stype == "ext") return files.Data();
2973 if (!fTerminateFiles.IsNull()) {
2974 fTerminateFiles.Strip();
2975 fTerminateFiles.ReplaceAll(" ",",");
2976 TObjArray *fextra = fTerminateFiles.Tokenize(",");
2977 TIter nextx(fextra);
2979 while ((obj=nextx())) {
2980 if (aodfiles.Contains(obj->GetName())) continue;
2981 if (outputfiles.Contains(obj->GetName())) continue;
2982 if (termfiles.Contains(obj->GetName())) continue;
2983 if (sextra.Contains(obj->GetName())) continue;
2984 if (!termfiles.IsNull()) termfiles += ",";
2985 termfiles += obj->GetName();
2989 if (stype.Contains("ter")) {
2990 if (!files.IsNull() && !termfiles.IsNull()) {
2995 return files.Data();
2998 //______________________________________________________________________________
2999 Bool_t AliAnalysisAlien::Submit()
3001 // Submit all master jobs.
3002 Int_t nmasterjobs = fInputFiles->GetEntries();
3003 Long_t tshoot = gSystem->Now();
3004 if (!fNsubmitted && !SubmitNext()) return kFALSE;
3005 while (fNsubmitted < nmasterjobs) {
3006 Long_t now = gSystem->Now();
3007 if ((now-tshoot)>30000) {
3009 if (!SubmitNext()) return kFALSE;
3015 //______________________________________________________________________________
3016 Bool_t AliAnalysisAlien::SubmitMerging()
3018 // Submit all merging jobs.
3019 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3020 gGrid->Cd(fGridOutputDir);
3021 TString mergeJDLName = fExecutable;
3022 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3024 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3027 Int_t ntosubmit = fInputFiles->GetEntries();
3028 for (Int_t i=0; i<ntosubmit; i++) {
3029 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3030 runOutDir.ReplaceAll(".xml", "");
3031 if (fOutputToRunNo) {
3032 // The output directory is the run number
3033 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3034 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3036 if (!fRunNumbers.Length() && !fRunRange[0]) {
3037 // The output directory is the grid outdir
3038 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3039 runOutDir = fGridOutputDir;
3041 // The output directory is the master number in 3 digits format
3042 printf("### Submitting merging job for master <%03d>\n", i);
3043 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3046 // Check now the number of merging stages.
3047 TObjArray *list = fOutputFiles.Tokenize(",");
3051 while((str=(TObjString*)next())) {
3052 outputFile = str->GetString();
3053 Int_t index = outputFile.Index("@");
3054 if (index > 0) outputFile.Remove(index);
3055 if (!fMergeExcludes.Contains(outputFile)) break;
3058 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
3059 if (!done && (i==ntosubmit-1)) return kFALSE;
3060 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3062 if (!ntosubmit) return kTRUE;
3063 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3064 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3065 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3066 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3067 \n ################################################################################################################");
3068 gSystem->Exec("aliensh");
3072 //______________________________________________________________________________
3073 Bool_t AliAnalysisAlien::SubmitNext()
3075 // Submit next bunch of master jobs if the queue is free. The first master job is
3076 // submitted right away, while the next will not be unless the previous was split.
3077 // The plugin will not submit new master jobs if there are more that 500 jobs in
3079 static Bool_t iscalled = kFALSE;
3080 static Int_t firstmaster = 0;
3081 static Int_t lastmaster = 0;
3082 static Int_t npermaster = 0;
3083 if (iscalled) return kTRUE;
3085 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3086 Int_t ntosubmit = 0;
3089 Int_t nmasterjobs = fInputFiles->GetEntries();
3092 if (!IsUseSubmitPolicy()) {
3094 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3095 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3096 ntosubmit = nmasterjobs;
3099 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3100 printf("=== master %d: %s\n", lastmaster, status.Data());
3101 // If last master not split, just return
3102 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3103 // No more than 100 waiting jobs
3104 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
3105 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3106 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3107 if (!ntosubmit) ntosubmit = 1;
3108 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3109 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3111 for (Int_t i=0; i<ntosubmit; i++) {
3112 // Submit for a range of enumeration of runs.
3113 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
3115 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3116 runOutDir.ReplaceAll(".xml", "");
3118 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3120 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3121 printf("********* %s\n",query.Data());
3122 res = gGrid->Command(query);
3124 TString cjobId1 = res->GetKey(0,"jobId");
3125 if (!cjobId1.Length()) {
3129 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3132 Info("StartAnalysis", "\n_______________________________________________________________________ \
3133 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3134 \n_______________________________________________________________________",
3135 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
3138 lastmaster = cjobId1.Atoi();
3139 if (!firstmaster) firstmaster = lastmaster;
3144 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3152 //______________________________________________________________________________
3153 void AliAnalysisAlien::WriteAnalysisFile()
3155 // Write current analysis manager into the file <analysisFile>
3156 TString analysisFile = fExecutable;
3157 analysisFile.ReplaceAll(".sh", ".root");
3158 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3159 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3160 if (!mgr || !mgr->IsInitialized()) {
3161 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3164 // Check analysis type
3166 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3167 handler = (TObject*)mgr->GetInputEventHandler();
3169 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3170 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3171 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3172 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3174 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3175 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3178 TDirectory *cdir = gDirectory;
3179 TFile *file = TFile::Open(analysisFile, "RECREATE");
3181 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
3182 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3183 // Unless merging makes no sense
3184 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3187 // Enable termination for local jobs
3188 mgr->SetSkipTerminate(kFALSE);
3190 if (cdir) cdir->cd();
3191 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
3193 Bool_t copy = kTRUE;
3194 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3197 TString workdir = gGrid->GetHomeDirectory();
3198 workdir += fGridWorkingDir;
3199 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
3200 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3201 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
3205 //______________________________________________________________________________
3206 void AliAnalysisAlien::WriteAnalysisMacro()
3208 // Write the analysis macro that will steer the analysis in grid mode.
3209 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3211 out.open(fAnalysisMacro.Data(), ios::out);
3213 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3216 Bool_t hasSTEERBase = kFALSE;
3217 Bool_t hasESD = kFALSE;
3218 Bool_t hasAOD = kFALSE;
3219 Bool_t hasANALYSIS = kFALSE;
3220 Bool_t hasOADB = kFALSE;
3221 Bool_t hasANALYSISalice = kFALSE;
3222 Bool_t hasCORRFW = kFALSE;
3223 TString func = fAnalysisMacro;
3224 TString type = "ESD";
3225 TString comment = "// Analysis using ";
3226 if (IsUseMCchain()) {
3230 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3231 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3236 if (type!="AOD" && fFriendChainName!="") {
3237 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3240 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3241 else comment += " data";
3242 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3243 func.ReplaceAll(".C", "");
3244 out << "void " << func.Data() << "()" << endl;
3246 out << comment.Data() << endl;
3247 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3248 out << " TStopwatch timer;" << endl;
3249 out << " timer.Start();" << endl << endl;
3250 // Change temp directory to current one
3251 out << "// Set temporary merging directory to current one" << endl;
3252 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3253 out << "// Set temporary compilation directory to current one" << endl;
3254 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3255 // Reset existing include path
3256 out << "// Reset existing include path and add current directory first in the search" << endl;
3257 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3258 if (!fExecutableCommand.Contains("aliroot")) {
3259 out << "// load base root libraries" << endl;
3260 out << " gSystem->Load(\"libTree\");" << endl;
3261 out << " gSystem->Load(\"libGeom\");" << endl;
3262 out << " gSystem->Load(\"libVMC\");" << endl;
3263 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3264 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3266 if (fAdditionalRootLibs.Length()) {
3267 // in principle libtree /lib geom libvmc etc. can go into this list, too
3268 out << "// Add aditional libraries" << endl;
3269 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3272 while((str=(TObjString*)next())) {
3273 if (str->GetString().Contains(".so"))
3274 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3276 if (list) delete list;
3278 out << "// Load analysis framework libraries" << endl;
3279 TString setupPar = "AliAnalysisAlien::SetupPar";
3281 if (!fExecutableCommand.Contains("aliroot")) {
3282 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3283 out << " gSystem->Load(\"libESD\");" << endl;
3284 out << " gSystem->Load(\"libAOD\");" << endl;
3286 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3287 out << " gSystem->Load(\"libOADB\");" << endl;
3288 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3289 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3291 TIter next(fPackages);
3294 while ((obj=next())) {
3295 pkgname = obj->GetName();
3296 if (pkgname == "STEERBase" ||
3297 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3298 if (pkgname == "ESD" ||
3299 pkgname == "ESD.par") hasESD = kTRUE;
3300 if (pkgname == "AOD" ||
3301 pkgname == "AOD.par") hasAOD = kTRUE;
3302 if (pkgname == "ANALYSIS" ||
3303 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3304 if (pkgname == "OADB" ||
3305 pkgname == "OADB.par") hasOADB = kTRUE;
3306 if (pkgname == "ANALYSISalice" ||
3307 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3308 if (pkgname == "CORRFW" ||
3309 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3311 if (hasANALYSISalice) setupPar = "SetupPar";
3312 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3313 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3314 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3315 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3316 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3317 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3318 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3319 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3320 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3321 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3322 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3323 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3324 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3325 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3326 out << "// Compile other par packages" << endl;
3328 while ((obj=next())) {
3329 pkgname = obj->GetName();
3330 if (pkgname == "STEERBase" ||
3331 pkgname == "STEERBase.par" ||
3333 pkgname == "ESD.par" ||
3335 pkgname == "AOD.par" ||
3336 pkgname == "ANALYSIS" ||
3337 pkgname == "ANALYSIS.par" ||
3338 pkgname == "OADB" ||
3339 pkgname == "OADB.par" ||
3340 pkgname == "ANALYSISalice" ||
3341 pkgname == "ANALYSISalice.par" ||
3342 pkgname == "CORRFW" ||
3343 pkgname == "CORRFW.par") continue;
3344 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3347 out << "// include path" << endl;
3348 // Get the include path from the interpreter and remove entries pointing to AliRoot
3349 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3350 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3351 out << " TIter nextpath(listpaths);" << endl;
3352 out << " TObjString *pname;" << endl;
3353 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3354 out << " TString current = pname->GetName();" << endl;
3355 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3356 out << " gSystem->AddIncludePath(current);" << endl;
3357 out << " }" << endl;
3358 out << " if (listpaths) delete listpaths;" << endl;
3359 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3360 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3361 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3362 if (fAdditionalLibs.Length()) {
3363 out << "// Add aditional AliRoot libraries" << endl;
3364 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3367 while((str=(TObjString*)next())) {
3368 if (str->GetString().Contains(".so"))
3369 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3370 if (str->GetString().Contains(".par"))
3371 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3373 if (list) delete list;
3376 out << "// analysis source to be compiled at runtime (if any)" << endl;
3377 if (fAnalysisSource.Length()) {
3378 TObjArray *list = fAnalysisSource.Tokenize(" ");
3381 while((str=(TObjString*)next())) {
3382 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3384 if (list) delete list;
3387 // out << " printf(\"Currently load libraries:\\n\");" << endl;
3388 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
3389 if (fFastReadOption) {
3390 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3391 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3392 out << "// fast xrootd reading enabled" << endl;
3393 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3394 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3395 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3396 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3397 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3398 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3400 out << "// connect to AliEn and make the chain" << endl;
3401 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3402 out << "// read the analysis manager from file" << endl;
3403 TString analysisFile = fExecutable;
3404 analysisFile.ReplaceAll(".sh", ".root");
3405 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3406 out << " if (!file) return;" << endl;
3407 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3408 out << " AliAnalysisManager *mgr = 0;" << endl;
3409 out << " TKey *key;" << endl;
3410 out << " while ((key=(TKey*)nextkey())) {" << endl;
3411 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3412 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3413 out << " };" << endl;
3414 out << " if (!mgr) {" << endl;
3415 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
3416 out << " return;" << endl;
3417 out << " }" << endl << endl;
3418 out << " mgr->PrintStatus();" << endl;
3419 if (AliAnalysisManager::GetAnalysisManager()) {
3420 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3421 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3423 if (TestBit(AliAnalysisGrid::kTest))
3424 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3426 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3429 if (IsUsingTags()) {
3430 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3432 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3434 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3435 out << " timer.Stop();" << endl;
3436 out << " timer.Print();" << endl;
3437 out << "}" << endl << endl;
3438 if (IsUsingTags()) {
3439 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3441 out << "// Create a chain using tags from the xml file." << endl;
3442 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3443 out << " if (!coll) {" << endl;
3444 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3445 out << " return NULL;" << endl;
3446 out << " }" << endl;
3447 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3448 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3449 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3450 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3451 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3452 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3453 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3454 out << " // Check if the cuts configuration file was provided" << endl;
3455 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3456 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3457 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3458 out << " }" << endl;
3459 if (fFriendChainName=="") {
3460 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3462 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3463 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3464 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3466 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3467 out << " chain->ls();" << endl;
3468 out << " return chain;" << endl;
3469 out << "}" << endl << endl;
3470 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3471 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3472 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3473 msg += " AliLHCTagCuts *lhcCuts,\n";
3474 msg += " AliDetectorTagCuts *detCuts,\n";
3475 msg += " AliEventTagCuts *evCuts)";
3476 Info("WriteAnalysisMacro", "%s", msg.Data());
3479 if (!IsUsingTags() || fFriendChainName!="") {
3480 out <<"//________________________________________________________________________________" << endl;
3481 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3483 out << "// Create a chain using url's from xml file" << endl;
3484 out << " TString filename;" << endl;
3485 out << " Int_t run = 0;" << endl;
3486 if (IsUseMCchain()) {
3487 out << " TString treename = \"TE\";" << endl;
3489 out << " TString treename = type;" << endl;
3490 out << " treename.ToLower();" << endl;
3491 out << " treename += \"Tree\";" << endl;
3493 out << " printf(\"***************************************\\n\");" << endl;
3494 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3495 out << " printf(\"***************************************\\n\");" << endl;
3496 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3497 out << " if (!coll) {" << endl;
3498 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3499 out << " return NULL;" << endl;
3500 out << " }" << endl;
3501 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3502 out << " TChain *chain = new TChain(treename);" << endl;
3503 if(fFriendChainName!="") {
3504 out << " TChain *chainFriend = new TChain(treename);" << endl;
3506 out << " coll->Reset();" << endl;
3507 out << " while (coll->Next()) {" << endl;
3508 out << " filename = coll->GetTURL("");" << endl;
3509 out << " if (mgr) {" << endl;
3510 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3511 out << " if (nrun && nrun != run) {" << endl;
3512 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3513 out << " mgr->SetRunFromPath(nrun);" << endl;
3514 out << " run = nrun;" << endl;
3515 out << " }" << endl;
3516 out << " }" << endl;
3517 out << " chain->Add(filename);" << endl;
3518 if(fFriendChainName!="") {
3519 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3520 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3521 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3522 out << " chainFriend->Add(fileFriend.Data());" << endl;
3524 out << " }" << endl;
3525 out << " if (!chain->GetNtrees()) {" << endl;
3526 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3527 out << " return NULL;" << endl;
3528 out << " }" << endl;
3529 if(fFriendChainName!="") {
3530 out << " chain->AddFriend(chainFriend);" << endl;
3532 out << " return chain;" << endl;
3533 out << "}" << endl << endl;
3535 if (hasANALYSISalice) {
3536 out <<"//________________________________________________________________________________" << endl;
3537 out << "Bool_t SetupPar(const char *package) {" << endl;
3538 out << "// Compile the package and set it up." << endl;
3539 out << " TString pkgdir = package;" << endl;
3540 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3541 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3542 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3543 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3544 out << " // Check for BUILD.sh and execute" << endl;
3545 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3546 out << " printf(\"*******************************\\n\");" << endl;
3547 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3548 out << " printf(\"*******************************\\n\");" << endl;
3549 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3550 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3551 out << " gSystem->ChangeDirectory(cdir);" << endl;
3552 out << " return kFALSE;" << endl;
3553 out << " }" << endl;
3554 out << " } else {" << endl;
3555 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3556 out << " gSystem->ChangeDirectory(cdir);" << endl;
3557 out << " return kFALSE;" << endl;
3558 out << " }" << endl;
3559 out << " // Check for SETUP.C and execute" << endl;
3560 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3561 out << " printf(\"*******************************\\n\");" << endl;
3562 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3563 out << " printf(\"*******************************\\n\");" << endl;
3564 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3565 out << " } else {" << endl;
3566 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3567 out << " gSystem->ChangeDirectory(cdir);" << endl;
3568 out << " return kFALSE;" << endl;
3569 out << " }" << endl;
3570 out << " // Restore original workdir" << endl;
3571 out << " gSystem->ChangeDirectory(cdir);" << endl;
3572 out << " return kTRUE;" << endl;
3575 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
3577 Bool_t copy = kTRUE;
3578 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3581 TString workdir = gGrid->GetHomeDirectory();
3582 workdir += fGridWorkingDir;
3583 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3584 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3585 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3586 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3587 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3589 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3590 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3594 //______________________________________________________________________________
3595 void AliAnalysisAlien::WriteMergingMacro()
3597 // Write a macro to merge the outputs per master job.
3598 if (!fMergeViaJDL) return;
3599 if (!fOutputFiles.Length()) {
3600 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3603 TString mergingMacro = fExecutable;
3604 mergingMacro.ReplaceAll(".sh","_merge.C");
3605 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3606 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3608 out.open(mergingMacro.Data(), ios::out);
3610 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3613 Bool_t hasSTEERBase = kFALSE;
3614 Bool_t hasESD = kFALSE;
3615 Bool_t hasAOD = kFALSE;
3616 Bool_t hasANALYSIS = kFALSE;
3617 Bool_t hasOADB = kFALSE;
3618 Bool_t hasANALYSISalice = kFALSE;
3619 Bool_t hasCORRFW = kFALSE;
3620 TString func = mergingMacro;
3622 func.ReplaceAll(".C", "");
3623 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
3625 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3626 out << " TStopwatch timer;" << endl;
3627 out << " timer.Start();" << endl << endl;
3628 // Reset existing include path
3629 out << "// Reset existing include path and add current directory first in the search" << endl;
3630 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3631 if (!fExecutableCommand.Contains("aliroot")) {
3632 out << "// load base root libraries" << endl;
3633 out << " gSystem->Load(\"libTree\");" << endl;
3634 out << " gSystem->Load(\"libGeom\");" << endl;
3635 out << " gSystem->Load(\"libVMC\");" << endl;
3636 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3637 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3639 if (fAdditionalRootLibs.Length()) {
3640 // in principle libtree /lib geom libvmc etc. can go into this list, too
3641 out << "// Add aditional libraries" << endl;
3642 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3645 while((str=(TObjString*)next())) {
3646 if (str->GetString().Contains(".so"))
3647 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3649 if (list) delete list;
3651 out << "// Load analysis framework libraries" << endl;
3653 if (!fExecutableCommand.Contains("aliroot")) {
3654 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3655 out << " gSystem->Load(\"libESD\");" << endl;
3656 out << " gSystem->Load(\"libAOD\");" << endl;
3658 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3659 out << " gSystem->Load(\"libOADB\");" << endl;
3660 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3661 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3663 TIter next(fPackages);
3666 TString setupPar = "AliAnalysisAlien::SetupPar";
3667 while ((obj=next())) {
3668 pkgname = obj->GetName();
3669 if (pkgname == "STEERBase" ||
3670 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3671 if (pkgname == "ESD" ||
3672 pkgname == "ESD.par") hasESD = kTRUE;
3673 if (pkgname == "AOD" ||
3674 pkgname == "AOD.par") hasAOD = kTRUE;
3675 if (pkgname == "ANALYSIS" ||
3676 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3677 if (pkgname == "OADB" ||
3678 pkgname == "OADB.par") hasOADB = kTRUE;
3679 if (pkgname == "ANALYSISalice" ||
3680 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3681 if (pkgname == "CORRFW" ||
3682 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3684 if (hasANALYSISalice) setupPar = "SetupPar";
3685 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3686 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3687 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3688 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3689 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3690 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3691 out << " gSystem->Load(\"libOADB\");" << endl;
3692 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3693 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3694 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3695 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3696 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3697 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3698 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3699 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3700 out << "// Compile other par packages" << endl;
3702 while ((obj=next())) {
3703 pkgname = obj->GetName();
3704 if (pkgname == "STEERBase" ||
3705 pkgname == "STEERBase.par" ||
3707 pkgname == "ESD.par" ||
3709 pkgname == "AOD.par" ||
3710 pkgname == "ANALYSIS" ||
3711 pkgname == "ANALYSIS.par" ||
3712 pkgname == "OADB" ||
3713 pkgname == "OADB.par" ||
3714 pkgname == "ANALYSISalice" ||
3715 pkgname == "ANALYSISalice.par" ||
3716 pkgname == "CORRFW" ||
3717 pkgname == "CORRFW.par") continue;
3718 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3721 out << "// include path" << endl;
3722 // Get the include path from the interpreter and remove entries pointing to AliRoot
3723 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3724 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3725 out << " TIter nextpath(listpaths);" << endl;
3726 out << " TObjString *pname;" << endl;
3727 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3728 out << " TString current = pname->GetName();" << endl;
3729 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3730 out << " gSystem->AddIncludePath(current);" << endl;
3731 out << " }" << endl;
3732 out << " if (listpaths) delete listpaths;" << endl;
3733 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3734 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3735 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3736 if (fAdditionalLibs.Length()) {
3737 out << "// Add aditional AliRoot libraries" << endl;
3738 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3741 while((str=(TObjString*)next())) {
3742 if (str->GetString().Contains(".so"))
3743 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3745 if (list) delete list;
3748 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3749 if (fAnalysisSource.Length()) {
3750 TObjArray *list = fAnalysisSource.Tokenize(" ");
3753 while((str=(TObjString*)next())) {
3754 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3756 if (list) delete list;
3760 if (fFastReadOption) {
3761 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3762 out << "// fast xrootd reading enabled" << endl;
3763 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3764 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3765 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3766 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3767 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3768 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3770 // Change temp directory to current one
3771 out << "// Set temporary merging directory to current one" << endl;
3772 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3773 out << "// Set temporary compilation directory to current one" << endl;
3774 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3775 out << "// Connect to AliEn" << endl;
3776 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3777 out << " TString outputDir = dir;" << endl;
3778 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3779 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3780 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3781 out << " TIter *iter = new TIter(list);" << endl;
3782 out << " TObjString *str;" << endl;
3783 out << " TString outputFile;" << endl;
3784 out << " Bool_t merged = kTRUE;" << endl;
3785 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3786 out << " outputFile = str->GetString();" << endl;
3787 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3788 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3789 out << " if (index > 0) outputFile.Remove(index);" << endl;
3790 out << " // Skip already merged outputs" << endl;
3791 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3792 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3793 out << " continue;" << endl;
3794 out << " }" << endl;
3795 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3796 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
3797 out << " if (!merged) {" << endl;
3798 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3799 out << " return;" << endl;
3800 out << " }" << endl;
3801 out << " }" << endl;
3802 out << " // all outputs merged, validate" << endl;
3803 out << " ofstream out;" << endl;
3804 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3805 out << " out.close();" << endl;
3806 out << " // read the analysis manager from file" << endl;
3807 TString analysisFile = fExecutable;
3808 analysisFile.ReplaceAll(".sh", ".root");
3809 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
3810 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3811 out << " if (!file) return;" << endl;
3812 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3813 out << " AliAnalysisManager *mgr = 0;" << endl;
3814 out << " TKey *key;" << endl;
3815 out << " while ((key=(TKey*)nextkey())) {" << endl;
3816 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3817 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3818 out << " };" << endl;
3819 out << " if (!mgr) {" << endl;
3820 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
3821 out << " return;" << endl;
3822 out << " }" << endl << endl;
3823 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3824 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3825 out << " mgr->PrintStatus();" << endl;
3826 if (AliAnalysisManager::GetAnalysisManager()) {
3827 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3828 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3830 if (TestBit(AliAnalysisGrid::kTest))
3831 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3833 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3836 out << " TTree *tree = NULL;" << endl;
3837 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3838 out << "}" << endl << endl;
3839 if (hasANALYSISalice) {
3840 out <<"//________________________________________________________________________________" << endl;
3841 out << "Bool_t SetupPar(const char *package) {" << endl;
3842 out << "// Compile the package and set it up." << endl;
3843 out << " TString pkgdir = package;" << endl;
3844 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3845 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3846 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3847 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3848 out << " // Check for BUILD.sh and execute" << endl;
3849 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3850 out << " printf(\"*******************************\\n\");" << endl;
3851 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3852 out << " printf(\"*******************************\\n\");" << endl;
3853 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3854 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3855 out << " gSystem->ChangeDirectory(cdir);" << endl;
3856 out << " return kFALSE;" << endl;
3857 out << " }" << endl;
3858 out << " } else {" << endl;
3859 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3860 out << " gSystem->ChangeDirectory(cdir);" << endl;
3861 out << " return kFALSE;" << endl;
3862 out << " }" << endl;
3863 out << " // Check for SETUP.C and execute" << endl;
3864 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3865 out << " printf(\"*******************************\\n\");" << endl;
3866 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3867 out << " printf(\"*******************************\\n\");" << endl;
3868 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3869 out << " } else {" << endl;
3870 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3871 out << " gSystem->ChangeDirectory(cdir);" << endl;
3872 out << " return kFALSE;" << endl;
3873 out << " }" << endl;
3874 out << " // Restore original workdir" << endl;
3875 out << " gSystem->ChangeDirectory(cdir);" << endl;
3876 out << " return kTRUE;" << endl;
3880 Bool_t copy = kTRUE;
3881 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3884 TString workdir = gGrid->GetHomeDirectory();
3885 workdir += fGridWorkingDir;
3886 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3887 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3888 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3892 //______________________________________________________________________________
3893 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3895 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3896 // Note that for loading the compiled library. The current directory should have precedence in
3898 TString pkgdir = package;
3899 pkgdir.ReplaceAll(".par","");
3900 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
3901 TString cdir = gSystem->WorkingDirectory();
3902 gSystem->ChangeDirectory(pkgdir);
3903 // Check for BUILD.sh and execute
3904 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3905 printf("**************************************************\n");
3906 printf("*** Building PAR archive %s\n", package);
3907 printf("**************************************************\n");
3908 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3909 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3910 gSystem->ChangeDirectory(cdir);
3914 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3915 gSystem->ChangeDirectory(cdir);
3918 // Check for SETUP.C and execute
3919 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3920 printf("**************************************************\n");
3921 printf("*** Setup PAR archive %s\n", package);
3922 printf("**************************************************\n");
3923 gROOT->Macro("PROOF-INF/SETUP.C");
3924 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3926 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3927 gSystem->ChangeDirectory(cdir);
3930 // Restore original workdir
3931 gSystem->ChangeDirectory(cdir);
3935 //______________________________________________________________________________
3936 void AliAnalysisAlien::WriteExecutable()
3938 // Generate the alien executable script.
3939 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3941 out.open(fExecutable.Data(), ios::out);
3943 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3946 out << "#!/bin/bash" << endl;
3947 // Make sure we can properly compile par files
3948 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3949 out << "echo \"=========================================\"" << endl;
3950 out << "echo \"############## PATH : ##############\"" << endl;
3951 out << "echo $PATH" << endl;
3952 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3953 out << "echo $LD_LIBRARY_PATH" << endl;
3954 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3955 out << "echo $ROOTSYS" << endl;
3956 out << "echo \"############## which root : ##############\"" << endl;
3957 out << "which root" << endl;
3958 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3959 out << "echo $ALICE_ROOT" << endl;
3960 out << "echo \"############## which aliroot : ##############\"" << endl;
3961 out << "which aliroot" << endl;
3962 out << "echo \"############## system limits : ##############\"" << endl;
3963 out << "ulimit -a" << endl;
3964 out << "echo \"############## memory : ##############\"" << endl;
3965 out << "free -m" << endl;
3966 out << "echo \"=========================================\"" << endl << endl;
3967 out << fExecutableCommand << " ";
3968 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3969 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3970 out << "echo \"############## memory after: ##############\"" << endl;
3971 out << "free -m" << endl;
3973 Bool_t copy = kTRUE;
3974 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3977 TString workdir = gGrid->GetHomeDirectory();
3978 TString bindir = Form("%s/bin", workdir.Data());
3979 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3980 workdir += fGridWorkingDir;
3981 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3982 if (FileExists(executable)) gGrid->Rm(executable);
3983 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3984 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3988 //______________________________________________________________________________
3989 void AliAnalysisAlien::WriteMergeExecutable()
3991 // Generate the alien executable script for the merging job.
3992 if (!fMergeViaJDL) return;
3993 TString mergeExec = fExecutable;
3994 mergeExec.ReplaceAll(".sh", "_merge.sh");
3995 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3997 out.open(mergeExec.Data(), ios::out);
3999 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4002 out << "#!/bin/bash" << endl;
4003 // Make sure we can properly compile par files
4004 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4005 out << "echo \"=========================================\"" << endl;
4006 out << "echo \"############## PATH : ##############\"" << endl;
4007 out << "echo $PATH" << endl;
4008 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4009 out << "echo $LD_LIBRARY_PATH" << endl;
4010 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4011 out << "echo $ROOTSYS" << endl;
4012 out << "echo \"############## which root : ##############\"" << endl;
4013 out << "which root" << endl;
4014 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4015 out << "echo $ALICE_ROOT" << endl;
4016 out << "echo \"############## which aliroot : ##############\"" << endl;
4017 out << "which aliroot" << endl;
4018 out << "echo \"############## system limits : ##############\"" << endl;
4019 out << "ulimit -a" << endl;
4020 out << "echo \"############## memory : ##############\"" << endl;
4021 out << "free -m" << endl;
4022 out << "echo \"=========================================\"" << endl << endl;
4023 TString mergeMacro = fExecutable;
4024 mergeMacro.ReplaceAll(".sh", "_merge.C");
4025 if (IsOneStageMerging())
4026 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4028 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4029 out << fExecutableCommand << " " << "$ARG" << endl;
4030 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4031 out << "echo \"############## memory after: ##############\"" << endl;
4032 out << "free -m" << endl;
4034 Bool_t copy = kTRUE;
4035 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4038 TString workdir = gGrid->GetHomeDirectory();
4039 TString bindir = Form("%s/bin", workdir.Data());
4040 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4041 workdir += fGridWorkingDir;
4042 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4043 if (FileExists(executable)) gGrid->Rm(executable);
4044 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4045 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4049 //______________________________________________________________________________
4050 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4052 // Write the production file to be submitted by LPM manager. The format is:
4053 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4054 // Next lines: full_path_to_dataset XXX (XXX is a string)
4055 // To submit, one has to: submit jdl XXX for all lines
4057 out.open(filename, ios::out);
4059 Error("WriteProductionFile", "Bad file name: %s", filename);
4063 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4064 workdir = gGrid->GetHomeDirectory();
4065 workdir += fGridWorkingDir;
4066 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4067 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4068 out << locjdl << " " << njobspermaster << endl;
4069 Int_t nmasterjobs = fInputFiles->GetEntries();
4070 for (Int_t i=0; i<nmasterjobs; i++) {
4071 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4072 runOutDir.ReplaceAll(".xml", "");
4074 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4076 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4079 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4080 if (FileExists(filename)) gGrid->Rm(filename);
4081 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4085 //______________________________________________________________________________
4086 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4088 // Generate the alien validation script.
4089 // Generate the validation script
4091 if (fValidationScript.IsNull()) {
4092 fValidationScript = fExecutable;
4093 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4095 TString validationScript = fValidationScript;
4096 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4098 Error("WriteValidationScript", "Alien connection required");
4101 if (!fTerminateFiles.IsNull()) {
4102 fTerminateFiles.Strip();
4103 fTerminateFiles.ReplaceAll(" ",",");
4105 TString outStream = "";
4106 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4107 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4109 out.open(validationScript, ios::out);
4110 out << "#!/bin/bash" << endl;
4111 out << "##################################################" << endl;
4112 out << "validateout=`dirname $0`" << endl;
4113 out << "validatetime=`date`" << endl;
4114 out << "validated=\"0\";" << endl;
4115 out << "error=0" << endl;
4116 out << "if [ -z $validateout ]" << endl;
4117 out << "then" << endl;
4118 out << " validateout=\".\"" << endl;
4119 out << "fi" << endl << endl;
4120 out << "cd $validateout;" << endl;
4121 out << "validateworkdir=`pwd`;" << endl << endl;
4122 out << "echo \"*******************************************************\"" << outStream << endl;
4123 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4125 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4126 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4127 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4128 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4129 out << "ls -la ./" << outStream << endl;
4130 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4131 out << "##################################################" << endl;
4134 out << "if [ ! -f stderr ] ; then" << endl;
4135 out << " error=1" << endl;
4136 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4137 out << " echo \"Error = $error\" " << outStream << endl;
4138 out << "fi" << endl;
4140 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4141 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4142 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4143 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4146 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4147 out << " error=1" << endl;
4148 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4149 out << " echo \"$parArch\" " << outStream << endl;
4150 out << " echo \"Error = $error\" " << outStream << endl;
4151 out << "fi" << endl;
4153 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4154 out << " error=1" << endl;
4155 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4156 out << " echo \"$segViol\" " << outStream << endl;
4157 out << " echo \"Error = $error\" " << outStream << endl;
4158 out << "fi" << endl;
4160 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4161 out << " error=1" << endl;
4162 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4163 out << " echo \"$segFault\" " << outStream << endl;
4164 out << " echo \"Error = $error\" " << outStream << endl;
4165 out << "fi" << endl;
4167 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4168 out << " error=1" << endl;
4169 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4170 out << " echo \"$glibcErr\" " << outStream << endl;
4171 out << " echo \"Error = $error\" " << outStream << endl;
4172 out << "fi" << endl;
4174 // Part dedicated to the specific analyses running into the train
4176 TString outputFiles = fOutputFiles;
4177 if (merge && !fTerminateFiles.IsNull()) {
4179 outputFiles += fTerminateFiles;
4181 TObjArray *arr = outputFiles.Tokenize(",");
4184 while (!merge && (os=(TObjString*)next1())) {
4185 // No need to validate outputs produced by merging since the merging macro does this
4186 outputFile = os->GetString();
4187 Int_t index = outputFile.Index("@");
4188 if (index > 0) outputFile.Remove(index);
4189 if (fTerminateFiles.Contains(outputFile)) continue;
4190 if (outputFile.Contains("*")) continue;
4191 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4192 out << " error=1" << endl;
4193 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4194 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4195 out << "fi" << endl;
4198 out << "if ! [ -f outputs_valid ] ; then" << endl;
4199 out << " error=1" << endl;
4200 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4201 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4202 out << "fi" << endl;
4204 out << "if [ $error = 0 ] ; then" << endl;
4205 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4206 if (!IsKeepLogs()) {
4207 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4209 out << " rm -f std*" << endl;
4211 out << "fi" << endl;
4213 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4214 out << "echo \"*******************************************************\"" << outStream << endl;
4215 out << "cd -" << endl;
4216 out << "exit $error" << endl;
4218 Bool_t copy = kTRUE;
4219 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4222 TString workdir = gGrid->GetHomeDirectory();
4223 workdir += fGridWorkingDir;
4224 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4225 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4226 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));