1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
38 #include "TGridResult.h"
39 #include "TGridCollection.h"
41 #include "TGridJobStatusList.h"
42 #include "TGridJobStatus.h"
43 #include "TFileMerger.h"
44 #include "AliAnalysisManager.h"
45 #include "AliVEventHandler.h"
46 #include "AliAnalysisDataContainer.h"
47 #include "AliMultiInputEventHandler.h"
49 ClassImp(AliAnalysisAlien)
51 //______________________________________________________________________________
// Default constructor.
// The member initializers visible here set fSplitMaxInputFileNumber,
// fMasterResubmitThreshold and fNproofWorkersPerSlave to 0 and
// value-initialize fAdditionalRootLibs and fRootVersionForProof.
52 AliAnalysisAlien::AliAnalysisAlien()
58 fSplitMaxInputFileNumber(0),
60 fMasterResubmitThreshold(0),
73 fNproofWorkersPerSlave(0),
83 fAdditionalRootLibs(),
111 fRootVersionForProof(),
122 //______________________________________________________________________________
// Named constructor.
// Forwards `name` to the AliAnalysisGrid base class; the initializers visible
// here set the numeric members to 0 and value-initialize fExecutableCommand,
// fAdditionalRootLibs and fRootVersionForProof.
123 AliAnalysisAlien::AliAnalysisAlien(const char *name)
124 :AliAnalysisGrid(name),
129 fSplitMaxInputFileNumber(0),
131 fMasterResubmitThreshold(0),
144 fNproofWorkersPerSlave(0),
148 fExecutableCommand(),
154 fAdditionalRootLibs(),
182 fRootVersionForProof(),
193 //______________________________________________________________________________
// Copy constructor.
// Plain settings are copied member by member from `other`. The two JDL
// objects are NOT copied: new TAlienJDL instances are created instead. The
// fInputFiles and fPackages lists are rebuilt entry by entry.
194 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
195 :AliAnalysisGrid(other),
198 fPrice(other.fPrice),
200 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
201 fMaxInitFailed(other.fMaxInitFailed),
202 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
203 fNtestFiles(other.fNtestFiles),
204 fNrunsPerMaster(other.fNrunsPerMaster),
205 fMaxMergeFiles(other.fMaxMergeFiles),
206 fMaxMergeStages(other.fMaxMergeStages),
207 fNsubmitted(other.fNsubmitted),
208 fProductionMode(other.fProductionMode),
209 fOutputToRunNo(other.fOutputToRunNo),
210 fMergeViaJDL(other.fMergeViaJDL),
211 fFastReadOption(other.fFastReadOption),
212 fOverwriteMode(other.fOverwriteMode),
213 fNreplicas(other.fNreplicas),
214 fNproofWorkers(other.fNproofWorkers),
215 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
216 fProofReset(other.fProofReset),
217 fRunNumbers(other.fRunNumbers),
218 fExecutable(other.fExecutable),
219 fExecutableCommand(other.fExecutableCommand),
220 fArguments(other.fArguments),
221 fExecutableArgs(other.fExecutableArgs),
222 fAnalysisMacro(other.fAnalysisMacro),
223 fAnalysisSource(other.fAnalysisSource),
224 fValidationScript(other.fValidationScript),
225 fAdditionalRootLibs(other.fAdditionalRootLibs),
226 fAdditionalLibs(other.fAdditionalLibs),
227 fSplitMode(other.fSplitMode),
228 fAPIVersion(other.fAPIVersion),
229 fROOTVersion(other.fROOTVersion),
230 fAliROOTVersion(other.fAliROOTVersion),
231 fExternalPackages(other.fExternalPackages),
233 fGridWorkingDir(other.fGridWorkingDir),
234 fGridDataDir(other.fGridDataDir),
235 fDataPattern(other.fDataPattern),
236 fGridOutputDir(other.fGridOutputDir),
237 fOutputArchive(other.fOutputArchive),
238 fOutputFiles(other.fOutputFiles),
239 fInputFormat(other.fInputFormat),
240 fDatasetName(other.fDatasetName),
241 fJDLName(other.fJDLName),
242 fTerminateFiles(other.fTerminateFiles),
243 fMergeExcludes(other.fMergeExcludes),
244 fIncludePath(other.fIncludePath),
245 fCloseSE(other.fCloseSE),
246 fFriendChainName(other.fFriendChainName),
247 fJobTag(other.fJobTag),
248 fOutputSingle(other.fOutputSingle),
249 fRunPrefix(other.fRunPrefix),
250 fProofCluster(other.fProofCluster),
251 fProofDataSet(other.fProofDataSet),
252 fFileForTestMode(other.fFileForTestMode),
253 fRootVersionForProof(other.fRootVersionForProof),
254 fAliRootMode(other.fAliRootMode),
255 fMergeDirName(other.fMergeDirName),
// The JDL objects are instantiated through the interpreter.
// NOTE(review): presumably this avoids a link-time dependency on the AliEn
// plug-in library - confirm.
261 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
262 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
263 fRunRange[0] = other.fRunRange[0];
264 fRunRange[1] = other.fRunRange[1];
// Rebuild the input file list: every entry becomes a new TObjString holding
// the name of the source entry, and the new array owns its entries.
265 if (other.fInputFiles) {
266 fInputFiles = new TObjArray();
267 TIter next(other.fInputFiles);
269 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
270 fInputFiles->SetOwner();
// Same entry-by-entry rebuild for the package list.
272 if (other.fPackages) {
273 fPackages = new TObjArray();
274 TIter next(other.fPackages);
276 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
277 fPackages->SetOwner();
281 //______________________________________________________________________________
// Destructor.
// Deletes the two JDL objects and the fInputFiles / fPackages arrays when
// they are set, then calls DeleteAll() on fProofParam.
282 AliAnalysisAlien::~AliAnalysisAlien()
285 if (fGridJDL) delete fGridJDL;
286 if (fMergingJDL) delete fMergingJDL;
287 if (fInputFiles) delete fInputFiles;
288 if (fPackages) delete fPackages;
289 fProofParam.DeleteAll();
292 //______________________________________________________________________________
// Assignment operator.
// Copies all plain settings from `other`, replaces the two JDL objects with
// new TAlienJDL instances and rebuilds the fInputFiles / fPackages lists
// entry by entry. Objects previously held by this instance are released
// first; the destructor deletes these same members, so they are owned here
// and would otherwise leak on every assignment.
293 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
// Self-assignment is a no-op.
296 if (this != &other) {
297 AliAnalysisGrid::operator=(other);
// Free the JDL objects held so far before replacing them.
delete fGridJDL;
delete fMergingJDL;
298 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
300 fPrice = other.fPrice;
302 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
303 fMaxInitFailed = other.fMaxInitFailed;
304 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
305 fNtestFiles = other.fNtestFiles;
306 fNrunsPerMaster = other.fNrunsPerMaster;
307 fMaxMergeFiles = other.fMaxMergeFiles;
308 fMaxMergeStages = other.fMaxMergeStages;
309 fNsubmitted = other.fNsubmitted;
310 fProductionMode = other.fProductionMode;
311 fOutputToRunNo = other.fOutputToRunNo;
312 fMergeViaJDL = other.fMergeViaJDL;
313 fFastReadOption = other.fFastReadOption;
314 fOverwriteMode = other.fOverwriteMode;
315 fNreplicas = other.fNreplicas;
316 fNproofWorkers = other.fNproofWorkers;
317 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
318 fProofReset = other.fProofReset;
319 fRunNumbers = other.fRunNumbers;
320 fExecutable = other.fExecutable;
321 fExecutableCommand = other.fExecutableCommand;
322 fArguments = other.fArguments;
323 fExecutableArgs = other.fExecutableArgs;
324 fAnalysisMacro = other.fAnalysisMacro;
325 fAnalysisSource = other.fAnalysisSource;
326 fValidationScript = other.fValidationScript;
327 fAdditionalRootLibs = other.fAdditionalRootLibs;
328 fAdditionalLibs = other.fAdditionalLibs;
329 fSplitMode = other.fSplitMode;
330 fAPIVersion = other.fAPIVersion;
331 fROOTVersion = other.fROOTVersion;
332 fAliROOTVersion = other.fAliROOTVersion;
333 fExternalPackages = other.fExternalPackages;
335 fGridWorkingDir = other.fGridWorkingDir;
336 fGridDataDir = other.fGridDataDir;
337 fDataPattern = other.fDataPattern;
338 fGridOutputDir = other.fGridOutputDir;
339 fOutputArchive = other.fOutputArchive;
340 fOutputFiles = other.fOutputFiles;
341 fInputFormat = other.fInputFormat;
342 fDatasetName = other.fDatasetName;
343 fJDLName = other.fJDLName;
344 fTerminateFiles = other.fTerminateFiles;
345 fMergeExcludes = other.fMergeExcludes;
346 fIncludePath = other.fIncludePath;
347 fCloseSE = other.fCloseSE;
348 fFriendChainName = other.fFriendChainName;
349 fJobTag = other.fJobTag;
350 fOutputSingle = other.fOutputSingle;
351 fRunPrefix = other.fRunPrefix;
352 fProofCluster = other.fProofCluster;
353 fProofDataSet = other.fProofDataSet;
354 fFileForTestMode = other.fFileForTestMode;
355 fRootVersionForProof = other.fRootVersionForProof;
356 fAliRootMode = other.fAliRootMode;
357 fMergeDirName = other.fMergeDirName;
// Drop both lists held so far: they must neither leak when replaced below
// nor survive as stale content when `other` has no corresponding list.
delete fInputFiles;
fInputFiles = 0;
delete fPackages;
fPackages = 0;
// Rebuild the input file list from the entry names; the new array owns
// its TObjString entries.
358 if (other.fInputFiles) {
359 fInputFiles = new TObjArray();
360 TIter next(other.fInputFiles);
362 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
363 fInputFiles->SetOwner();
365 if (other.fPackages) {
366 fPackages = new TObjArray();
367 TIter next(other.fPackages);
369 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
370 fPackages->SetOwner();
376 //______________________________________________________________________________
377 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
379 // Set the run number format. Can be a prefix or a format like "%09d"
// When fRunPrefix holds no '%' conversion, "%d" is appended so that it can be
// used as a printf-style format for an integer run number, which is how
// AddRunNumber(Int_t) uses it via Form(fRunPrefix.Data(), run).
381 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
384 //______________________________________________________________________________
385 void AliAnalysisAlien::AddIncludePath(const char *path)
387 // Add include path in the remote analysis macro.
// The path is appended to fIncludePath followed by a blank. If it does not
// already carry a "-I" flag, one is prepended.
// NOTE(review): `p` is declared on a line not visible here; presumably it is
// a string built from `path` - confirm.
389 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
390 else fIncludePath += Form("-I%s ", path);
393 //______________________________________________________________________________
394 void AliAnalysisAlien::AddRunNumber(Int_t run)
396 // Add a run number to the list of runs to be processed.
// fRunNumbers is a blank-separated list: a separator is added only when the
// list already has content. The run is formatted with fRunPrefix, which is
// used here as a printf-style format string.
397 if (fRunNumbers.Length()) fRunNumbers += " ";
398 fRunNumbers += Form(fRunPrefix.Data(), run);
401 //______________________________________________________________________________
402 void AliAnalysisAlien::AddRunList(const char* runList)
404 // Add several runs into the list of runs; they are expected to be separated by a blank character.
// Each blank-separated token is converted with Atoi() and passed on to
// AddRunNumber(Int_t).
// NOTE(review): the array returned by Tokenize() normally has to be deleted
// by the caller; its release is not visible in these lines - confirm.
405 TString sList = runList;
406 TObjArray *list = sList.Tokenize(" ");
407 Int_t n = list->GetEntries();
408 for (Int_t i = 0; i < n; i++) {
409 TObjString *os = (TObjString*)list->At(i);
410 AddRunNumber(os->GetString().Atoi());
415 //______________________________________________________________________________
416 void AliAnalysisAlien::AddRunNumber(const char* run)
418 // Add a run number to the list of runs to be processed.
// The argument is split on blanks, so it may hold several runs. Each token
// is appended to fRunNumbers as text, preceded by the part of fRunPrefix
// that comes before "%d".
421 TObjArray *arr = runs.Tokenize(" ");
// NOTE(review): when fRunPrefix contains no literal "%d" (e.g. "%09d"),
// Index() reports "not found" and that value is used as the length to
// append - verify this case is handled as intended.
424 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
425 while ((os=(TObjString*)next())){
426 if (fRunNumbers.Length()) fRunNumbers += " ";
427 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
432 //______________________________________________________________________________
433 void AliAnalysisAlien::AddDataFile(const char *lfn)
435 // Adds a data file to the input to be analysed. The file should be a valid LFN
436 // or point to an existing file in the alien workdir.
// The fInputFiles array is created on first use; the name is stored as a new
// TObjString. No validation of `lfn` is done here.
437 if (!fInputFiles) fInputFiles = new TObjArray();
438 fInputFiles->Add(new TObjString(lfn));
441 //______________________________________________________________________________
442 void AliAnalysisAlien::AddExternalPackage(const char *package)
444 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
// fExternalPackages is a blank-separated list. The length is tested
// explicitly: a string object used directly as a condition is converted to
// its character pointer, which is never null, so the separator would be
// prepended even to the very first package.
445 if (fExternalPackages.Length()) fExternalPackages += " ";
446 fExternalPackages += package;
449 //______________________________________________________________________________
450 Bool_t AliAnalysisAlien::Connect()
452 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Returns kTRUE right away when a grid connection already exists, and also
// in production mode, where no connection attempt is made.
453 if (gGrid && gGrid->IsConnected()) return kTRUE;
454 if (fProductionMode) return kTRUE;
456 Info("Connect", "Trying to connect to AliEn ...");
457 TGrid::Connect("alien://");
// The outcome is judged from the global gGrid handle rather than from the
// return value of TGrid::Connect().
459 if (!gGrid || !gGrid->IsConnected()) {
460 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// On success the analysis user is taken from the grid session.
463 fUser = gGrid->GetUser();
464 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
468 //______________________________________________________________________________
469 void AliAnalysisAlien::CdWork()
471 // Check validity of alien workspace. Create directory if possible.
473 Error("CdWork", "Alien connection required");
// The working directory is formed by appending fGridWorkingDir to the grid
// home directory.
476 TString homedir = gGrid->GetHomeDirectory();
477 TString workdir = homedir + fGridWorkingDir;
478 if (DirectoryExists(workdir)) {
482 // Work directory not existing - create it
484 if (gGrid->Mkdir(workdir, "-p")) {
485 gGrid->Cd(fGridWorkingDir);
486 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
// Creation failed: fall back to the home directory by clearing the relative
// working directory.
488 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
489 workdir.Data(), homedir.Data());
490 fGridWorkingDir = "";
494 //______________________________________________________________________________
// Verifies that files can be copied to `alienpath` by creating a small local
// test file, copying it to the grid location and cleaning up afterwards.
// In production mode the check is skipped and kTRUE is returned.
495 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
497 // Check if file copying is possible.
498 if (fProductionMode) return kTRUE;
500 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
503 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
504 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
505 // Check if alien_CLOSE_SE is defined
506 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
507 if (!closeSE.IsNull()) {
508 Info("CheckFileCopy", "Your current close storage is pointing to: \
509 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
511 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
513 // Check if grid directory exists.
514 if (!DirectoryExists(alienpath)) {
515 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Local probe file, created in the current working directory.
518 TFile f("plugin_test_copy", "RECREATE");
519 // User may not have write permissions to current directory
521 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
522 gSystem->WorkingDirectory());
// Remove a leftover remote copy from a previous check before copying again.
526 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
527 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
528 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
529 \n# 1. Make sure you have write permissions there. If this is the case: \
530 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
531 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
532 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
533 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
534 gSystem->Unlink(f.GetName());
// Success: remove the local probe file and its remote copy.
537 gSystem->Unlink(f.GetName());
// The remote path is built with a '/' separator, exactly like the copy
// destination above. Without it the removal only hits the copied file when
// `alienpath` happens to end with a slash.
// NOTE(review): the calls above also prepend "alien://" - confirm whether
// Rm() needs that prefix here as well.
538 gGrid->Rm(Form("%s/%s",alienpath,f.GetName()));
539 Info("CheckFileCopy", "### ...SUCCESS ###");
543 //______________________________________________________________________________
// Validates the declared analysis inputs: explicit input files, run numbers
// or a run range relative to fGridDataDir. For run-based input, the names of
// the xml collections to be used are registered through AddDataFile().
// In production mode no check is made and kTRUE is returned.
544 Bool_t AliAnalysisAlien::CheckInputData()
546 // Check validity of input data. If necessary, create xml files.
547 if (fProductionMode) return kTRUE;
// Nothing declared explicitly (no files, run numbers or run range start):
// the base data directory is the only remaining source and must be set.
548 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
549 if (!fGridDataDir.Length()) {
550 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
554 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
557 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
558 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
559 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
562 // Process declared files
563 Bool_t isCollection = kFALSE;
564 Bool_t isXml = kFALSE;
565 Bool_t useTags = kFALSE;
566 Bool_t checked = kFALSE;
567 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
569 TString workdir = gGrid->GetHomeDirectory();
570 workdir += fGridWorkingDir;
573 TIter next(fInputFiles);
574 while ((objstr=(TObjString*)next())) {
577 file += objstr->GetString();
578 // Store full lfn path
579 if (FileExists(file)) objstr->SetString(file);
581 file = objstr->GetName();
582 if (!FileExists(objstr->GetName())) {
583 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
584 objstr->GetName(), workdir.Data());
// Classify the file; the three flags are read back right after the call, so
// they are presumably filled by CheckDataType().
588 Bool_t iscoll, isxml, usetags;
589 CheckDataType(file, iscoll, isxml, usetags);
592 isCollection = iscoll;
595 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
// Every declared file must be of the same kind as the ones recorded so far.
597 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
598 Error("CheckInputData", "Some conflict was found in the types of inputs");
604 // Process requested run numbers
605 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
606 // Check validity of alien data directory
607 if (!fGridDataDir.Length()) {
608 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
611 if (!DirectoryExists(fGridDataDir)) {
612 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
616 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
620 if (checked && !isXml) {
621 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
624 // Check validity of run number(s)
629 TString schunk, schunk2;
// Tag usage is derived from the data pattern and has to agree with what the
// declared files (if any) established.
633 useTags = fDataPattern.Contains("tag");
634 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
636 if (useTags != fDataPattern.Contains("tag")) {
637 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over a run range.
// NOTE(review): the Info() calls below report "CheckDataType" as their
// location although they are issued from CheckInputData - confirm whether
// that is intentional.
640 if (fRunNumbers.Length()) {
641 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
642 arr = fRunNumbers.Tokenize(" ");
644 while ((os=(TObjString*)next())) {
// NOTE(review): the format ends with a blank, so `path` carries a trailing
// space - confirm this is what DirectoryExists() expects.
645 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
646 if (!DirectoryExists(path)) {
647 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
650 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
651 TString msg = "\n##### file: ";
653 msg += " type: xml_collection;";
654 if (useTags) msg += " using_tags: Yes";
655 else msg += " using_tags: No";
656 Info("CheckDataType", "%s", msg.Data());
// One collection per run when fewer than two runs go into a master job;
// otherwise the chunk name starts with the first run of the chunk and gets
// the last run appended.
657 if (fNrunsPerMaster<2) {
658 AddDataFile(Form("%s.xml", os->GetString().Data()));
661 if (((nruns-1)%fNrunsPerMaster) == 0) {
662 schunk = os->GetString();
664 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
665 schunk += Form("_%s.xml", os->GetString().Data());
// Same procedure for a run range; run names are produced by formatting the
// run index with fRunPrefix.
671 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
672 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
673 format = Form("%%s/%s ", fRunPrefix.Data());
674 path = Form(format.Data(), fGridDataDir.Data(), irun);
675 if (!DirectoryExists(path)) {
678 format = Form("%%s/%s.xml", fRunPrefix.Data());
679 path = Form(format.Data(), workdir.Data(),irun);
680 TString msg = "\n##### file: ";
682 msg += " type: xml_collection;";
683 if (useTags) msg += " using_tags: Yes";
684 else msg += " using_tags: No";
685 Info("CheckDataType", "%s", msg.Data());
686 if (fNrunsPerMaster<2) {
687 format = Form("%s.xml", fRunPrefix.Data());
688 AddDataFile(Form(format.Data(),irun));
691 if (((nruns-1)%fNrunsPerMaster) == 0) {
692 schunk = Form(fRunPrefix.Data(),irun);
694 format = Form("_%s.xml", fRunPrefix.Data());
695 schunk2 = Form(format.Data(), irun);
696 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
709 //______________________________________________________________________________
// Builds the xml collection(s) describing the input data by running the grid
// 'find' command and, outside test mode, copies them to the AliEn working
// directory. Three cases are handled in turn: a single collection for the
// whole data directory, one collection per supplied run number (optionally
// merged into chunks of fNrunsPerMaster runs), and the same for a run range.
// Nothing is done in production or offline mode.
710 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
712 // Create dataset for the grid data directory + run number.
// Upper limit passed to 'find' with -l. A result holding exactly this many
// entries is treated below as possibly truncated.
713 const Int_t gMaxEntries = 15000;
714 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
716 Error("CreateDataset", "Cannot create dataset with no grid connection");
721 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
722 TString workdir = gGrid->GetHomeDirectory();
723 workdir += fGridWorkingDir;
725 // Compose the 'find' command arguments
728 TString options = "-x collection ";
729 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
730 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
731 TString conditions = "";
738 TString schunk, schunk2;
739 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither run numbers nor a run range were given.
740 if (!fRunNumbers.Length() && !fRunRange[0]) {
741 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
742 // Make a single data collection from data directory.
744 if (!DirectoryExists(path)) {
745 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
749 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
750 else file = Form("%s.xml", gSystem->BaseName(path));
// (Re)create the local collection when it is missing, in test mode, or when
// overwriting is requested.
754 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
756 command += Form("%s -o %d ",options.Data(), nstart);
760 command += conditions;
761 printf("command: %s\n", command.Data());
762 TGridResult *res = gGrid->Command(command);
764 // Write standard output to file
765 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
// The result is validated by counting lines containing "/event" with grep;
// a failing grep command marks the staged file as empty.
766 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
767 Bool_t nullFile = kFALSE;
769 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
771 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
773 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
774 gSystem->Exec("rm -f __tmp*");
782 gSystem->Exec("rm -f __tmp__");
783 ncount = line.Atoi();
// A full page of results: more entries may remain, so the staged pieces are
// accumulated in a collection.
786 if (ncount == gMaxEntries) {
787 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
788 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
789 if (!cbase) cbase = cadd;
797 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
798 printf("... please wait - TAlienCollection::Add() scales badly...\n");
801 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
802 delete cbase; cbase = 0;
804 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
806 gSystem->Exec("rm -f __tmp*");
807 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
811 Bool_t fileExists = FileExists(file);
812 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
813 // Copy xml file to alien space
814 if (fileExists) gGrid->Rm(file);
815 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
816 if (!FileExists(file)) {
817 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
820 // Update list of files to be processed.
822 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
826 Bool_t nullResult = kTRUE;
// Case 2: explicit list of run numbers.
827 if (fRunNumbers.Length()) {
828 TObjArray *arr = fRunNumbers.Tokenize(" ");
831 while ((os=(TObjString*)next())) {
834 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
835 if (!DirectoryExists(path)) continue;
837 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
838 else file = Form("%s.xml", os->GetString().Data());
839 // If local collection file does not exist, create it via 'find' command.
843 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
845 command += Form("%s -o %d ",options.Data(), nstart);
848 command += conditions;
849 TGridResult *res = gGrid->Command(command);
851 // Write standard output to file
852 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
853 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
854 Bool_t nullFile = kFALSE;
856 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
858 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
// An empty result drops the run from the list of runs to process.
860 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
861 gSystem->Exec("rm -f __tmp*");
862 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
870 gSystem->Exec("rm -f __tmp__");
871 ncount = line.Atoi();
875 if (ncount == gMaxEntries) {
876 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
877 if (fNrunsPerMaster > 1) {
878 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
879 file.Data(),gMaxEntries);
882 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
883 if (!cbase) cbase = cadd;
890 if (cbase && fNrunsPerMaster<2) {
891 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
892 printf("... please wait - TAlienCollection::Add() scales badly...\n");
895 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
896 delete cbase; cbase = 0;
898 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
900 gSystem->Exec("rm -f __tmp*");
901 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// Test mode only needs the first usable run.
905 if (TestBit(AliAnalysisGrid::kTest)) break;
906 // Check if there is one run per master job.
907 if (fNrunsPerMaster<2) {
908 if (FileExists(file)) {
909 if (fOverwriteMode) gGrid->Rm(file);
911 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
915 // Copy xml file to alien space
916 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
917 if (!FileExists(file)) {
918 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk opens a new base
// collection, later runs are loaded as additions to it.
924 if (((nruns-1)%fNrunsPerMaster) == 0) {
925 schunk = os->GetString();
926 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
928 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
929 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
933 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
936 schunk += Form("_%s.xml", os->GetString().Data());
// The file tested for existence is the merged chunk, so in overwrite mode it
// is the chunk that has to be removed (not the per-run `file`), as the
// run-range branch further down does.
937 if (FileExists(schunk)) {
938 if (fOverwriteMode) gGrid->Rm(schunk);
940 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
944 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
945 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
946 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
947 if (!FileExists(schunk)) {
948 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
956 Error("CreateDataset", "No valid dataset corresponding to the query!");
960 // Process a full run range.
961 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
962 format = Form("%%s/%s ", fRunPrefix.Data());
965 path = Form(format.Data(), fGridDataDir.Data(), irun);
966 if (!DirectoryExists(path)) continue;
968 format = Form("%s.xml", fRunPrefix.Data());
969 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
970 else file = Form(format.Data(), irun);
971 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
972 if (fOverwriteMode) gGrid->Rm(file);
974 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
978 // If local collection file does not exist, create it via 'find' command.
982 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
984 command += Form("%s -o %d ",options.Data(), nstart);
987 command += conditions;
988 TGridResult *res = gGrid->Command(command);
990 // Write standard output to file
991 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
992 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
993 Bool_t nullFile = kFALSE;
995 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
997 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
999 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1000 gSystem->Exec("rm -f __tmp*");
1008 gSystem->Exec("rm -f __tmp__");
1009 ncount = line.Atoi();
1011 nullResult = kFALSE;
1013 if (ncount == gMaxEntries) {
1014 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1015 if (fNrunsPerMaster > 1) {
1016 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1017 file.Data(),gMaxEntries);
1020 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1021 if (!cbase) cbase = cadd;
1028 if (cbase && fNrunsPerMaster<2) {
1029 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1030 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1033 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1034 delete cbase; cbase = 0;
1036 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1038 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1042 if (TestBit(AliAnalysisGrid::kTest)) break;
1043 // Check if there is one run per master job.
1044 if (fNrunsPerMaster<2) {
1045 if (FileExists(file)) {
1046 if (fOverwriteMode) gGrid->Rm(file);
1048 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1052 // Copy xml file to alien space
1053 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1054 if (!FileExists(file)) {
1055 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1060 // Check if the collection for the chunk exist locally.
// The chunk index is derived from the running run counter; the chunk name is
// looked up in fInputFiles.
1061 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1062 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1063 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1066 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1067 if (((nruns-1)%fNrunsPerMaster) == 0) {
1068 schunk = Form(fRunPrefix.Data(), irun);
1069 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1071 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1075 format = Form("%%s_%s.xml", fRunPrefix.Data());
1076 schunk2 = Form(format.Data(), schunk.Data(), irun);
1077 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1081 if (FileExists(schunk)) {
1082 if (fOverwriteMode) gGrid->Rm(schunk);
1084 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1088 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1089 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1090 if (FileExists(schunk)) {
1091 if (fOverwriteMode) gGrid->Rm(schunk);
1093 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1097 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1098 if (!FileExists(schunk)) {
1099 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1105 Error("CreateDataset", "No valid dataset corresponding to the query!");
1112 //______________________________________________________________________________
1113 Bool_t AliAnalysisAlien::CreateJDL()
1115 // Generate a JDL file according to current settings. The name of the file is
1116 // specified by fJDLName.
// Two JDL objects are filled side by side: fGridJDL (analysis job) and
// fMergingJDL (merging job). Settings covered here: user, executable and
// arguments, TTL, splitting, packages, input sandbox, output archive/sandbox,
// price, validation command and master resubmit threshold.
// Returns kFALSE when one of the preliminary checks has raised the error flag.
1117 Bool_t error = kFALSE;
// Copying to the grid is switched off in production, offline and test modes.
1119 Bool_t copy = kTRUE;
1120 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Generation is switched off in test and submit modes.
1121 Bool_t generate = kTRUE;
1122 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1124 Error("CreateJDL", "Alien connection required");
1127 // Check validity of alien workspace
// Outside production mode a working dir not starting with "/alice" is taken
// relative to the user's AliEn home directory.
1129 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1130 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1131 workdir += fGridWorkingDir;
1135 Error("CreateJDL()", "Define some input files for your analysis.");
1138 // Compose list of input files
1139 // Check if output files were defined
1140 if (!fOutputFiles.Length()) {
1141 Error("CreateJDL", "You must define at least one output file");
1144 // Check if an output directory was defined and valid
1145 if (!fGridOutputDir.Length()) {
1146 Error("CreateJDL", "You must define AliEn output directory");
1149 if (!fProductionMode) {
// An output dir given without any "/" is interpreted relative to workdir.
1150 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1151 if (!DirectoryExists(fGridOutputDir)) {
1152 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1153 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1155 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1159 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1164 // Exit if any error up to now
1165 if (error) return kFALSE;
// --- User, executable and arguments ---
1167 if (!fUser.IsNull()) {
1168 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1169 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1171 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
// The merging executable is derived from the analysis one: X.sh -> X_merge.sh.
1172 TString mergeExec = fExecutable;
1173 mergeExec.ReplaceAll(".sh", "_merge.sh");
1174 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
// Same string reused for the merging macro: X_merge.sh -> X_merge.C.
1175 mergeExec.ReplaceAll(".sh", ".C");
1176 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1177 if (!fArguments.IsNull())
1178 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// Merging arguments: the grid output dir for one-stage merging; the staged
// form passes the xml collection plus the stage parameter ($4 in production
// mode, $2 otherwise).
1179 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1181 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1182 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// --- TTL: written as-is; the description reports fTTL/60 as minutes ---
1185 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1186 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1187 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1188 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
// --- Optional limits, only written when positive ---
1190 if (fMaxInitFailed > 0) {
1191 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1192 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1194 if (fSplitMaxInputFileNumber > 0) {
1195 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1196 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
// Staged merging limits the number of files merged per job to fMaxMergeFiles.
1198 if (!IsOneStageMerging()) {
1199 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1200 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1202 if (fSplitMode.Length()) {
1203 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1204 fGridJDL->SetDescription("Split", "We split per SE or file");
1206 fMergingJDL->SetValue("Split", "\"se\"");
1207 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// --- Software packages requested by both JDLs ---
1208 if (!fAliROOTVersion.IsNull()) {
1209 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1210 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1212 if (!fROOTVersion.IsNull()) {
1213 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1214 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1216 if (!fAPIVersion.IsNull()) {
1217 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1218 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: space-separated list of "name::version" entries.
1220 if (!fExternalPackages.IsNull()) {
1221 arr = fExternalPackages.Tokenize(" ");
1223 while ((os=(TObjString*)next())) {
1224 TString pkgname = os->GetString();
// NOTE(review): the result of Index("::") is used without checking for a
// missing separator - confirm that entries are always "name::version".
1225 Int_t index = pkgname.Index("::");
1226 TString pkgversion = pkgname(index+2, pkgname.Length());
1227 pkgname.Remove(index);
1228 fGridJDL->AddToPackages(pkgname, pkgversion);
1229 fMergingJDL->AddToPackages(pkgname, pkgversion);
// --- Input data list and input sandbox ---
1233 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1234 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1235 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1236 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1237 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
// The analysis file name is derived from the executable: X.sh -> X.root.
1238 TString analysisFile = fExecutable;
1239 analysisFile.ReplaceAll(".sh", ".root");
1240 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1241 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// TSystem::AccessPathName() returns kFALSE when the path IS accessible, so the
// negation below means "ConfigureCuts.C exists locally".
1242 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1243 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
// Additional libs: entries containing ".so" are skipped, everything else in
// the list is shipped through the input sandbox.
1244 if (fAdditionalLibs.Length()) {
1245 arr = fAdditionalLibs.Tokenize(" ");
1247 while ((os=(TObjString*)next())) {
1248 if (os->GetString().Contains(".so")) continue;
1249 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1250 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages is shipped through the input sandbox as well.
1255 TIter next(fPackages);
1257 while ((obj=next())) {
1258 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1259 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
// --- Output archive of the analysis JDL ---
// Entries without an explicit "@..." part get "@<fCloseSE>" appended when a
// close SE is defined. The descriptive comment is passed for the first entry only.
1262 if (fOutputArchive.Length()) {
1263 arr = fOutputArchive.Tokenize(" ");
1265 Bool_t first = kTRUE;
1266 const char *comment = "Files to be archived";
1267 const char *comment1 = comment;
1268 while ((os=(TObjString*)next())) {
1269 if (!first) comment = NULL;
1270 if (!os->GetString().Contains("@") && fCloseSE.Length())
1271 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1273 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1277 // Output archive for the merging jdl
1278 TString outputArchive;
1279 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1280 outputArchive = "log_archive.zip:std*@disk=1 ";
1281 // Add normal output files, extra files + terminate files
1282 TString files = GetListOfFiles("outextter");
1283 // Do not register merge excludes
1284 if (!fMergeExcludes.IsNull()) {
1285 arr = fMergeExcludes.Tokenize(" ");
1287 while ((os=(TObjString*)next1())) {
// Remove "name," first so that no dangling comma is left, then the bare name.
1288 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1289 files.ReplaceAll(os->GetString(),"");
// Turn "X.root" into the wildcard "X*.root".
1293 files.ReplaceAll(".root", "*.root");
1294 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1296 TString files = fOutputArchive;
1297 files.ReplaceAll(".root", "*.root"); // nreplicas etc. should already be attached by the user
1298 outputArchive = files;
1300 arr = outputArchive.Tokenize(" ");
1304 while ((os=(TObjString*)next2())) {
1305 if (!first) comment = NULL;
1306 TString currentfile = os->GetString();
1307 if (!currentfile.Contains("@") && fCloseSE.Length())
1308 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1310 fMergingJDL->AddToOutputArchive(currentfile, comment);
// --- Output sandbox: comma-separated fOutputFiles ---
1315 arr = fOutputFiles.Tokenize(",");
1317 Bool_t first = kTRUE;
1318 const char *comment = "Files to be saved";
1319 while ((os=(TObjString*)next())) {
1320 // Ignore outputs in the JDL that are also in the output archive
// sout is the bare name used for matching: "*", ".root" and any "@..." suffix
// are removed before looking it up in fOutputArchive / fMergeExcludes.
1321 TString sout = os->GetString();
1322 sout.ReplaceAll("*", "");
1323 sout.ReplaceAll(".root", "");
1324 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1325 if (fOutputArchive.Contains(sout)) continue;
1326 if (!first) comment = NULL;
1327 if (!os->GetString().Contains("@") && fCloseSE.Length())
1328 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1330 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
// Files excluded from merging are not registered in the merging JDL.
1332 if (fMergeExcludes.Contains(sout)) continue;
1333 if (!os->GetString().Contains("@") && fCloseSE.Length())
1334 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1336 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
// --- Price, validation scripts, resubmit threshold ---
1339 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1340 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// The merging validation script is derived from the analysis one: X.sh -> X_merge.sh.
1341 TString validationScript = fValidationScript;
1342 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1343 validationScript.ReplaceAll(".sh", "_merge.sh");
1344 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
// The threshold is written as a percentage string, e.g. "50%".
1345 if (fMasterResubmitThreshold) {
1346 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1347 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1349 // Write a jdl with 2 input parameters: collection name and output dir name.
1352 // Copy jdl to grid workspace
1354 // Check if an output directory was defined and valid
// NOTE(review): this repeats the output directory validation performed near
// the top of this function - confirm whether both are needed.
1355 if (!fGridOutputDir.Length()) {
1356 Error("CreateJDL", "You must define AliEn output directory");
1359 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1360 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1361 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1362 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1364 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
// In submit mode the locally written JDL files are uploaded: to the grid
// output directory by default, to workdir in production mode. An existing
// remote copy is removed before the upload.
1370 if (TestBit(AliAnalysisGrid::kSubmit)) {
1371 TString mergeJDLName = fExecutable;
1372 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1373 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1374 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1375 if (fProductionMode) {
1376 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1377 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1379 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1380 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1381 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1382 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1384 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1385 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
// Upload the additional (non-".so") files and the packages to workdir,
// replacing any existing remote copy.
1388 if (fAdditionalLibs.Length()) {
1389 arr = fAdditionalLibs.Tokenize(" ");
1392 while ((os=(TObjString*)next())) {
1393 if (os->GetString().Contains(".so")) continue;
1394 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1395 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1396 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1401 TIter next(fPackages);
1403 while ((obj=next())) {
1404 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1405 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1406 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1413 //______________________________________________________________________________
1414 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1416 // Writes the JDL text generated from fGridJDL and fMergingJDL to local files:
1417 // the analysis JDL (fJDLName), the staged-merging JDL (<exec>_merge.jdl) and
1418 // the final-merging JDL (<exec>_merge_final.jdl), <exec> being fExecutable minus ".sh".
// NOTE(review): 'copy' presumably controls whether the files are also uploaded
// to the grid workspace - confirm against the guard around the upload section.
// Returns kFALSE when no input files are defined.
1419 if (!fInputFiles) return kFALSE;
// Outside production mode a working dir not starting with "/alice" is taken
// relative to the user's AliEn home directory.
1422 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1423 workdir += fGridWorkingDir;
// JDL parameter holding the merging stage: $2 by default, $4 in production mode.
1424 TString stageName = "$2";
1425 if (fProductionMode) stageName = "$4";
// Staged merging: input collection Stage_<stage>.xml and per-subjob output
// directory, optionally below fMergeDirName.
1426 if (!fMergeDirName.IsNull()) {
1427 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1428 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1430 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1431 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
// Production mode: every entry of fInputFiles becomes an input collection and
// the output directory uses a 4-digit counter.
1433 if (fProductionMode) {
1434 TIter next(fInputFiles);
1435 while ((os=next())) {
1436 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1438 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1440 if (!fRunNumbers.Length() && !fRunRange[0]) {
1441 // One jdl with no parameters in case input data is specified by name.
1442 TIter next(fInputFiles);
1444 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1445 if (!fOutputSingle.IsNull())
1446 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1448 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1449 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1452 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1453 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1454 if (!fOutputSingle.IsNull()) {
1455 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1456 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1458 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1463 // Generate the JDL as a string
// sjdl = analysis JDL, sjdl1 = staged-merging JDL.
1464 TString sjdl = fGridJDL->Generate();
1465 TString sjdl1 = fMergingJDL->Generate();
// The merging JDL object is then re-pointed for the final merging step (output
// straight into $1[/fMergeDirName], stage xml shipped in the input sandbox)
// and generated a second time as sjdl2.
1467 if (!fMergeDirName.IsNull()) {
1468 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1469 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1471 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1472 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1474 TString sjdl2 = fMergingJDL->Generate();
1475 Int_t index, index1;
// Cosmetic clean-up applied identically to the three JDL strings: line breaks
// around list entries and braces, collapse of empty lines, and the key
// "OutputDirectory" renamed to "OutputDir".
1476 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1477 sjdl.ReplaceAll("(member", "\n (member");
1478 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1479 sjdl.ReplaceAll("{", "{\n ");
1480 sjdl.ReplaceAll("};", "\n};");
1481 sjdl.ReplaceAll("{\n \n", "{\n");
1482 sjdl.ReplaceAll("\n\n", "\n");
1483 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1484 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1485 sjdl1.ReplaceAll("(member", "\n (member");
1486 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1487 sjdl1.ReplaceAll("{", "{\n ");
1488 sjdl1.ReplaceAll("};", "\n};");
1489 sjdl1.ReplaceAll("{\n \n", "{\n");
1490 sjdl1.ReplaceAll("\n\n", "\n");
1491 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1492 sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
1493 sjdl2.ReplaceAll("(member", "\n (member");
1494 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1495 sjdl2.ReplaceAll("{", "{\n ");
1496 sjdl2.ReplaceAll("};", "\n};");
1497 sjdl2.ReplaceAll("{\n \n", "{\n");
1498 sjdl2.ReplaceAll("\n\n", "\n");
1499 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
// Analysis JDL: append the JDLVariables block, prepend the job tag, and label
// the JDLVariables block with a comment line.
1500 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1501 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1502 index = sjdl.Index("JDLVariables");
1503 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1504 sjdl += "Workdirectorysize = {\"5000MB\"};";
// NOTE(review): this entry has no trailing newline, so the JDLVariables text
// appended next starts on the same line; sjdl1 also receives a second
// Workdirectorysize entry further down - confirm both are intended.
1505 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1506 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
// Job tag for the merging JDLs; in production mode "_Stage$4" is inserted
// before the first ':' of the tag (or at its end when there is none).
1507 index = fJobTag.Index(":");
1508 if (index < 0) index = fJobTag.Length();
1509 TString jobTag = fJobTag;
1510 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1511 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
// Human-readable header describing the positional parameters of the merging JDLs.
1512 if (fProductionMode) {
1513 sjdl1.Prepend("# Generated merging jdl (production mode) \
1514 \n# $1 = full alien path to output directory to be merged \
1515 \n# $2 = train number \
1516 \n# $3 = production (like LHC10b) \
1517 \n# $4 = merging stage \
1518 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1519 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1520 sjdl2.Prepend("# Generated merging jdl \
1521 \n# $1 = full alien path to output directory to be merged \
1522 \n# $2 = train number \
1523 \n# $3 = production (like LHC10b) \
1524 \n# $4 = merging stage \
1525 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1527 sjdl1.Prepend("# Generated merging jdl \
1528 \n# $1 = full alien path to output directory to be merged \
1529 \n# $2 = merging stage \
1530 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1531 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1532 sjdl2.Prepend("# Generated merging jdl \
1533 \n# $1 = full alien path to output directory to be merged \
1534 \n# $2 = merging stage \
1535 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1537 index = sjdl1.Index("JDLVariables");
1538 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1539 index = sjdl2.Index("JDLVariables");
1540 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
// NOTE(review): sjdl1 already received a Workdirectorysize entry above; this
// looks like a duplicate - confirm.
1541 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1542 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
// Final-merging JDL: the Split, SplitMaxInputFileNumber, InputDataCollection,
// InputDataListFormat and InputDataList entries are cut out of the text, each
// from its keyword up to and including the terminating '\n' (or ';' for
// InputDataCollection).
1543 index = sjdl2.Index("Split =");
1545 index1 = sjdl2.Index("\n", index);
1546 sjdl2.Remove(index, index1-index+1);
1548 index = sjdl2.Index("SplitMaxInputFileNumber");
1550 index1 = sjdl2.Index("\n", index);
1551 sjdl2.Remove(index, index1-index+1);
1553 index = sjdl2.Index("InputDataCollection");
1555 index1 = sjdl2.Index(";", index);
1556 sjdl2.Remove(index, index1-index+1);
1558 index = sjdl2.Index("InputDataListFormat");
1560 index1 = sjdl2.Index("\n", index);
1561 sjdl2.Remove(index, index1-index+1);
1563 index = sjdl2.Index("InputDataList");
1565 index1 = sjdl2.Index("\n", index);
1566 sjdl2.Remove(index, index1-index+1);
// Remaining references to the worker collection are pointed at the stage xml.
1568 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
1569 // Write jdl to file
1571 out.open(fJDLName.Data(), ios::out);
1573 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1576 out << sjdl << endl;
// Merging JDL file name is derived from the executable: X.sh -> X_merge.jdl.
1578 TString mergeJDLName = fExecutable;
1579 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1582 out1.open(mergeJDLName.Data(), ios::out);
1584 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1587 out1 << sjdl1 << endl;
// Final merging JDL file name: X_merge.jdl -> X_merge_final.jdl.
1590 TString finalJDL = mergeJDLName;
1591 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1592 out2.open(finalJDL.Data(), ios::out);
1594 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
1597 out2 << sjdl2 << endl;
1601 // Copy jdl to grid workspace
1603 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Remote destinations: the grid output directory by default, workdir in
// production mode. Existing remote copies are removed before the upload.
1605 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1606 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1607 TString finalJDL = mergeJDLName;
1608 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1609 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
1610 if (fProductionMode) {
1611 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1612 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1613 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
1615 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1616 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1617 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
1618 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1619 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1621 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
1622 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1623 TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
1629 //______________________________________________________________________________
1630 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1632 // Returns true if file exists.
// lfn may carry an "alien://" prefix; it is removed before querying the grid.
// Without a grid connection (gGrid == 0) the answer is always kFALSE.
1633 if (!gGrid) return kFALSE;
1635 slfn.ReplaceAll("alien://","");
// List the file; no result object means it cannot be confirmed to exist.
1636 TGridResult *res = gGrid->Ls(slfn);
1637 if (!res) return kFALSE;
// The first entry of the listing is expected to be a TMap describing the file.
1638 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry must carry a non-empty "name" value.
1643 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1644 if (!objs || !objs->GetString().Length()) {
1652 //______________________________________________________________________________
1653 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1655 // Returns true if directory exists. Can be also a path.
// The name is split into its parent path and last component; the parent is
// listed on the grid and the entries are searched for the last component.
// Without a grid connection (gGrid == 0) the answer is always kFALSE.
1656 if (!gGrid) return kFALSE;
1657 // Check if dirname is a path
// Normalise: drop surrounding blanks, then any trailing '/'.
1658 TString dirstripped = dirname;
1659 dirstripped = dirstripped.Strip();
1660 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1661 TString dir = gSystem->BaseName(dirstripped);
1663 TString path = gSystem->DirName(dirstripped);
// List the parent directory with the "-F" option.
1664 TGridResult *res = gGrid->Ls(path, "-F");
1665 if (!res) return kFALSE;
// Walk the listing (TMap entries) and compare each "name" value with dir.
1669 while ((map=dynamic_cast<TMap*>(next()))) {
1670 obj = map->GetValue("name");
1672 if (dir == obj->GetName()) {
1681 //______________________________________________________________________________
1682 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1684 // Check input data type.
// Classifies the grid file lfn and reports the result through the output flags:
//   isCollection - result of IsCollection(lfn)
//   isXml        - the name contains ".xml"
//   useTags      - the first entry found inside the collection (or the file
//                  name itself for a plain file) contains ".tag"
// A one-line summary is accumulated in msg and printed with Info().
1685 isCollection = kFALSE;
1689 Error("CheckDataType", "No connection to grid");
1692 isCollection = IsCollection(lfn);
1693 TString msg = "\n##### file: ";
1696 msg += " type: raw_collection;";
1697 // special treatment for collections
1699 // check for tag files in the collection
1700 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1702 msg += " using_tags: No (unknown)";
1703 Info("CheckDataType", "%s", msg.Data());
// Only the first listed file ("origLFN" of entry 0) is inspected.
1706 const char* typeStr = res->GetKey(0, "origLFN");
1707 if (!typeStr || !strlen(typeStr)) {
1708 msg += " using_tags: No (unknown)";
1709 Info("CheckDataType", "%s", msg.Data());
1712 TString file = typeStr;
1713 useTags = file.Contains(".tag");
1714 if (useTags) msg += " using_tags: Yes";
1715 else msg += " using_tags: No";
1716 Info("CheckDataType", "%s", msg.Data());
// slfn is presumably lfn held as a TString - confirm against its declaration.
1721 isXml = slfn.Contains(".xml");
1723 // Open xml collection and check if there are tag files inside
1724 msg += " type: xml_collection;";
// The collection is opened through the interpreter (gROOT->ProcessLine)
// rather than by calling TAlienCollection directly from compiled code.
1725 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1727 msg += " using_tags: No (unknown)";
1728 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the xml collection is inspected.
1731 TMap *map = coll->Next();
1733 msg += " using_tags: No (unknown)";
1734 Info("CheckDataType", "%s", msg.Data());
// The per-file attributes are stored in a nested map under the empty key.
1737 map = (TMap*)map->GetValue("");
1739 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1740 useTags = file.Contains(".tag");
1742 if (useTags) msg += " using_tags: Yes";
1743 else msg += " using_tags: No";
1744 Info("CheckDataType", "%s", msg.Data());
// Plain file: the decision is taken from the file name alone.
1747 useTags = slfn.Contains(".tag");
1748 if (slfn.Contains(".root")) msg += " type: root file;";
1749 else msg += " type: unknown file;";
1750 if (useTags) msg += " using_tags: Yes";
1751 else msg += " using_tags: No";
1752 Info("CheckDataType", "%s", msg.Data());
1755 //______________________________________________________________________________
1756 void AliAnalysisAlien::EnablePackage(const char *package)
1758 // Enables a par file supposed to exist in the current directory.
// The package is recorded in fPackages and the kUsePars bit is set on the plugin.
1759 TString pkg(package);
// Any ".par" occurrence is stripped from the name given by the caller.
1760 pkg.ReplaceAll(".par", "");
// TSystem::AccessPathName() returns kTRUE when the path is NOT accessible.
1762 if (gSystem->AccessPathName(pkg)) {
1763 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the switch to .par packages only the first time the bit gets set.
1766 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1767 Info("EnablePackage", "AliEn plugin will use .par packages");
1768 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns its entries (SetOwner), so the TObjString added below is
// deleted together with the array.
1770 fPackages = new TObjArray();
1771 fPackages->SetOwner();
1773 fPackages->Add(new TObjString(pkg));
1776 //______________________________________________________________________________
1777 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1779 // Make a tree from files having the location specified in fFileForTestMode.
1780 // Inspired from JF's CreateESDChain.
// fFileForTestMode is a local text file listing data file locations. Files
// that open successfully are added to a new TChain named treeName; the break
// below stops the reading once fNtestFiles non-empty entries were consumed.
// NOTE(review): the chain is allocated with new; ownership presumably passes
// to the caller - confirm.
1781 if (fFileForTestMode.IsNull()) {
1782 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
// TSystem::AccessPathName() returns kTRUE when the path is NOT accessible.
1785 if (gSystem->AccessPathName(fFileForTestMode)) {
1786 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1791 in.open(fFileForTestMode);
1793 // Read the input list of files and add them to the chain
1795 TChain *chain = new TChain(treeName);
// Empty entries are skipped and do not count towards the fNtestFiles limit.
1799 if (line.IsNull()) continue;
1800 if (count++ == fNtestFiles) break;
1801 TString esdFile(line);
// Each candidate is opened once to make sure it is readable before it is
// added to the chain; zombie files are not added.
1802 TFile *file = TFile::Open(esdFile);
1804 if (!file->IsZombie()) chain->Add(esdFile);
// NOTE(review): the location string below is spelled "GetChainforTestMode"
// (lower-case 'f'), unlike the other messages of this method.
1807 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
// A chain without any file is reported as an error.
1811 if (!chain->GetListOfFiles()->GetEntries()) {
1812 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1820 //______________________________________________________________________________
1821 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1823 // Get job status for all jobs with jobid>=jobidstart.
// Jobs whose queue id is below jobidstart are ignored. The status text of the
// job whose queue id equals lastid is copied into the returned buffer.
// nrunning, nwaiting, nerror and ndone are output counters; presumably each
// case of the switch below bumps the matching one - confirm.
// The returned pointer refers to a function-local static buffer, so its
// content is overwritten by the next call.
1824 static char mstatus[20];
// Query the process list of the grid session; without a list the buffer is
// returned as it is.
1830 TGridJobStatusList *list = gGrid->Ps("");
1831 if (!list) return mstatus;
1832 Int_t nentries = list->GetSize();
1833 TGridJobStatus *status;
1835 for (Int_t ijob=0; ijob<nentries; ijob++) {
1836 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, so that this code does not
// need the TAlienJobStatus class at compile time.
1837 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
1838 if (pid<jobidstart) continue;
1839 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf FORMAT string and is
// written into the 20-byte mstatus buffer without a length limit; a bounded
// copy with an explicit "%s" format would be safer.
1840 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
// kABORTED, kFAIL and kUNKNOWN share one branch.
1842 switch (status->GetStatus()) {
1843 case TGridJobStatus::kWAITING:
1845 case TGridJobStatus::kRUNNING:
1847 case TGridJobStatus::kABORTED:
1848 case TGridJobStatus::kFAIL:
1849 case TGridJobStatus::kUNKNOWN:
1851 case TGridJobStatus::kDONE:
1860 //______________________________________________________________________________
1861 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1863 // Returns true if file is a collection. Functionality duplicated from
1864 // TAlien::Type() because we don't want to directly depend on TAlien.
// The grid command "type -z <lfn>" is issued and the "type" key of the first
// result entry is compared with "collection".
1866 Error("IsCollection", "No connection to grid");
1869 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1870 if (!res) return kFALSE;
// A missing or empty type string counts as "not a collection".
1871 const char* typeStr = res->GetKey(0, "type");
1872 if (!typeStr || !strlen(typeStr)) return kFALSE;
1873 if (!strcmp(typeStr, "collection")) return kTRUE;
1878 //______________________________________________________________________________
1879 Bool_t AliAnalysisAlien::IsSingleOutput() const
1881 // Check if the single-output option is on, i.e. fOutputSingle is not empty.
1882 return (!fOutputSingle.IsNull());
1885 //______________________________________________________________________________
1886 void AliAnalysisAlien::Print(Option_t *) const
1888 // Print current plugin settings.
1889 printf("### AliEn analysis plugin current settings ###\n");
1890 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1891 if (mgr && mgr->IsProofMode()) {
1892 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1893 if (TestBit(AliAnalysisGrid::kTest))
1894 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1895 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1896 if (!fProofDataSet.IsNull())
1897 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1899 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1901 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1902 if (!fRootVersionForProof.IsNull())
1903 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1905 printf("= ROOT version requested________________________ default\n");
1906 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1907 if (!fAliRootMode.IsNull())
1908 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1910 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1911 if (fNproofWorkersPerSlave)
1912 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1913 if (TestSpecialBit(kClearPackages))
1914 printf("= ClearPackages requested...\n");
1915 if (fIncludePath.Data())
1916 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1917 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1918 if (fPackages && fPackages->GetEntries()) {
1919 TIter next(fPackages);
1922 while ((obj=next())) list += obj->GetName();
1923 printf("= Par files to be used: ________________________ %s\n", list.Data());
1925 if (TestSpecialBit(kProofConnectGrid))
1926 printf("= Requested PROOF connection to grid\n");
1929 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1930 if (fOverwriteMode) {
1931 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1932 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1934 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1935 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1936 printf("= Production mode:______________________________ %d\n", fProductionMode);
1937 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1938 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1939 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1941 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1942 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1943 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1944 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1945 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1946 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1947 if (fRunNumbers.Length())
1948 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1950 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
1951 if (!fRunRange[0] && !fRunNumbers.Length()) {
1952 TIter next(fInputFiles);
1955 while ((obj=next())) list += obj->GetName();
1956 printf("= Input files to be processed: _________________ %s\n", list.Data());
1958 if (TestBit(AliAnalysisGrid::kTest))
1959 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1960 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1961 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1962 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1963 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1964 printf("=====================================================================\n");
1965 printf("= Job price: ___________________________________ %d\n", fPrice);
1966 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1967 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1968 if (fMaxInitFailed>0)
1969 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1970 if (fMasterResubmitThreshold>0)
1971 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1972 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1973 if (fNrunsPerMaster>0)
1974 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1975 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1976 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1977 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1978 if (fArguments.Length())
1979 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1980 if (fExecutableArgs.Length())
1981 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1982 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1983 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1984 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1985 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1987 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1988 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
1989 if (fIncludePath.Data())
1990 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1991 if (fCloseSE.Length())
1992 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1993 if (fFriendChainName.Length())
1994 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1995 if (fPackages && fPackages->GetEntries()) {
1996 TIter next(fPackages);
1999 while ((obj=next())) list += obj->GetName();
2000 printf("= Par files to be used: ________________________ %s\n", list.Data());
2004 //______________________________________________________________________________
2005 void AliAnalysisAlien::SetDefaults()
2007 // Set default values for everything. What cannot be filled will be left empty.
// Recreates the job and merging JDL objects and resets the job-splitting,
// executable, grid-path, data-selection and output-related members.
// Release the previous job JDL, then create fresh TAlienJDL objects through the interpreter.
2008 if (fGridJDL) delete fGridJDL;
2009 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// NOTE(review): unlike fGridJDL, no delete of a previous fMergingJDL precedes this
// assignment - confirm that it is released elsewhere.
2010 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting, resubmission and merge-chunk sizes.
2013 fSplitMaxInputFileNumber = 100;
2015 fMasterResubmitThreshold = 0;
2021 fNrunsPerMaster = 1;
2022 fMaxMergeFiles = 100;
// Generated execution script, executable command and analysis macro names.
2024 fExecutable = "analysis.sh";
2025 fExecutableCommand = "root -b -q";
2027 fExecutableArgs = "";
2028 fAnalysisMacro = "myAnalysis.C";
2029 fAnalysisSource = "";
2030 fAdditionalLibs = "";
2034 fAliROOTVersion = "";
// Grid user, directories and input data selection.
2035 fUser = ""; // Your alien user name
2036 fGridWorkingDir = "";
2037 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2038 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2039 fFriendChainName = "";
2040 fGridOutputDir = "output";
// Default archives: std* files go to log_archive.zip, *.root files to root_archive.zip.
2041 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2042 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2043 fInputFormat = "xml-single";
2044 fJDLName = "analysis.jdl";
2045 fJobTag = "Automatically generated analysis JDL";
2046 fMergeExcludes = "";
// Enable the copy check and, by default, take output file names from the analysis manager.
2049 SetCheckCopy(kTRUE);
2050 SetDefaultOutputs(kTRUE);
2054 //______________________________________________________________________________
2055 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2057 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
// Parameters:
//   filename  - name of the final merged file, looked up inside aliendir
//   aliendir  - AliEn directory holding the outputs and the Stage_<n>.xml collections
//   nperchunk - when fewer than nperchunk files are counted, the submitted stage is the final one
//   jdl       - merging JDL name; the final stage submits the same name with ".jdl" replaced by "_final.jdl"
// Returns kFALSE when the merging job could not be submitted.
2058 // First check if the result is already in the output directory.
2059 if (FileExists(Form("%s/%s",aliendir,filename))) {
2060 printf("Final merged results found. Not merging again.\n");
2063 // Now check the last stage done.
// A stage is considered done when its Stage_<n>.xml collection exists in aliendir.
2066 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2069 // Next stage of merging
// Stage 1 looks for all root_archive.zip files, later stages only for those under Stage_<stage-1>/.
2071 TString pattern = "*root_archive.zip";
2072 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
// Only the text printed by the find command is needed; the result object is discarded.
2073 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2074 if (res) delete res;
2075 // Write standard output to file
2076 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2077 // Count the number of files inside
2079 ifile.open(Form("Stage_%d.xml",stage));
2080 if (!ifile.good()) {
2081 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Every line containing "/event" is counted as one file of the collection.
2086 while (!ifile.eof()) {
2088 if (line.Contains("/event")) nfiles++;
2092 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2095 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2097 // Copy the file in the output directory
2098 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2099 TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2100 // Check if this is the last stage to be done.
2101 Bool_t laststage = (nfiles<nperchunk);
// A non-zero fMaxMergeStages also forces the final stage once that stage number is reached.
2102 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2104 printf("### Submiting final merging stage %d\n", stage);
// The final stage uses the "_final" variant of the merging JDL.
2105 TString finalJDL = jdl;
2106 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2107 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2108 Int_t jobId = SubmitSingleJob(query);
2109 if (!jobId) return kFALSE;
2111 printf("### Submiting merging stage %d\n", stage);
2112 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2113 Int_t jobId = SubmitSingleJob(query);
2114 if (!jobId) return kFALSE;
2119 //______________________________________________________________________________
2120 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2122 // Load the analysis manager from a file.
// Opens fname and scans its keys for objects whose class name is "AliAnalysisManager".
// An error is reported when the file cannot be opened or when it holds no manager.
2123 TFile *file = TFile::Open(fname);
2125 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2128 TIter nextkey(file->GetListOfKeys());
2129 AliAnalysisManager *mgr = 0;
// NOTE(review): the scan appears to continue past the first match, so with several
// manager keys the last one read would win - confirm.
2131 while ((key=(TKey*)nextkey())) {
2132 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2133 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2136 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2140 //______________________________________________________________________________
2141 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2143 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// query is passed unchanged to gGrid->Command().
// Without a grid connection nothing is submitted and 0 is returned.
2144 if (!gGrid) return 0;
2145 printf("=> %s ------> ",query);
2146 TGridResult *res = gGrid->Command(query);
// The job id is read from the "jobId" key of entry 0 of the command result.
2148 TString jobId = res->GetKey(0,"jobId");
// An empty job id is treated as a failed submission.
2150 if (jobId.IsNull()) {
2151 printf("submission failed. Reason:\n");
2154 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2157 printf(" Job id: %s\n", jobId.Data());
2161 //______________________________________________________________________________
2162 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2164 // Merge given output files from basedir. Basedir can be an alien output directory
2165 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2166 // files in a group (ignored for xml input). Merging can be done in stages:
2167 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2168 // stage=1 : works with an xml of all root_archive.zip in the output directory
2169 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
// Parameters:
//   output    - name of the file to merge; a trailing "@..." part is stripped
//   basedir   - AliEn directory searched with 'find', or an xml collection when the name contains ".xml"
//   nmaxmerge - number of input files merged per chunk (overridden for xml input)
//   stage     - merging stage, as described above
2170 TString outputFile = output;
2172 TString outputChunk;
2173 TString previousChunk = "";
2174 TObjArray *listoffiles = new TObjArray();
2175 // listoffiles->SetOwner();
2176 Int_t countChunk = 0;
2177 Int_t countZero = nmaxmerge;
2178 Bool_t merged = kTRUE;
// Drop the "@..." suffix from the output name, if present.
2179 Int_t index = outputFile.Index("@");
2180 if (index > 0) outputFile.Remove(index);
2181 TString inputFile = outputFile;
2182 TString sbasedir = basedir;
// XML input: the list of files comes from the grid collection.
2183 if (sbasedir.Contains(".xml")) {
2184 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2185 nmaxmerge = 9999999;
2186 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2188 ::Error("MergeOutput", "Input XML collection empty.");
2191 // Iterate grid collection
2192 while (coll->Next()) {
// Start from the directory part of the TURL of each collection entry.
2193 TString fname = gSystem->DirName(coll->GetTURL());
2196 listoffiles->Add(new TNamed(fname.Data(),""));
// Directory input: run an AliEn 'find' for the file name and collect the "turl" of each result.
2199 command = Form("find %s/ *%s", basedir, inputFile.Data());
2200 printf("command: %s\n", command.Data());
2201 TGridResult *res = gGrid->Command(command);
2203 ::Error("MergeOutput","No result for the find command\n");
2209 while ((map=(TMap*)nextmap())) {
2210 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2211 if (!objs || !objs->GetString().Length()) {
2212 // Nothing found - skip this output
2217 listoffiles->Add(new TNamed(objs->GetName(),""));
2221 if (!listoffiles->GetEntries()) {
2222 ::Error("MergeOutput","No result for the find command\n");
2227 TFileMerger *fm = 0;
2228 TIter next0(listoffiles);
2229 TObjArray *listoffilestmp = new TObjArray();
2230 listoffilestmp->SetOwner();
2233 // Keep only the files at upper level
// First pass: find the smallest number of '/' separators among the collected names.
2234 Int_t countChar = 0;
2235 while ((nextfile=next0())) {
2236 snextfile = nextfile->GetName();
2237 Int_t crtCount = snextfile.CountChar('/');
2238 if (nextfile == listoffiles->First()) countChar = crtCount;
2239 if (crtCount < countChar) countChar = crtCount;
// Second pass: compare the depth of each name with that minimum while filling listoffilestmp.
// NOTE(review): presumably names deeper than the minimum are left out - confirm.
2242 while ((nextfile=next0())) {
2243 snextfile = nextfile->GetName();
2244 Int_t crtCount = snextfile.CountChar('/');
2245 if (crtCount > countChar) {
2249 listoffilestmp->Add(nextfile);
2252 listoffiles = listoffilestmp; // Now contains 'good' files
2253 listoffiles->Print();
2254 TIter next(listoffiles);
2255 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2256 outputChunk = outputFile;
// Wildcard matching the partial chunk files <name>_<nnnn>.root.
2257 outputChunk.ReplaceAll(".root", "_*.root");
2258 // Check for existent temporary merge files
2259 // Check overwrite mode and remove previous partial results if needed
2260 // Preserve old merging functionality for stage 0.
// The 'ls' command returns 0 when at least one partial chunk file is present locally.
2262 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2264 // Skip as many input files as in a chunk
2265 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2268 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2272 snextfile = nextfile->GetName();
2274 outputChunk = outputFile;
2275 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns true when the path is NOT accessible, so absent chunk files are skipped.
2277 if (gSystem->AccessPathName(outputChunk)) continue;
2278 // Merged file with chunks up to <countChunk> found
2279 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2280 previousChunk = outputChunk;
2284 countZero = nmaxmerge;
2286 while ((nextfile=next())) {
2287 snextfile = nextfile->GetName();
2288 // Loop 'find' results and get next LFN
2289 if (countZero == nmaxmerge) {
2290 // First file in chunk - create file merger and add previous chunk if any.
2291 fm = new TFileMerger(kFALSE);
2292 fm->SetFastMethod(kTRUE);
2293 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2294 outputChunk = outputFile;
2295 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2297 // If last file found, put merged results in the output file
2298 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2299 // Add file to be merged and decrement chunk counter.
2300 fm->AddFile(snextfile);
// Close the current chunk when it is full or when the last input file was reached.
2302 if (countZero==0 || nextfile == listoffiles->Last()) {
2303 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2304 // Nothing found - skip this output
2305 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2309 fm->OutputFile(outputChunk);
2310 // Merge the outputs, then go to next chunk
2312 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message reports location "MergeOutputs" while the rest of the function uses "MergeOutput".
2316 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial chunk was added to this merge, so its file is removed.
2317 gSystem->Unlink(previousChunk);
2319 if (nextfile == listoffiles->Last()) break;
// Start a new chunk; the file just written becomes the first input of the next merge.
2321 countZero = nmaxmerge;
2322 previousChunk = outputChunk;
2329 // Merging stage different than 0.
2330 // Move to the beginning of the requested chunk.
// All files of the list are added to a single merger writing directly to outputFile.
2331 fm = new TFileMerger(kFALSE);
2332 fm->SetFastMethod(kTRUE);
2333 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2335 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2336 // Nothing found - skip this output
2337 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2341 fm->OutputFile(outputFile);
2342 // Merge the outputs
2344 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2348 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2354 //______________________________________________________________________________
2355 Bool_t AliAnalysisAlien::MergeOutputs()
2357 // Merge analysis outputs existing in the AliEn space.
// Each file listed in fOutputFiles is merged locally through MergeOutput(), unless merging
// via JDL was requested, in which case merging jobs are submitted instead.
// Test mode returns kTRUE and offline mode returns kFALSE without merging anything.
2358 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2359 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2361 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2365 if (!TestBit(AliAnalysisGrid::kMerge)) {
2366 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2369 if (fProductionMode) {
2370 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2373 Info("MergeOutputs", "Submitting merging JDL");
2374 if (!SubmitMerging()) return kFALSE;
2375 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2376 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2379 // Get the output path
// An output directory without any '/' is expanded to <grid home>/<working dir>/<output dir>.
2380 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2381 if (!DirectoryExists(fGridOutputDir)) {
2382 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2385 if (!fOutputFiles.Length()) {
2386 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2389 // Check if fast read option was requested
2390 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2391 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2392 if (fFastReadOption) {
2393 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2394 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2395 gEnv->SetValue("XNet.ConnectTimeout",50);
2396 gEnv->SetValue("XNet.RequestTimeout",50);
2397 gEnv->SetValue("XNet.MaxRedirectCount",2);
2398 gEnv->SetValue("XNet.ReconnectTimeout",50);
2399 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2401 // Make sure we change the temporary directory
2402 gSystem->Setenv("TMPDIR", gSystem->pwd());
2403 // Set temporary compilation directory to current one
2404 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
// fOutputFiles is a comma-separated list; every entry is handled separately below.
2405 TObjArray *list = fOutputFiles.Tokenize(",");
2409 Bool_t merged = kTRUE;
2410 while((str=(TObjString*)next())) {
2411 outputFile = str->GetString();
// Drop the "@..." suffix from the file name, if present.
2412 Int_t index = outputFile.Index("@");
2413 if (index > 0) outputFile.Remove(index);
// Wildcard matching the partial chunk files <name>_<nnnn>.root written by MergeOutput().
2414 TString outputChunk = outputFile;
2415 outputChunk.ReplaceAll(".root", "_*.root");
2416 // Skip already merged outputs
// AccessPathName() returns false when the file IS accessible, i.e. a merged output already exists locally.
2417 if (!gSystem->AccessPathName(outputFile)) {
2418 if (fOverwriteMode) {
2419 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2420 gSystem->Unlink(outputFile);
2421 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2422 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2423 outputChunk.Data());
2424 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2427 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2431 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2432 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2433 outputChunk.Data());
2434 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Files named in fMergeExcludes are not merged.
2437 if (fMergeExcludes.Length() &&
2438 fMergeExcludes.Contains(outputFile.Data())) continue;
2439 // Perform a 'find' command in the output directory, looking for registered outputs
2440 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2442 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in ROOT's list of open files.
2445 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2446 if (fileOpened) fileOpened->Close();
2451 //______________________________________________________________________________
2452 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2454 // Use the output files connected to output containers from the analysis manager
2455 // rather than the files defined by SetOutputFiles
// flag - kTRUE enables the default outputs, kFALSE disables them.
// The message is printed only when the option changes from disabled to enabled.
2456 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2457 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2458 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2461 //______________________________________________________________________________
2462 void AliAnalysisAlien::SetOutputFiles(const char *list)
2464 // Manually set the output files list.
2465 // Removes duplicates. Not allowed if default outputs are not disabled.
// list - space-separated file names; they are appended comma-separated to fOutputFiles.
// Calling this while default outputs are enabled is reported through Fatal().
2466 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2467 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2470 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2472 TString slist = list;
2473 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2474 TObjArray *arr = slist.Tokenize(" ");
2478 while ((os=(TObjString*)next())) {
2479 sout = os->GetString();
// Drop the "@..." storage element suffix from the file name.
2480 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): Contains() is a substring match, so a name that is part of an already
// listed one (e.g. "a.root" after "aa.root") is treated as a duplicate too.
2481 if (fOutputFiles.Contains(sout)) continue;
2482 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2483 fOutputFiles += sout;
2488 //______________________________________________________________________________
2489 void AliAnalysisAlien::SetOutputArchive(const char *list)
2491 // Manually set the output archive list. Free text - you are on your own...
2492 // Not allowed if default outputs are not disabled.
// list - archive specification copied unchanged into fOutputArchive.
// Calling this while default outputs are enabled is reported through Fatal().
2493 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2494 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2497 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2498 fOutputArchive = list;
2501 //______________________________________________________________________________
2502 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2504 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; the call only prints a warning pointing to the replacement setters.
2505 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2508 //______________________________________________________________________________
2509 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
2511 // Set some PROOF special parameter.
// pname - parameter name, used as key in fProofParam
// value - parameter value, stored as a string
// Look up an entry already stored under this name.
2512 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
// The old key is removed from the map before the new pair is added.
2514 TObject *old = pair->Key();
2515 TObject *val = pair->Value();
2516 fProofParam.Remove(old);
// Key and value are both stored as newly allocated TObjString objects.
2520 fProofParam.Add(new TObjString(pname), new TObjString(value));
2523 //______________________________________________________________________________
2524 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
2526 // Returns a special PROOF parameter.
// pname - name of the parameter to look up in fProofParam.
// Returns 0 when no entry is found, otherwise GetName() of the stored value object.
2527 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2528 if (!pair) return 0;
2529 return pair->Value()->GetName();
2532 //______________________________________________________________________________
2533 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2535 // Start remote grid analysis.
2536 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2537 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2538 if (!mgr || !mgr->IsInitialized()) {
2539 Error("StartAnalysis", "You need an initialized analysis manager for this");
2542 // Are we in PROOF mode ?
2543 if (mgr->IsProofMode()) {
2544 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
2545 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2546 if (fProofCluster.IsNull()) {
2547 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2550 if (fProofDataSet.IsNull() && !testMode) {
2551 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2554 // Set the needed environment
2555 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2556 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2557 if (fProofReset && !testMode) {
2558 if (fProofReset==1) {
2559 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2560 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2562 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2563 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2565 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2570 // Check if there is an old active session
2571 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
2573 Error("StartAnalysis","You have to reset your old session first\n");
2577 // Do we need to change the ROOT version ? The success of this cannot be checked.
2578 if (!fRootVersionForProof.IsNull() && !testMode) {
2579 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2580 fProofCluster.Data(), fRootVersionForProof.Data()));
2582 // Connect to PROOF and check the status
2585 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2586 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2588 if (!sworkers.IsNull())
2589 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2591 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2593 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2595 Error("StartAnalysis", "Could not start PROOF in test mode");
2600 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2603 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2604 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2605 // Set proof special parameters if any
2606 TIter nextpp(&fProofParam);
2607 TObject *proofparam;
2608 while ((proofparam=nextpp())) {
2609 TString svalue = GetProofParameter(proofparam->GetName());
2610 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
2612 // Is dataset existing ?
2614 TString dataset = fProofDataSet;
2615 Int_t index = dataset.Index("#");
2616 if (index>=0) dataset.Remove(index);
2617 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2618 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2621 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2623 // Is ClearPackages() needed ?
2624 if (TestSpecialBit(kClearPackages)) {
2625 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2626 gROOT->ProcessLine("gProof->ClearPackages();");
2628 // Is a given aliroot mode requested ?
2631 if (!fAliRootMode.IsNull()) {
2632 TString alirootMode = fAliRootMode;
2633 if (alirootMode == "default") alirootMode = "";
2634 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2635 optionsList.SetOwner();
2636 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2637 // Check the additional libs to be loaded
2639 Bool_t parMode = kFALSE;
2640 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2641 // Parse the extra libs for .so
2642 if (fAdditionalLibs.Length()) {
2643 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2646 while((str=(TObjString*)next())) {
2647 if (str->GetString().Contains(".so")) {
2649 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
2652 TString stmp = str->GetName();
2653 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2654 stmp.ReplaceAll(".so","");
2655 if (!extraLibs.IsNull()) extraLibs += ":";
2659 if (str->GetString().Contains(".par")) {
2660 // The first par file found in the list will not allow any further .so
2662 if (!parLibs.IsNull()) parLibs += ":";
2663 parLibs += str->GetName();
2667 if (list) delete list;
2669 if (!extraLibs.IsNull()) {
2670 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
2671 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2673 // Check extra includes
2674 if (!fIncludePath.IsNull()) {
2675 TString includePath = fIncludePath;
2676 includePath.ReplaceAll(" ",":");
2677 includePath.ReplaceAll("$ALICE_ROOT/","");
2678 includePath.ReplaceAll("${ALICE_ROOT}/","");
2679 includePath.ReplaceAll("-I","");
2680 includePath.Remove(TString::kTrailing, ':');
2681 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2682 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2684 // Check if connection to grid is requested
2685 if (TestSpecialBit(kProofConnectGrid))
2686 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2687 // Enable AliRoot par
2689 // Enable proof lite package
2690 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2691 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2692 TNamed *obj = (TNamed*)optionsList.At(i);
2693 printf("%s %s\n", obj->GetName(), obj->GetTitle());
2695 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2696 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
2697 Info("StartAnalysis", "AliRootProofLite enabled");
2699 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2703 if ( ! fAliROOTVersion.IsNull() ) {
2704 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
2705 fAliROOTVersion.Data(), &optionsList))) {
2706 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2711 // Enable first par files from fAdditionalLibs
2712 if (!parLibs.IsNull()) {
2713 TObjArray *list = parLibs.Tokenize(":");
2715 TObjString *package;
2716 while((package=(TObjString*)next())) {
2717 TString spkg = package->GetName();
2718 spkg.ReplaceAll(".par", "");
2719 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2720 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2721 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
2722 if (gROOT->ProcessLine(enablePackage)) {
2723 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2727 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2731 if (list) delete list;
2734 if (fAdditionalLibs.Contains(".so") && !testMode) {
2735 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2736 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2740 // Enable par files if requested
2741 if (fPackages && fPackages->GetEntries()) {
2742 TIter next(fPackages);
2744 while ((package=next())) {
2745 // Skip packages already enabled
2746 if (parLibs.Contains(package->GetName())) continue;
2747 TString spkg = package->GetName();
2748 spkg.ReplaceAll(".par", "");
2749 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2750 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2751 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2752 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2756 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2761 // Do we need to load analysis source files ?
2762 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2763 if (fAnalysisSource.Length()) {
2764 TObjArray *list = fAnalysisSource.Tokenize(" ");
2767 while((str=(TObjString*)next())) {
2768 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2770 if (list) delete list;
2773 // Register dataset to proof lite.
2774 if (fFileForTestMode.IsNull()) {
2775 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2778 if (gSystem->AccessPathName(fFileForTestMode)) {
2779 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2782 TFileCollection *coll = new TFileCollection();
2783 coll->AddFromFile(fFileForTestMode);
2784 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
2785 gROOT->ProcessLine("gProof->ShowDataSets()");
2790 // Check if output files have to be taken from the analysis manager
2791 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2792 // Add output files and AOD files
2793 fOutputFiles = GetListOfFiles("outaod");
2794 // Add extra files registered to the analysis manager
2795 TString extra = GetListOfFiles("ext");
2796 if (!extra.IsNull()) {
2797 extra.ReplaceAll(".root", "*.root");
2798 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2799 fOutputFiles += extra;
2801 // Compose the output archive.
2802 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2803 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
2805 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2806 if (TestBit(AliAnalysisGrid::kOffline)) {
2807 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2808 \n there nor any job run. You can revise the JDL and analysis \
2809 \n macro then run the same in \"submit\" mode.");
2810 } else if (TestBit(AliAnalysisGrid::kTest)) {
2811 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2813 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2814 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2815 \n space and job submitted.");
2816 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2817 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2818 if (fMergeViaJDL) CheckInputData();
2821 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2826 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2829 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2830 if (!CheckInputData()) {
2831 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2834 if (!CreateDataset(fDataPattern)) {
2836 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2837 if (fRunNumbers.Length()) serror = "run numbers";
2838 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2839 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2840 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2843 WriteAnalysisFile();
2844 WriteAnalysisMacro();
2846 WriteValidationScript();
2848 WriteMergingMacro();
2849 WriteMergeExecutable();
2850 WriteValidationScript(kTRUE);
2852 if (!CreateJDL()) return kFALSE;
2853 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2855 // Locally testing the analysis
2856 Info("StartAnalysis", "\n_______________________________________________________________________ \
2857 \n Running analysis script in a daughter shell as on a worker node \
2858 \n_______________________________________________________________________");
2859 TObjArray *list = fOutputFiles.Tokenize(",");
2863 while((str=(TObjString*)next())) {
2864 outputFile = str->GetString();
2865 Int_t index = outputFile.Index("@");
2866 if (index > 0) outputFile.Remove(index);
2867 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2870 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2871 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2872 // gSystem->Exec("cat stdout");
2875 // Check if submitting is managed by LPM manager
2876 if (fProductionMode) {
2877 TString prodfile = fJDLName;
2878 prodfile.ReplaceAll(".jdl", ".prod");
2879 WriteProductionFile(prodfile);
2880 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2883 // Submit AliEn job(s)
2884 gGrid->Cd(fGridOutputDir);
2887 if (!fRunNumbers.Length() && !fRunRange[0]) {
2888 // Submit a given xml or a set of runs
2889 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2890 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2892 const char *cjobId = res->GetKey(0,"jobId");
2896 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2899 Info("StartAnalysis", "\n_______________________________________________________________________ \
2900 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2901 \n_______________________________________________________________________",
2902 fJDLName.Data(), cjobId);
2907 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2911 // Submit for a range of enumeration of runs.
2912 if (!Submit()) return kFALSE;
2915 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2916 \n You may exit at any time and terminate the job later using the option <terminate> \
2917 \n ##################################################################################", jobID.Data());
2918 gSystem->Exec("aliensh");
2922 //______________________________________________________________________________
2923 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2925 // Get a comma-separated list of output files of the requested type.
2926 // Type can be (case insensitive):
2927 // aod - list of aod files (std, extensions and filters)
2928 // out - list of output files connected to containers (but not aod's or extras)
2929 // ext - list of extra files registered to the manager
2930 // ter - list of files produced in terminate
// The keys are matched with TString::Contains(), so several of them can be
// combined in one request (e.g. "outaod"). A request that is exactly equal to
// one key returns as soon as that category has been handled.
// The returned pointer is the buffer of the function-static TString below, so
// a later call may change it: copy the result if it has to be kept.
2931 static TString files;
2933 TString stype = type;
2935 TString aodfiles, extra;
2936 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2938 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2939 return files.Data();
// Collect the AOD file name of the output event handler plus its extra AOD outputs
2941 if (mgr->GetOutputEventHandler()) {
2942 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2943 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2944 if (!extraaod.IsNull()) {
2946 aodfiles += extraaod;
2949 if (stype.Contains("aod")) {
2951 if (stype == "aod") return files.Data();
2953 // Add output files that are not in the list of AOD files
// Containers whose file name is "default" are skipped.
// NOTE(review): duplicates are detected with Contains(), i.e. a substring
// test - a file name that is part of an already listed name is skipped too.
2954 TString outputfiles = "";
2955 TIter next(mgr->GetOutputs());
2956 AliAnalysisDataContainer *output;
2957 const char *filename = 0;
2958 while ((output=(AliAnalysisDataContainer*)next())) {
2959 filename = output->GetFileName();
2960 if (!(strcmp(filename, "default"))) continue;
2961 if (outputfiles.Contains(filename)) continue;
2962 if (aodfiles.Contains(filename)) continue;
2963 if (!outputfiles.IsNull()) outputfiles += ",";
2964 outputfiles += filename;
2966 if (stype.Contains("out")) {
2967 if (!files.IsNull()) files += ",";
2968 files += outputfiles;
2969 if (stype == "out") return files.Data();
2971 // Add extra files registered to the analysis manager
// The list may be blank-separated: blanks are turned into commas before
// tokenizing. Names already present in the AOD or container lists are skipped.
2973 extra = mgr->GetExtraFiles();
2974 if (!extra.IsNull()) {
2976 extra.ReplaceAll(" ", ",");
2977 TObjArray *fextra = extra.Tokenize(",");
2978 TIter nextx(fextra);
2980 while ((obj=nextx())) {
2981 if (aodfiles.Contains(obj->GetName())) continue;
2982 if (outputfiles.Contains(obj->GetName())) continue;
2983 if (sextra.Contains(obj->GetName())) continue;
2984 if (!sextra.IsNull()) sextra += ",";
2985 sextra += obj->GetName();
2988 if (stype.Contains("ext")) {
2989 if (!files.IsNull()) files += ",";
2993 if (stype == "ext") return files.Data();
// Add the files produced in Terminate (fTerminateFiles), skipping every name
// that already appears in one of the lists built above.
// NOTE(review): TString::Strip() returns the stripped sub-string and does not
// modify the string itself; the result is discarded here - confirm the intent.
2995 if (!fTerminateFiles.IsNull()) {
2996 fTerminateFiles.Strip();
2997 fTerminateFiles.ReplaceAll(" ",",");
2998 TObjArray *fextra = fTerminateFiles.Tokenize(",");
2999 TIter nextx(fextra);
3001 while ((obj=nextx())) {
3002 if (aodfiles.Contains(obj->GetName())) continue;
3003 if (outputfiles.Contains(obj->GetName())) continue;
3004 if (termfiles.Contains(obj->GetName())) continue;
3005 if (sextra.Contains(obj->GetName())) continue;
3006 if (!termfiles.IsNull()) termfiles += ",";
3007 termfiles += obj->GetName();
3011 if (stype.Contains("ter")) {
3012 if (!files.IsNull() && !termfiles.IsNull()) {
3017 return files.Data();
3020 //______________________________________________________________________________
3021 Bool_t AliAnalysisAlien::Submit()
3023 // Submit all master jobs.
// There is one master job per entry of fInputFiles. The actual submission is
// delegated to SubmitNext(); this method returns kFALSE as soon as one of
// those calls fails.
3024 Int_t nmasterjobs = fInputFiles->GetEntries();
3025 Long_t tshoot = gSystem->Now();
// Nothing submitted yet: submit the first bunch right away
3026 if (!fNsubmitted && !SubmitNext()) return kFALSE;
// Keep polling until the submitted counter reaches the number of master jobs
3027 while (fNsubmitted < nmasterjobs) {
3028 Long_t now = gSystem->Now();
// Retry only after more than 30000 time units have elapsed since tshoot
// (presumably milliseconds, i.e. 30 s - TODO confirm against TSystem::Now)
3029 if ((now-tshoot)>30000) {
3031 if (!SubmitNext()) return kFALSE;
3037 //______________________________________________________________________________
3038 Bool_t AliAnalysisAlien::SubmitMerging()
3040 // Submit all merging jobs.
// An output directory without any '/' is treated as relative and expanded to
// <alien home>/<grid working dir>/<output dir> before changing into it.
3041 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3042 gGrid->Cd(fGridOutputDir);
// The merging JDL is named after the executable: <name>.sh -> <name>_merge.jdl
3043 TString mergeJDLName = fExecutable;
3044 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
3046 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
// One pass per input collection; the directory name defaults to the base name
// of the collection file without its ".xml" extension.
3049 Int_t ntosubmit = fInputFiles->GetEntries();
3050 for (Int_t i=0; i<ntosubmit; i++) {
3051 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3052 runOutDir.ReplaceAll(".xml", "");
3053 if (fOutputToRunNo) {
3054 // The output directory is the run number
3055 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3056 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3058 if (!fRunNumbers.Length() && !fRunRange[0]) {
3059 // The output directory is the grid outdir
3060 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3061 runOutDir = fGridOutputDir;
3063 // The output directory is the master number in 3 digits format
3064 printf("### Submitting merging job for master <%03d>\n", i);
3065 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3068 // Check now the number of merging stages.
// The check is done on the first output file that is not listed in
// fMergeExcludes; anything from '@' onwards in the name is removed first.
3069 TObjArray *list = fOutputFiles.Tokenize(",");
3073 while((str=(TObjString*)next())) {
3074 outputFile = str->GetString();
3075 Int_t index = outputFile.Index("@");
3076 if (index > 0) outputFile.Remove(index);
3077 if (!fMergeExcludes.Contains(outputFile)) break;
3080 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// A failed check is reported to the caller only for the last collection
3081 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range a single pass is enough
3082 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3084 if (!ntosubmit) return kTRUE;
3085 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3086 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3087 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3088 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3089 \n ################################################################################################################");
// Hand over to an interactive AliEn shell
3090 gSystem->Exec("aliensh");
3094 //______________________________________________________________________________
3095 Bool_t AliAnalysisAlien::SubmitNext()
3097 // Submit next bunch of master jobs if the queue is free. The first master job is
3098 // submitted right away, while the next will not be unless the previous was split.
3099 // The plugin will not submit new master jobs if there are more than 500 waiting jobs.
// The bookkeeping below is kept in function-static variables, so it persists
// between calls: the ids of the first and last submitted master jobs, the
// estimated number of subjobs per master and a flag that makes the method
// return immediately while it is set.
3101 static Bool_t iscalled = kFALSE;
3102 static Int_t firstmaster = 0;
3103 static Int_t lastmaster = 0;
3104 static Int_t npermaster = 0;
3105 if (iscalled) return kTRUE;
3107 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3108 Int_t ntosubmit = 0;
3111 Int_t nmasterjobs = fInputFiles->GetEntries();
3114 if (!IsUseSubmitPolicy()) {
3116 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3117 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
// No policy: everything is submitted in one go
3118 ntosubmit = nmasterjobs;
// Query the state of the jobs between the first and the last submitted master
3121 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3122 printf("=== master %d: %s\n", lastmaster, status.Data());
3123 // If last master not split, just return
3124 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3125 // No more than 500 waiting jobs
3126 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master (integer division), used to
// estimate how many more masters fit below the limit of 500 waiting jobs.
// NOTE(review): divides by fNsubmitted - presumably non-zero on this path, confirm.
3127 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
3128 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
// Always submit at least one master job
3129 if (!ntosubmit) ntosubmit = 1;
3130 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3131 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3133 for (Int_t i=0; i<ntosubmit; i++) {
3134 // Submit for a range of enumeration of runs.
3135 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Third argument of the submit command: either the collection base name
// without ".xml" or the master index formatted on 3 digits.
3137 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3138 runOutDir.ReplaceAll(".xml", "");
3140 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3142 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3143 printf("********* %s\n",query.Data());
3144 res = gGrid->Command(query);
// An empty "jobId" key in the result is handled as a failed submission
3146 TString cjobId1 = res->GetKey(0,"jobId");
3147 if (!cjobId1.Length()) {
3151 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3154 Info("StartAnalysis", "\n_______________________________________________________________________ \
3155 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3156 \n_______________________________________________________________________",
3157 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id range of the submitted masters for the next status query
3160 lastmaster = cjobId1.Atoi();
3161 if (!firstmaster) firstmaster = lastmaster;
3166 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3174 //______________________________________________________________________________
3175 void AliAnalysisAlien::WriteAnalysisFile()
3177 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root".
// Nothing is (re)written when the kSubmit bit is set; the copy to the AliEn
// working directory is skipped in production, offline and test modes.
3178 TString analysisFile = fExecutable;
3179 analysisFile.ReplaceAll(".sh", ".root");
3180 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3181 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3182 if (!mgr || !mgr->IsInitialized()) {
3183 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3186 // Check analysis type
// The kUseMC/kUseESD/kUseAOD bits are derived from the handlers attached to
// the manager. For a multi-input handler the first input handler decides.
3188 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3189 handler = (TObject*)mgr->GetInputEventHandler();
3191 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3192 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3193 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3194 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3196 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3197 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory: TFile::Open changes gDirectory and it is
// restored further down.
3200 TDirectory *cdir = gDirectory;
3201 TFile *file = TFile::Open(analysisFile, "RECREATE");
3203 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode)
3204 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3205 // Unless merging makes no sense
3206 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3209 // Enable termination for local jobs
3210 mgr->SetSkipTerminate(kFALSE);
3212 if (cdir) cdir->cd();
3213 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// Decide whether the file has to be copied to the AliEn workspace
3215 Bool_t copy = kTRUE;
3216 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is <alien home><grid working dir>; an existing copy is removed first
3219 TString workdir = gGrid->GetHomeDirectory();
3220 workdir += fGridWorkingDir;
3221 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
3222 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3223 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
3227 //______________________________________________________________________________
3228 void AliAnalysisAlien::WriteAnalysisMacro()
3230 // Write the analysis macro that will steer the analysis in grid mode.
3231 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3233 out.open(fAnalysisMacro.Data(), ios::out);
3235 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3238 Bool_t hasSTEERBase = kFALSE;
3239 Bool_t hasESD = kFALSE;
3240 Bool_t hasAOD = kFALSE;
3241 Bool_t hasANALYSIS = kFALSE;
3242 Bool_t hasOADB = kFALSE;
3243 Bool_t hasANALYSISalice = kFALSE;
3244 Bool_t hasCORRFW = kFALSE;
3245 TString func = fAnalysisMacro;
3246 TString type = "ESD";
3247 TString comment = "// Analysis using ";
3248 if (IsUseMCchain()) {
3252 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3253 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3258 if (type!="AOD" && fFriendChainName!="") {
3259 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3262 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3263 else comment += " data";
3264 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3265 func.ReplaceAll(".C", "");
3266 out << "void " << func.Data() << "()" << endl;
3268 out << comment.Data() << endl;
3269 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3270 out << " TStopwatch timer;" << endl;
3271 out << " timer.Start();" << endl << endl;
3272 // Change temp directory to current one
3273 out << "// Set temporary merging directory to current one" << endl;
3274 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3275 out << "// Set temporary compilation directory to current one" << endl;
3276 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3277 // Reset existing include path
3278 out << "// Reset existing include path and add current directory first in the search" << endl;
3279 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3280 if (!fExecutableCommand.Contains("aliroot")) {
3281 out << "// load base root libraries" << endl;
3282 out << " gSystem->Load(\"libTree\");" << endl;
3283 out << " gSystem->Load(\"libGeom\");" << endl;
3284 out << " gSystem->Load(\"libVMC\");" << endl;
3285 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3286 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3288 if (fAdditionalRootLibs.Length()) {
3289 // in principle libtree /lib geom libvmc etc. can go into this list, too
3290 out << "// Add aditional libraries" << endl;
3291 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3294 while((str=(TObjString*)next())) {
3295 if (str->GetString().Contains(".so"))
3296 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3298 if (list) delete list;
3300 out << "// Load analysis framework libraries" << endl;
3301 TString setupPar = "AliAnalysisAlien::SetupPar";
3303 if (!fExecutableCommand.Contains("aliroot")) {
3304 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3305 out << " gSystem->Load(\"libESD\");" << endl;
3306 out << " gSystem->Load(\"libAOD\");" << endl;
3308 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3309 out << " gSystem->Load(\"libOADB\");" << endl;
3310 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3311 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3313 TIter next(fPackages);
3316 while ((obj=next())) {
3317 pkgname = obj->GetName();
3318 if (pkgname == "STEERBase" ||
3319 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3320 if (pkgname == "ESD" ||
3321 pkgname == "ESD.par") hasESD = kTRUE;
3322 if (pkgname == "AOD" ||
3323 pkgname == "AOD.par") hasAOD = kTRUE;
3324 if (pkgname == "ANALYSIS" ||
3325 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3326 if (pkgname == "OADB" ||
3327 pkgname == "OADB.par") hasOADB = kTRUE;
3328 if (pkgname == "ANALYSISalice" ||
3329 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3330 if (pkgname == "CORRFW" ||
3331 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3333 if (hasANALYSISalice) setupPar = "SetupPar";
3334 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3335 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3336 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3337 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3338 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3339 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3340 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3341 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3342 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3343 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3344 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3345 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3346 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3347 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3348 out << "// Compile other par packages" << endl;
3350 while ((obj=next())) {
3351 pkgname = obj->GetName();
3352 if (pkgname == "STEERBase" ||
3353 pkgname == "STEERBase.par" ||
3355 pkgname == "ESD.par" ||
3357 pkgname == "AOD.par" ||
3358 pkgname == "ANALYSIS" ||
3359 pkgname == "ANALYSIS.par" ||
3360 pkgname == "OADB" ||
3361 pkgname == "OADB.par" ||
3362 pkgname == "ANALYSISalice" ||
3363 pkgname == "ANALYSISalice.par" ||
3364 pkgname == "CORRFW" ||
3365 pkgname == "CORRFW.par") continue;
3366 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3369 out << "// include path" << endl;
3370 // Get the include path from the interpreter and remove entries pointing to AliRoot
3371 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3372 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3373 out << " TIter nextpath(listpaths);" << endl;
3374 out << " TObjString *pname;" << endl;
3375 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3376 out << " TString current = pname->GetName();" << endl;
3377 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3378 out << " gSystem->AddIncludePath(current);" << endl;
3379 out << " }" << endl;
3380 out << " if (listpaths) delete listpaths;" << endl;
3381 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3382 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3383 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3384 if (fAdditionalLibs.Length()) {
3385 out << "// Add aditional AliRoot libraries" << endl;
3386 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3389 while((str=(TObjString*)next())) {
3390 if (str->GetString().Contains(".so"))
3391 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3392 if (str->GetString().Contains(".par"))
3393 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3395 if (list) delete list;
3398 out << "// analysis source to be compiled at runtime (if any)" << endl;
3399 if (fAnalysisSource.Length()) {
3400 TObjArray *list = fAnalysisSource.Tokenize(" ");
3403 while((str=(TObjString*)next())) {
3404 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3406 if (list) delete list;
3409 // out << " printf(\"Currently load libraries:\\n\");" << endl;
3410 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
3411 if (fFastReadOption) {
3412 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3413 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3414 out << "// fast xrootd reading enabled" << endl;
3415 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3416 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3417 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3418 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3419 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3420 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3422 out << "// connect to AliEn and make the chain" << endl;
3423 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3424 out << "// read the analysis manager from file" << endl;
3425 TString analysisFile = fExecutable;
3426 analysisFile.ReplaceAll(".sh", ".root");
3427 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
3428 << analysisFile << "\");" << endl;
3429 out << " if (!mgr) return;" << endl;
3430 out << " mgr->PrintStatus();" << endl;
3431 if (AliAnalysisManager::GetAnalysisManager()) {
3432 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3433 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3435 if (TestBit(AliAnalysisGrid::kTest))
3436 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3438 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3441 if (IsUsingTags()) {
3442 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3444 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3446 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3447 out << " timer.Stop();" << endl;
3448 out << " timer.Print();" << endl;
3449 out << "}" << endl << endl;
3450 if (IsUsingTags()) {
3451 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3453 out << "// Create a chain using tags from the xml file." << endl;
3454 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3455 out << " if (!coll) {" << endl;
3456 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3457 out << " return NULL;" << endl;
3458 out << " }" << endl;
3459 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3460 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3461 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3462 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3463 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3464 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3465 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3466 out << " // Check if the cuts configuration file was provided" << endl;
3467 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3468 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3469 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3470 out << " }" << endl;
3471 if (fFriendChainName=="") {
3472 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3474 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3475 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3476 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3478 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3479 out << " chain->ls();" << endl;
3480 out << " return chain;" << endl;
3481 out << "}" << endl << endl;
3482 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3483 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3484 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3485 msg += " AliLHCTagCuts *lhcCuts,\n";
3486 msg += " AliDetectorTagCuts *detCuts,\n";
3487 msg += " AliEventTagCuts *evCuts)";
3488 Info("WriteAnalysisMacro", "%s", msg.Data());
3491 if (!IsUsingTags() || fFriendChainName!="") {
3492 out <<"//________________________________________________________________________________" << endl;
3493 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3495 out << "// Create a chain using url's from xml file" << endl;
3496 out << " TString filename;" << endl;
3497 out << " Int_t run = 0;" << endl;
3498 if (IsUseMCchain()) {
3499 out << " TString treename = \"TE\";" << endl;
3501 out << " TString treename = type;" << endl;
3502 out << " treename.ToLower();" << endl;
3503 out << " treename += \"Tree\";" << endl;
3505 out << " printf(\"***************************************\\n\");" << endl;
3506 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3507 out << " printf(\"***************************************\\n\");" << endl;
3508 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3509 out << " if (!coll) {" << endl;
3510 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3511 out << " return NULL;" << endl;
3512 out << " }" << endl;
3513 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3514 out << " TChain *chain = new TChain(treename);" << endl;
3515 if(fFriendChainName!="") {
3516 out << " TChain *chainFriend = new TChain(treename);" << endl;
3518 out << " coll->Reset();" << endl;
3519 out << " while (coll->Next()) {" << endl;
3520 out << " filename = coll->GetTURL("");" << endl;
3521 out << " if (mgr) {" << endl;
3522 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3523 out << " if (nrun && nrun != run) {" << endl;
3524 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3525 out << " mgr->SetRunFromPath(nrun);" << endl;
3526 out << " run = nrun;" << endl;
3527 out << " }" << endl;
3528 out << " }" << endl;
3529 out << " chain->Add(filename);" << endl;
3530 if(fFriendChainName!="") {
3531 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3532 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3533 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3534 out << " chainFriend->Add(fileFriend.Data());" << endl;
3536 out << " }" << endl;
3537 out << " if (!chain->GetNtrees()) {" << endl;
3538 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3539 out << " return NULL;" << endl;
3540 out << " }" << endl;
3541 if(fFriendChainName!="") {
3542 out << " chain->AddFriend(chainFriend);" << endl;
3544 out << " return chain;" << endl;
3545 out << "}" << endl << endl;
3547 if (hasANALYSISalice) {
3548 out <<"//________________________________________________________________________________" << endl;
3549 out << "Bool_t SetupPar(const char *package) {" << endl;
3550 out << "// Compile the package and set it up." << endl;
3551 out << " TString pkgdir = package;" << endl;
3552 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3553 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3554 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3555 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3556 out << " // Check for BUILD.sh and execute" << endl;
3557 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3558 out << " printf(\"*******************************\\n\");" << endl;
3559 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3560 out << " printf(\"*******************************\\n\");" << endl;
3561 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3562 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3563 out << " gSystem->ChangeDirectory(cdir);" << endl;
3564 out << " return kFALSE;" << endl;
3565 out << " }" << endl;
3566 out << " } else {" << endl;
3567 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3568 out << " gSystem->ChangeDirectory(cdir);" << endl;
3569 out << " return kFALSE;" << endl;
3570 out << " }" << endl;
3571 out << " // Check for SETUP.C and execute" << endl;
3572 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3573 out << " printf(\"*******************************\\n\");" << endl;
3574 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3575 out << " printf(\"*******************************\\n\");" << endl;
3576 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3577 out << " } else {" << endl;
3578 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3579 out << " gSystem->ChangeDirectory(cdir);" << endl;
3580 out << " return kFALSE;" << endl;
3581 out << " }" << endl;
3582 out << " // Restore original workdir" << endl;
3583 out << " gSystem->ChangeDirectory(cdir);" << endl;
3584 out << " return kTRUE;" << endl;
3587 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
3589 Bool_t copy = kTRUE;
3590 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3593 TString workdir = gGrid->GetHomeDirectory();
3594 workdir += fGridWorkingDir;
3595 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3596 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3597 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3598 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3599 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3601 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3602 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3606 //______________________________________________________________________________
3607 void AliAnalysisAlien::WriteMergingMacro()
3609 // Write a macro to merge the outputs per master job.
3610 if (!fMergeViaJDL) return;
3611 if (!fOutputFiles.Length()) {
3612 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3615 TString mergingMacro = fExecutable;
3616 mergingMacro.ReplaceAll(".sh","_merge.C");
3617 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3618 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3620 out.open(mergingMacro.Data(), ios::out);
3622 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3625 Bool_t hasSTEERBase = kFALSE;
3626 Bool_t hasESD = kFALSE;
3627 Bool_t hasAOD = kFALSE;
3628 Bool_t hasANALYSIS = kFALSE;
3629 Bool_t hasOADB = kFALSE;
3630 Bool_t hasANALYSISalice = kFALSE;
3631 Bool_t hasCORRFW = kFALSE;
3632 TString func = mergingMacro;
3634 func.ReplaceAll(".C", "");
3635 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
3637 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3638 out << " TStopwatch timer;" << endl;
3639 out << " timer.Start();" << endl << endl;
3640 // Reset existing include path
3641 out << "// Reset existing include path and add current directory first in the search" << endl;
3642 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3643 if (!fExecutableCommand.Contains("aliroot")) {
3644 out << "// load base root libraries" << endl;
3645 out << " gSystem->Load(\"libTree\");" << endl;
3646 out << " gSystem->Load(\"libGeom\");" << endl;
3647 out << " gSystem->Load(\"libVMC\");" << endl;
3648 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3649 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3651 if (fAdditionalRootLibs.Length()) {
3652 // in principle libtree /lib geom libvmc etc. can go into this list, too
3653 out << "// Add aditional libraries" << endl;
3654 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3657 while((str=(TObjString*)next())) {
3658 if (str->GetString().Contains(".so"))
3659 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3661 if (list) delete list;
3663 out << "// Load analysis framework libraries" << endl;
3665 if (!fExecutableCommand.Contains("aliroot")) {
3666 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3667 out << " gSystem->Load(\"libESD\");" << endl;
3668 out << " gSystem->Load(\"libAOD\");" << endl;
3670 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3671 out << " gSystem->Load(\"libOADB\");" << endl;
3672 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3673 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3675 TIter next(fPackages);
3678 TString setupPar = "AliAnalysisAlien::SetupPar";
3679 while ((obj=next())) {
3680 pkgname = obj->GetName();
3681 if (pkgname == "STEERBase" ||
3682 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3683 if (pkgname == "ESD" ||
3684 pkgname == "ESD.par") hasESD = kTRUE;
3685 if (pkgname == "AOD" ||
3686 pkgname == "AOD.par") hasAOD = kTRUE;
3687 if (pkgname == "ANALYSIS" ||
3688 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3689 if (pkgname == "OADB" ||
3690 pkgname == "OADB.par") hasOADB = kTRUE;
3691 if (pkgname == "ANALYSISalice" ||
3692 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3693 if (pkgname == "CORRFW" ||
3694 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3696 if (hasANALYSISalice) setupPar = "SetupPar";
3697 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3698 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3699 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3700 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3701 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3702 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3703 out << " gSystem->Load(\"libOADB\");" << endl;
3704 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3705 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3706 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3707 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3708 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3709 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3710 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3711 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3712 out << "// Compile other par packages" << endl;
3714 while ((obj=next())) {
3715 pkgname = obj->GetName();
3716 if (pkgname == "STEERBase" ||
3717 pkgname == "STEERBase.par" ||
3719 pkgname == "ESD.par" ||
3721 pkgname == "AOD.par" ||
3722 pkgname == "ANALYSIS" ||
3723 pkgname == "ANALYSIS.par" ||
3724 pkgname == "OADB" ||
3725 pkgname == "OADB.par" ||
3726 pkgname == "ANALYSISalice" ||
3727 pkgname == "ANALYSISalice.par" ||
3728 pkgname == "CORRFW" ||
3729 pkgname == "CORRFW.par") continue;
3730 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3733 out << "// include path" << endl;
3734 // Get the include path from the interpreter and remove entries pointing to AliRoot
3735 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3736 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3737 out << " TIter nextpath(listpaths);" << endl;
3738 out << " TObjString *pname;" << endl;
3739 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3740 out << " TString current = pname->GetName();" << endl;
3741 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3742 out << " gSystem->AddIncludePath(current);" << endl;
3743 out << " }" << endl;
3744 out << " if (listpaths) delete listpaths;" << endl;
3745 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3746 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3747 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3748 if (fAdditionalLibs.Length()) {
3749 out << "// Add aditional AliRoot libraries" << endl;
3750 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3753 while((str=(TObjString*)next())) {
3754 if (str->GetString().Contains(".so"))
3755 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3757 if (list) delete list;
3760 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3761 if (fAnalysisSource.Length()) {
3762 TObjArray *list = fAnalysisSource.Tokenize(" ");
3765 while((str=(TObjString*)next())) {
3766 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3768 if (list) delete list;
3772 if (fFastReadOption) {
3773 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3774 out << "// fast xrootd reading enabled" << endl;
3775 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3776 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3777 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3778 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3779 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3780 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3782 // Change temp directory to current one
3783 out << "// Set temporary merging directory to current one" << endl;
3784 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3785 out << "// Set temporary compilation directory to current one" << endl;
3786 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3787 out << "// Connect to AliEn" << endl;
3788 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3789 out << " TString outputDir = dir;" << endl;
3790 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3791 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3792 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3793 out << " TIter *iter = new TIter(list);" << endl;
3794 out << " TObjString *str;" << endl;
3795 out << " TString outputFile;" << endl;
3796 out << " Bool_t merged = kTRUE;" << endl;
3797 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3798 out << " outputFile = str->GetString();" << endl;
3799 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3800 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3801 out << " if (index > 0) outputFile.Remove(index);" << endl;
3802 out << " // Skip already merged outputs" << endl;
3803 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3804 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3805 out << " continue;" << endl;
3806 out << " }" << endl;
3807 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3808 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
3809 out << " if (!merged) {" << endl;
3810 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3811 out << " return;" << endl;
3812 out << " }" << endl;
3813 out << " }" << endl;
3814 out << " // all outputs merged, validate" << endl;
3815 out << " ofstream out;" << endl;
3816 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3817 out << " out.close();" << endl;
3818 out << " // read the analysis manager from file" << endl;
3819 TString analysisFile = fExecutable;
3820 analysisFile.ReplaceAll(".sh", ".root");
3821 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
3822 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
3823 << analysisFile << "\");" << endl;
3824 out << " if (!mgr) return;" << endl;
3825 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3826 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3827 out << " mgr->PrintStatus();" << endl;
3828 if (AliAnalysisManager::GetAnalysisManager()) {
3829 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3830 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3832 if (TestBit(AliAnalysisGrid::kTest))
3833 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3835 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3838 out << " TTree *tree = NULL;" << endl;
3839 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3840 out << "}" << endl << endl;
3841 if (hasANALYSISalice) {
3842 out <<"//________________________________________________________________________________" << endl;
3843 out << "Bool_t SetupPar(const char *package) {" << endl;
3844 out << "// Compile the package and set it up." << endl;
3845 out << " TString pkgdir = package;" << endl;
3846 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3847 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3848 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3849 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3850 out << " // Check for BUILD.sh and execute" << endl;
3851 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3852 out << " printf(\"*******************************\\n\");" << endl;
3853 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3854 out << " printf(\"*******************************\\n\");" << endl;
3855 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3856 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3857 out << " gSystem->ChangeDirectory(cdir);" << endl;
3858 out << " return kFALSE;" << endl;
3859 out << " }" << endl;
3860 out << " } else {" << endl;
3861 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3862 out << " gSystem->ChangeDirectory(cdir);" << endl;
3863 out << " return kFALSE;" << endl;
3864 out << " }" << endl;
3865 out << " // Check for SETUP.C and execute" << endl;
3866 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3867 out << " printf(\"*******************************\\n\");" << endl;
3868 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3869 out << " printf(\"*******************************\\n\");" << endl;
3870 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3871 out << " } else {" << endl;
3872 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3873 out << " gSystem->ChangeDirectory(cdir);" << endl;
3874 out << " return kFALSE;" << endl;
3875 out << " }" << endl;
3876 out << " // Restore original workdir" << endl;
3877 out << " gSystem->ChangeDirectory(cdir);" << endl;
3878 out << " return kTRUE;" << endl;
3882 Bool_t copy = kTRUE;
3883 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3886 TString workdir = gGrid->GetHomeDirectory();
3887 workdir += fGridWorkingDir;
3888 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3889 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3890 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3894 //______________________________________________________________________________
3895 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3897 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3898 // Note that for loading the compiled library. The current directory should have precedence in
3900 TString pkgdir = package;
3901 pkgdir.ReplaceAll(".par","");
3902 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
3903 TString cdir = gSystem->WorkingDirectory();
3904 gSystem->ChangeDirectory(pkgdir);
3905 // Check for BUILD.sh and execute
3906 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3907 printf("**************************************************\n");
3908 printf("*** Building PAR archive %s\n", package);
3909 printf("**************************************************\n");
3910 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3911 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3912 gSystem->ChangeDirectory(cdir);
3916 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3917 gSystem->ChangeDirectory(cdir);
3920 // Check for SETUP.C and execute
3921 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3922 printf("**************************************************\n");
3923 printf("*** Setup PAR archive %s\n", package);
3924 printf("**************************************************\n");
3925 gROOT->Macro("PROOF-INF/SETUP.C");
3926 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3928 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3929 gSystem->ChangeDirectory(cdir);
3932 // Restore original workdir
3933 gSystem->ChangeDirectory(cdir);
3937 //______________________________________________________________________________
3938 void AliAnalysisAlien::WriteExecutable()
3940 // Generate the alien executable script.
3941 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3943 out.open(fExecutable.Data(), ios::out);
3945 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3948 out << "#!/bin/bash" << endl;
3949 // Make sure we can properly compile par files
3950 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3951 out << "echo \"=========================================\"" << endl;
3952 out << "echo \"############## PATH : ##############\"" << endl;
3953 out << "echo $PATH" << endl;
3954 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3955 out << "echo $LD_LIBRARY_PATH" << endl;
3956 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3957 out << "echo $ROOTSYS" << endl;
3958 out << "echo \"############## which root : ##############\"" << endl;
3959 out << "which root" << endl;
3960 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3961 out << "echo $ALICE_ROOT" << endl;
3962 out << "echo \"############## which aliroot : ##############\"" << endl;
3963 out << "which aliroot" << endl;
3964 out << "echo \"############## system limits : ##############\"" << endl;
3965 out << "ulimit -a" << endl;
3966 out << "echo \"############## memory : ##############\"" << endl;
3967 out << "free -m" << endl;
3968 out << "echo \"=========================================\"" << endl << endl;
3969 out << fExecutableCommand << " ";
3970 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3971 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3972 out << "echo \"############## memory after: ##############\"" << endl;
3973 out << "free -m" << endl;
3975 Bool_t copy = kTRUE;
3976 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3979 TString workdir = gGrid->GetHomeDirectory();
3980 TString bindir = Form("%s/bin", workdir.Data());
3981 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3982 workdir += fGridWorkingDir;
3983 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3984 if (FileExists(executable)) gGrid->Rm(executable);
3985 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3986 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3990 //______________________________________________________________________________
3991 void AliAnalysisAlien::WriteMergeExecutable()
3993 // Generate the alien executable script for the merging job.
3994 if (!fMergeViaJDL) return;
3995 TString mergeExec = fExecutable;
3996 mergeExec.ReplaceAll(".sh", "_merge.sh");
3997 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3999 out.open(mergeExec.Data(), ios::out);
4001 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4004 out << "#!/bin/bash" << endl;
4005 // Make sure we can properly compile par files
4006 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4007 out << "echo \"=========================================\"" << endl;
4008 out << "echo \"############## PATH : ##############\"" << endl;
4009 out << "echo $PATH" << endl;
4010 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4011 out << "echo $LD_LIBRARY_PATH" << endl;
4012 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4013 out << "echo $ROOTSYS" << endl;
4014 out << "echo \"############## which root : ##############\"" << endl;
4015 out << "which root" << endl;
4016 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4017 out << "echo $ALICE_ROOT" << endl;
4018 out << "echo \"############## which aliroot : ##############\"" << endl;
4019 out << "which aliroot" << endl;
4020 out << "echo \"############## system limits : ##############\"" << endl;
4021 out << "ulimit -a" << endl;
4022 out << "echo \"############## memory : ##############\"" << endl;
4023 out << "free -m" << endl;
4024 out << "echo \"=========================================\"" << endl << endl;
4025 TString mergeMacro = fExecutable;
4026 mergeMacro.ReplaceAll(".sh", "_merge.C");
4027 if (IsOneStageMerging())
4028 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4030 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4031 out << fExecutableCommand << " " << "$ARG" << endl;
4032 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4033 out << "echo \"############## memory after: ##############\"" << endl;
4034 out << "free -m" << endl;
4036 Bool_t copy = kTRUE;
4037 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4040 TString workdir = gGrid->GetHomeDirectory();
4041 TString bindir = Form("%s/bin", workdir.Data());
4042 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4043 workdir += fGridWorkingDir;
4044 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4045 if (FileExists(executable)) gGrid->Rm(executable);
4046 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4047 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4051 //______________________________________________________________________________
4052 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4054 // Write the production file to be submitted by LPM manager. The format is:
4055 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4056 // Next lines: full_path_to_dataset XXX (XXX is a string)
4057 // To submit, one has to: submit jdl XXX for all lines
// <filename> is the local file written here; it is afterwards copied to
// alien://<workdir>/<filename>, replacing an existing grid file of that name.
4059 out.open(filename, ios::out);
4061 Error("WriteProductionFile", "Bad file name: %s", filename);
// The working directory is prefixed with the AliEn home directory unless in
// production mode or unless it already begins with "/alice".
// NOTE(review): gGrid is dereferenced here without a visible null check - confirm
// that an AliEn connection is guaranteed by the callers.
4065 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4066 workdir = gGrid->GetHomeDirectory();
4067 workdir += fGridWorkingDir;
// Estimated number of subjobs per master job (integer arithmetic).
// NOTE(review): the default constructor initializes fSplitMaxInputFileNumber
// to 0, which would make this a division by zero - confirm it is always set
// to a positive value before this method is called.
4068 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4069 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
// First line: <jdl path> <estimated subjobs per master>
4070 out << locjdl << " " << njobspermaster << endl;
// One line per entry of fInputFiles.
// NOTE(review): fInputFiles is used without a visible null check.
4071 Int_t nmasterjobs = fInputFiles->GetEntries();
4072 for (Int_t i=0; i<nmasterjobs; i++) {
// Base name of the input file with ".xml" removed
4073 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4074 runOutDir.ReplaceAll(".xml", "");
// Second column is either the name derived above or the zero-padded, 3-digit
// index of the entry.
4076 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4078 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
// Replace any previous copy in the grid working directory
4081 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4082 if (FileExists(filename)) gGrid->Rm(filename);
4083 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4087 //______________________________________________________________________________
4088 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4090 // Generate the alien validation script.
4091 // Generate the validation script
4093 if (fValidationScript.IsNull()) {
4094 fValidationScript = fExecutable;
4095 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4097 TString validationScript = fValidationScript;
4098 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4100 Error("WriteValidationScript", "Alien connection required");
4103 if (!fTerminateFiles.IsNull()) {
4104 fTerminateFiles.Strip();
4105 fTerminateFiles.ReplaceAll(" ",",");
4107 TString outStream = "";
4108 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4109 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4111 out.open(validationScript, ios::out);
4112 out << "#!/bin/bash" << endl;
4113 out << "##################################################" << endl;
4114 out << "validateout=`dirname $0`" << endl;
4115 out << "validatetime=`date`" << endl;
4116 out << "validated=\"0\";" << endl;
4117 out << "error=0" << endl;
4118 out << "if [ -z $validateout ]" << endl;
4119 out << "then" << endl;
4120 out << " validateout=\".\"" << endl;
4121 out << "fi" << endl << endl;
4122 out << "cd $validateout;" << endl;
4123 out << "validateworkdir=`pwd`;" << endl << endl;
4124 out << "echo \"*******************************************************\"" << outStream << endl;
4125 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4127 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4128 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4129 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4130 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4131 out << "ls -la ./" << outStream << endl;
4132 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4133 out << "##################################################" << endl;
4136 out << "if [ ! -f stderr ] ; then" << endl;
4137 out << " error=1" << endl;
4138 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4139 out << " echo \"Error = $error\" " << outStream << endl;
4140 out << "fi" << endl;
4142 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4143 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4144 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4145 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4148 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4149 out << " error=1" << endl;
4150 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4151 out << " echo \"$parArch\" " << outStream << endl;
4152 out << " echo \"Error = $error\" " << outStream << endl;
4153 out << "fi" << endl;
4155 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4156 out << " error=1" << endl;
4157 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4158 out << " echo \"$segViol\" " << outStream << endl;
4159 out << " echo \"Error = $error\" " << outStream << endl;
4160 out << "fi" << endl;
4162 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4163 out << " error=1" << endl;
4164 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4165 out << " echo \"$segFault\" " << outStream << endl;
4166 out << " echo \"Error = $error\" " << outStream << endl;
4167 out << "fi" << endl;
4169 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4170 out << " error=1" << endl;
4171 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4172 out << " echo \"$glibcErr\" " << outStream << endl;
4173 out << " echo \"Error = $error\" " << outStream << endl;
4174 out << "fi" << endl;
4176 // Part dedicated to the specific analyses running into the train
4178 TString outputFiles = fOutputFiles;
4179 if (merge && !fTerminateFiles.IsNull()) {
4181 outputFiles += fTerminateFiles;
4183 TObjArray *arr = outputFiles.Tokenize(",");
4186 while (!merge && (os=(TObjString*)next1())) {
4187 // No need to validate outputs produced by merging since the merging macro does this
4188 outputFile = os->GetString();
4189 Int_t index = outputFile.Index("@");
4190 if (index > 0) outputFile.Remove(index);
4191 if (fTerminateFiles.Contains(outputFile)) continue;
4192 if (outputFile.Contains("*")) continue;
4193 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4194 out << " error=1" << endl;
4195 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4196 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4197 out << "fi" << endl;
4200 out << "if ! [ -f outputs_valid ] ; then" << endl;
4201 out << " error=1" << endl;
4202 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4203 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4204 out << "fi" << endl;
4206 out << "if [ $error = 0 ] ; then" << endl;
4207 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4208 if (!IsKeepLogs()) {
4209 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4211 out << " rm -f std*" << endl;
4213 out << "fi" << endl;
4215 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4216 out << "echo \"*******************************************************\"" << outStream << endl;
4217 out << "cd -" << endl;
4218 out << "exit $error" << endl;
4220 Bool_t copy = kTRUE;
4221 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4224 TString workdir = gGrid->GetHomeDirectory();
4225 workdir += fGridWorkingDir;
4226 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4227 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4228 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));