1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "AliAnalysisAlien.h"
25 #include "Riostream.h"
33 #include "TFileCollection.h"
35 #include "TObjString.h"
36 #include "TObjArray.h"
38 #include "TGridResult.h"
39 #include "TGridCollection.h"
41 #include "TGridJobStatusList.h"
42 #include "TGridJobStatus.h"
43 #include "TFileMerger.h"
44 #include "AliAnalysisManager.h"
45 #include "AliVEventHandler.h"
46 #include "AliAnalysisDataContainer.h"
47 #include "AliMultiInputEventHandler.h"
49 ClassImp(AliAnalysisAlien)
51 //______________________________________________________________________________
// Default constructor.
// Only part of the member-initializer list is visible in this excerpt: the
// visible entries set three integer settings to 0 and default-construct
// fAdditionalRootLibs and fRootVersionForProof.
52 AliAnalysisAlien::AliAnalysisAlien()
58 fSplitMaxInputFileNumber(0),
60 fMasterResubmitThreshold(0),
73 fNproofWorkersPerSlave(0),
83 fAdditionalRootLibs(),
111 fRootVersionForProof(),
122 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// Only part of the member-initializer list is visible in this excerpt; the
// visible entries mirror those of the default constructor (integer settings
// set to 0, string-like members default-constructed).
123 AliAnalysisAlien::AliAnalysisAlien(const char *name)
124 :AliAnalysisGrid(name),
129 fSplitMaxInputFileNumber(0),
131 fMasterResubmitThreshold(0),
144 fNproofWorkersPerSlave(0),
148 fExecutableCommand(),
154 fAdditionalRootLibs(),
182 fRootVersionForProof(),
193 //______________________________________________________________________________
// Copy constructor.
// Value members are copied one by one from `other` in the initializer list.
// The two JDL objects are NOT copied from `other`: fresh TAlienJDL instances
// are created through the interpreter. fInputFiles and fPackages are rebuilt
// as new arrays of TObjString holding the names of the source entries, and
// the new arrays are made owners of their elements.
194 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
195 :AliAnalysisGrid(other),
198 fPrice(other.fPrice),
200 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
201 fMaxInitFailed(other.fMaxInitFailed),
202 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
203 fNtestFiles(other.fNtestFiles),
204 fNrunsPerMaster(other.fNrunsPerMaster),
205 fMaxMergeFiles(other.fMaxMergeFiles),
206 fMaxMergeStages(other.fMaxMergeStages),
207 fNsubmitted(other.fNsubmitted),
208 fProductionMode(other.fProductionMode),
209 fOutputToRunNo(other.fOutputToRunNo),
210 fMergeViaJDL(other.fMergeViaJDL),
211 fFastReadOption(other.fFastReadOption),
212 fOverwriteMode(other.fOverwriteMode),
213 fNreplicas(other.fNreplicas),
214 fNproofWorkers(other.fNproofWorkers),
215 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
216 fProofReset(other.fProofReset),
217 fRunNumbers(other.fRunNumbers),
218 fExecutable(other.fExecutable),
219 fExecutableCommand(other.fExecutableCommand),
220 fArguments(other.fArguments),
221 fExecutableArgs(other.fExecutableArgs),
222 fAnalysisMacro(other.fAnalysisMacro),
223 fAnalysisSource(other.fAnalysisSource),
224 fValidationScript(other.fValidationScript),
225 fAdditionalRootLibs(other.fAdditionalRootLibs),
226 fAdditionalLibs(other.fAdditionalLibs),
227 fSplitMode(other.fSplitMode),
228 fAPIVersion(other.fAPIVersion),
229 fROOTVersion(other.fROOTVersion),
230 fAliROOTVersion(other.fAliROOTVersion),
231 fExternalPackages(other.fExternalPackages),
233 fGridWorkingDir(other.fGridWorkingDir),
234 fGridDataDir(other.fGridDataDir),
235 fDataPattern(other.fDataPattern),
236 fGridOutputDir(other.fGridOutputDir),
237 fOutputArchive(other.fOutputArchive),
238 fOutputFiles(other.fOutputFiles),
239 fInputFormat(other.fInputFormat),
240 fDatasetName(other.fDatasetName),
241 fJDLName(other.fJDLName),
242 fTerminateFiles(other.fTerminateFiles),
243 fMergeExcludes(other.fMergeExcludes),
244 fIncludePath(other.fIncludePath),
245 fCloseSE(other.fCloseSE),
246 fFriendChainName(other.fFriendChainName),
247 fJobTag(other.fJobTag),
248 fOutputSingle(other.fOutputSingle),
249 fRunPrefix(other.fRunPrefix),
250 fProofCluster(other.fProofCluster),
251 fProofDataSet(other.fProofDataSet),
252 fFileForTestMode(other.fFileForTestMode),
253 fRootVersionForProof(other.fRootVersionForProof),
254 fAliRootMode(other.fAliRootMode),
255 fMergeDirName(other.fMergeDirName),
// The JDL objects are instantiated via the interpreter; presumably this avoids
// a link-time dependency on the AliEn library -- TODO confirm.
261 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
262 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
263 fRunRange[0] = other.fRunRange[0];
264 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: each entry is re-created from its name.
265 if (other.fInputFiles) {
266 fInputFiles = new TObjArray();
267 TIter next(other.fInputFiles);
269 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
270 fInputFiles->SetOwner();
// Same deep copy for the package list.
272 if (other.fPackages) {
273 fPackages = new TObjArray();
274 TIter next(other.fPackages);
276 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
277 fPackages->SetOwner();
281 //______________________________________________________________________________
// Destructor: releases the two JDL objects, the input file list and the
// package list, then calls DeleteAll() on fProofParam.
// The null checks are redundant (delete on a null pointer is a no-op) but harmless.
282 AliAnalysisAlien::~AliAnalysisAlien()
285 if (fGridJDL) delete fGridJDL;
286 if (fMergingJDL) delete fMergingJDL;
287 if (fInputFiles) delete fInputFiles;
288 if (fPackages) delete fPackages;
289 fProofParam.DeleteAll();
292 //______________________________________________________________________________
// Assignment operator.
// On self-assignment nothing is copied. Otherwise the base-class part is
// assigned, fresh JDL objects are created through the interpreter, all value
// members are copied from `other`, and fInputFiles / fPackages are rebuilt as
// owning arrays of TObjString created from the names of the source entries.
// NOTE(review): fGridJDL, fMergingJDL, fInputFiles and fPackages are overwritten
// with newly allocated objects and no release of the previous ones is visible
// here -- this looks like a leak when assigning to an already populated
// object; confirm. Also, when other.fInputFiles / other.fPackages is null the
// current arrays appear to be kept rather than cleared -- confirm.
293 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
296 if (this != &other) {
297 AliAnalysisGrid::operator=(other);
298 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
299 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
300 fPrice = other.fPrice;
302 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
303 fMaxInitFailed = other.fMaxInitFailed;
304 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
305 fNtestFiles = other.fNtestFiles;
306 fNrunsPerMaster = other.fNrunsPerMaster;
307 fMaxMergeFiles = other.fMaxMergeFiles;
308 fMaxMergeStages = other.fMaxMergeStages;
309 fNsubmitted = other.fNsubmitted;
310 fProductionMode = other.fProductionMode;
311 fOutputToRunNo = other.fOutputToRunNo;
312 fMergeViaJDL = other.fMergeViaJDL;
313 fFastReadOption = other.fFastReadOption;
314 fOverwriteMode = other.fOverwriteMode;
315 fNreplicas = other.fNreplicas;
316 fNproofWorkers = other.fNproofWorkers;
317 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
318 fProofReset = other.fProofReset;
319 fRunNumbers = other.fRunNumbers;
320 fExecutable = other.fExecutable;
321 fExecutableCommand = other.fExecutableCommand;
322 fArguments = other.fArguments;
323 fExecutableArgs = other.fExecutableArgs;
324 fAnalysisMacro = other.fAnalysisMacro;
325 fAnalysisSource = other.fAnalysisSource;
326 fValidationScript = other.fValidationScript;
327 fAdditionalRootLibs = other.fAdditionalRootLibs;
328 fAdditionalLibs = other.fAdditionalLibs;
329 fSplitMode = other.fSplitMode;
330 fAPIVersion = other.fAPIVersion;
331 fROOTVersion = other.fROOTVersion;
332 fAliROOTVersion = other.fAliROOTVersion;
333 fExternalPackages = other.fExternalPackages;
335 fGridWorkingDir = other.fGridWorkingDir;
336 fGridDataDir = other.fGridDataDir;
337 fDataPattern = other.fDataPattern;
338 fGridOutputDir = other.fGridOutputDir;
339 fOutputArchive = other.fOutputArchive;
340 fOutputFiles = other.fOutputFiles;
341 fInputFormat = other.fInputFormat;
342 fDatasetName = other.fDatasetName;
343 fJDLName = other.fJDLName;
344 fTerminateFiles = other.fTerminateFiles;
345 fMergeExcludes = other.fMergeExcludes;
346 fIncludePath = other.fIncludePath;
347 fCloseSE = other.fCloseSE;
348 fFriendChainName = other.fFriendChainName;
349 fJobTag = other.fJobTag;
350 fOutputSingle = other.fOutputSingle;
351 fRunPrefix = other.fRunPrefix;
352 fProofCluster = other.fProofCluster;
353 fProofDataSet = other.fProofDataSet;
354 fFileForTestMode = other.fFileForTestMode;
355 fRootVersionForProof = other.fRootVersionForProof;
356 fAliRootMode = other.fAliRootMode;
357 fMergeDirName = other.fMergeDirName;
// Deep copy of the input file list: each entry is re-created from its name.
358 if (other.fInputFiles) {
359 fInputFiles = new TObjArray();
360 TIter next(other.fInputFiles);
362 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
363 fInputFiles->SetOwner();
// Same deep copy for the package list.
365 if (other.fPackages) {
366 fPackages = new TObjArray();
367 TIter next(other.fPackages);
369 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
370 fPackages->SetOwner();
376 //______________________________________________________________________________
// Define how run numbers are rendered into run strings.
// `prefix` is either a plain prefix or a complete printf-style format.
// NOTE(review): the statement that stores `prefix` into fRunPrefix is not
// visible in this excerpt -- presumably it precedes the check below; confirm.
377 void AliAnalysisAlien::SetRunPrefix(const char *prefix)
379 // Set the run number format. Can be a prefix or a format like "%09d"
// When fRunPrefix contains no '%' conversion, "%d" is appended so that it can
// be used as a format for an integer run number.
381 if (!fRunPrefix.Contains("%")) fRunPrefix += "%d";
384 //______________________________________________________________________________
// Append an include path to fIncludePath, followed by a blank separator.
// The path is prefixed with "-I" unless `p` already contains "-I".
// NOTE(review): the declaration of `p` is not visible in this excerpt --
// presumably a string built from `path`; confirm.
385 void AliAnalysisAlien::AddIncludePath(const char *path)
387 // Add include path in the remote analysis macro.
389 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
390 else fIncludePath += Form("-I%s ", path);
393 //______________________________________________________________________________
// Append one run to fRunNumbers. Entries are separated by a single blank and
// the run number is rendered using fRunPrefix as the printf-style format.
394 void AliAnalysisAlien::AddRunNumber(Int_t run)
396 // Add a run number to the list of runs to be processed.
397 if (fRunNumbers.Length()) fRunNumbers += " ";
398 fRunNumbers += Form(fRunPrefix.Data(), run);
401 //______________________________________________________________________________
// Add every run found in a blank-separated list. Each token is converted to
// an integer with Atoi() and passed on to AddRunNumber(Int_t).
// NOTE(review): TString::Tokenize returns a newly allocated array that the
// caller must delete; the cleanup is not visible in this excerpt -- confirm.
402 void AliAnalysisAlien::AddRunList(const char* runList)
404 // Add several runs into the list of runs; they are expected to be separated by a blank character.
405 TString sList = runList;
406 TObjArray *list = sList.Tokenize(" ");
407 Int_t n = list->GetEntries();
408 for (Int_t i = 0; i < n; i++) {
409 TObjString *os = (TObjString*)list->At(i);
410 AddRunNumber(os->GetString().Atoi());
415 //______________________________________________________________________________
// String overload: add one or more blank-separated runs given as text.
// Each token is appended verbatim (no numeric formatting) after the part of
// fRunPrefix that precedes "%d".
// NOTE(review): the declarations of `runs`, `prefix`, `next` and `os` are not
// visible in this excerpt. If fRunPrefix holds a format without a literal "%d"
// (e.g. "%09d"), Index("%d") will not find a match -- confirm the prefix is
// still computed as intended in that case.
416 void AliAnalysisAlien::AddRunNumber(const char* run)
418 // Add a run number to the list of runs to be processed.
421 TObjArray *arr = runs.Tokenize(" ");
424 prefix.Append(fRunPrefix, fRunPrefix.Index("%d"));
425 while ((os=(TObjString*)next())){
426 if (fRunNumbers.Length()) fRunNumbers += " ";
427 fRunNumbers += Form("%s%s", prefix.Data(), os->GetString().Data());
432 //______________________________________________________________________________
// Register one input file. The list is created on first use and the name is
// stored as a newly allocated TObjString.
// NOTE(review): unlike the arrays built in the copy constructor, the array
// created here is not made owner of its elements, so deleting fInputFiles
// would not delete the TObjString entries -- possible leak; confirm.
433 void AliAnalysisAlien::AddDataFile(const char *lfn)
435 // Adds a data file to the input to be analysed. The file should be a valid LFN
436 // or point to an existing file in the alien workdir.
437 if (!fInputFiles) fInputFiles = new TObjArray();
438 fInputFiles->Add(new TObjString(lfn));
441 //______________________________________________________________________________
442 void AliAnalysisAlien::AddExternalPackage(const char *package)
444 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
445 if (fExternalPackages) fExternalPackages += " ";
446 fExternalPackages += package;
449 //______________________________________________________________________________
// Make sure a grid connection is available.
// Returns kTRUE immediately when gGrid is already connected or when the
// plugin runs in production mode; otherwise opens an "alien://" connection
// and, on success, stores the grid user name in fUser.
450 Bool_t AliAnalysisAlien::Connect()
452 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
453 if (gGrid && gGrid->IsConnected()) return kTRUE;
454 if (fProductionMode) return kTRUE;
456 Info("Connect", "Trying to connect to AliEn ...");
457 TGrid::Connect("alien://");
// The result is checked through the global gGrid rather than through the
// return value of TGrid::Connect.
459 if (!gGrid || !gGrid->IsConnected()) {
460 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
463 fUser = gGrid->GetUser();
464 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
468 //______________________________________________________________________________
// Enter the grid working directory (grid home directory + fGridWorkingDir),
// creating it with "-p" when it does not exist. If it cannot be created a
// warning is issued and fGridWorkingDir is reset to "", i.e. the home
// directory is used instead.
// NOTE(review): the connection test guarding the first Error and the body of
// the "directory exists" branch are not visible in this excerpt.
469 void AliAnalysisAlien::CdWork()
471 // Check validity of alien workspace. Create directory if possible.
473 Error("CdWork", "Alien connection required");
476 TString homedir = gGrid->GetHomeDirectory();
477 TString workdir = homedir + fGridWorkingDir;
478 if (DirectoryExists(workdir)) {
482 // Work directory not existing - create it
484 if (gGrid->Mkdir(workdir, "-p")) {
485 gGrid->Cd(fGridWorkingDir);
486 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
488 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
489 workdir.Data(), homedir.Data());
490 fGridWorkingDir = "";
494 //______________________________________________________________________________
// Verify that files can be copied to `alienpath` by creating a small local
// ROOT file ("plugin_test_copy"), copying it to the grid location and
// removing it again. Returns kTRUE without testing in production mode.
// The local test file is unlinked both on copy failure and on success.
// NOTE(review): several guard conditions and return statements of this
// function are not visible in this excerpt.
495 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
497 // Check if file copying is possible.
498 if (fProductionMode) return kTRUE;
500 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
503 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
504 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
505 // Check if alien_CLOSE_SE is defined
506 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
507 if (!closeSE.IsNull()) {
508 Info("CheckFileCopy", "Your current close storage is pointing to: \
509 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
511 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
513 // Check if grid directory exists.
514 if (!DirectoryExists(alienpath)) {
515 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
518 TFile f("plugin_test_copy", "RECREATE");
519 // User may not have write permissions to current directory
521 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
522 gSystem->WorkingDirectory());
// Remove a stale copy of the test file on the grid before copying a new one.
526 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
527 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
528 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
529 \n# 1. Make sure you have write permissions there. If this is the case: \
530 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
531 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
532 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
533 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
534 gSystem->Unlink(f.GetName());
537 gSystem->Unlink(f.GetName());
// NOTE(review): this path is built as "%s%s" (no '/' separator, no "alien://"
// prefix) whereas the copy target above is "alien://%s/%s". Unless alienpath
// ends with '/', this probably does not address the copied file, leaving the
// test file on the grid -- confirm.
538 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
539 Info("CheckFileCopy", "### ...SUCCESS ###");
543 //______________________________________________________________________________
// Validate the declared inputs before dataset creation.
// Visible stages:
//   1. No input files, run numbers or run range: the base data directory must
//      be set; a single xml will be made for it.
//   2. Declared input files: each is resolved against the working directory,
//      checked for existence and classified with CheckDataType(); all files
//      must agree on collection / xml / tag usage.
//   3. Run numbers or run range: the data directory must exist; for every run
//      found, an xml name is registered with AddDataFile(), either one per run
//      or one per chunk of fNrunsPerMaster runs.
// Returns kTRUE without any check in production mode.
// NOTE(review): many guard conditions, declarations and return statements are
// not visible in this excerpt. Two Error() calls use the misspelled location
// tag "CkeckInputData", and several Info() calls are tagged "CheckDataType"
// although issued from this method -- consider unifying the tags.
544 Bool_t AliAnalysisAlien::CheckInputData()
546 // Check validity of input data. If necessary, create xml files.
547 if (fProductionMode) return kTRUE;
548 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
549 if (!fGridDataDir.Length()) {
550 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
554 Error("CheckInputData", "Merging via jdl works only with run numbers, run range or provided xml");
557 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
558 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
559 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
562 // Process declared files
563 Bool_t isCollection = kFALSE;
564 Bool_t isXml = kFALSE;
565 Bool_t useTags = kFALSE;
566 Bool_t checked = kFALSE;
567 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
569 TString workdir = gGrid->GetHomeDirectory();
570 workdir += fGridWorkingDir;
573 TIter next(fInputFiles);
574 while ((objstr=(TObjString*)next())) {
577 file += objstr->GetString();
578 // Store full lfn path
579 if (FileExists(file)) objstr->SetString(file);
581 file = objstr->GetName();
582 if (!FileExists(objstr->GetName())) {
583 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
584 objstr->GetName(), workdir.Data());
// Classify the file; the flags of the files seen so far must all agree
// (see the conflict check below).
588 Bool_t iscoll, isxml, usetags;
589 CheckDataType(file, iscoll, isxml, usetags);
592 isCollection = iscoll;
595 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
597 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
598 Error("CheckInputData", "Some conflict was found in the types of inputs");
604 // Process requested run numbers
605 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
606 // Check validity of alien data directory
607 if (!fGridDataDir.Length()) {
608 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
611 if (!DirectoryExists(fGridDataDir)) {
612 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
616 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
620 if (checked && !isXml) {
621 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
624 // Check validity of run number(s)
629 TString schunk, schunk2;
// Tag usage for run-based input is derived from the data pattern.
633 useTags = fDataPattern.Contains("tag");
634 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
636 if (useTags != fDataPattern.Contains("tag")) {
637 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over a run range.
640 if (fRunNumbers.Length()) {
641 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
642 arr = fRunNumbers.Tokenize(" ");
644 while ((os=(TObjString*)next())) {
645 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
646 if (!DirectoryExists(path)) {
647 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
650 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
651 TString msg = "\n##### file: ";
653 msg += " type: xml_collection;";
654 if (useTags) msg += " using_tags: Yes";
655 else msg += " using_tags: No";
656 Info("CheckDataType", "%s", msg.Data());
// One xml per run, or one xml per chunk named <first>_<last>.xml.
657 if (fNrunsPerMaster<2) {
658 AddDataFile(Form("%s.xml", os->GetString().Data()));
661 if (((nruns-1)%fNrunsPerMaster) == 0) {
662 schunk = os->GetString();
664 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
665 schunk += Form("_%s.xml", os->GetString().Data());
671 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
672 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
// fRunPrefix is itself a format; it is embedded into a second format string.
673 format = Form("%%s/%s ", fRunPrefix.Data());
674 path = Form(format.Data(), fGridDataDir.Data(), irun);
675 if (!DirectoryExists(path)) {
678 format = Form("%%s/%s.xml", fRunPrefix.Data());
679 path = Form(format.Data(), workdir.Data(),irun);
680 TString msg = "\n##### file: ";
682 msg += " type: xml_collection;";
683 if (useTags) msg += " using_tags: Yes";
684 else msg += " using_tags: No";
685 Info("CheckDataType", "%s", msg.Data());
686 if (fNrunsPerMaster<2) {
687 format = Form("%s.xml", fRunPrefix.Data());
688 AddDataFile(Form(format.Data(),irun));
691 if (((nruns-1)%fNrunsPerMaster) == 0) {
692 schunk = Form(fRunPrefix.Data(),irun);
694 format = Form("_%s.xml", fRunPrefix.Data());
695 schunk2 = Form(format.Data(), irun);
696 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
709 //______________________________________________________________________________
// Build the xml collection(s) describing the input data by running the grid
// 'find' command, and copy them to the grid working directory.
// Returns kTRUE without doing anything in production or offline mode.
// Three cases are handled:
//   1. no run numbers and no run range: one collection for the data directory
//      (skipped when input files were already declared);
//   2. explicit run numbers: one collection per run, optionally merged into
//      chunks of fNrunsPerMaster runs;
//   3. a run range: same as (2), iterating over [fRunRange[0], fRunRange[1]].
// Each 'find' call is limited to gMaxEntries results (fNtestFiles in test
// mode); when a call returns exactly gMaxEntries entries the partial results
// are accumulated through TAlienCollection objects and exported as one xml.
// Temporary files named __tmp* are created in the current directory and
// removed with "rm -f".
// NOTE(review): many declarations, loop headers and return statements are not
// visible in this excerpt. The "15K" in the Info messages duplicates the value
// of gMaxEntries by hand.
710 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
712 // Create dataset for the grid data directory + run number.
713 const Int_t gMaxEntries = 15000;
714 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
716 Error("CreateDataset", "Cannot create dataset with no grid connection");
721 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
722 TString workdir = gGrid->GetHomeDirectory();
723 workdir += fGridWorkingDir;
725 // Compose the 'find' command arguments
728 TString options = "-x collection ";
729 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
730 else options += Form("-l %d ", gMaxEntries); // Protection for the find command
731 TString conditions = "";
738 TString schunk, schunk2;
739 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither run numbers nor a run range were requested.
740 if (!fRunNumbers.Length() && !fRunRange[0]) {
741 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
742 // Make a single data collection from data directory.
744 if (!DirectoryExists(path)) {
745 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
749 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
750 else file = Form("%s.xml", gSystem->BaseName(path));
// The find command is (re)run when the local xml is missing, in test mode,
// or when overwriting is requested.
754 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
756 command += Form("%s -o %d ",options.Data(), nstart);
760 command += conditions;
761 printf("command: %s\n", command.Data());
762 TGridResult *res = gGrid->Command(command);
764 // Write standard output to file
765 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
// The result is validated by counting "/event" matches with the external
// 'grep' tool, when available.
766 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
767 Bool_t nullFile = kFALSE;
769 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
771 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
773 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
774 gSystem->Exec("rm -f __tmp*");
782 gSystem->Exec("rm -f __tmp__");
783 ncount = line.Atoi();
// Exactly gMaxEntries results means the find output was probably truncated:
// accumulate this stage and query again.
786 if (ncount == gMaxEntries) {
787 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
788 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
789 if (!cbase) cbase = cadd;
797 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
798 printf("... please wait - TAlienCollection::Add() scales badly...\n");
801 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
802 delete cbase; cbase = 0;
804 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
806 gSystem->Exec("rm -f __tmp*");
807 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
811 Bool_t fileExists = FileExists(file);
812 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
813 // Copy xml file to alien space
814 if (fileExists) gGrid->Rm(file);
815 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
816 if (!FileExists(file)) {
817 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
820 // Update list of files to be processed.
822 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
826 Bool_t nullResult = kTRUE;
// Case 2: explicit list of run numbers.
827 if (fRunNumbers.Length()) {
828 TObjArray *arr = fRunNumbers.Tokenize(" ");
831 while ((os=(TObjString*)next())) {
834 path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
835 if (!DirectoryExists(path)) continue;
837 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
838 else file = Form("%s.xml", os->GetString().Data());
839 // If local collection file does not exist, create it via 'find' command.
843 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
845 command += Form("%s -o %d ",options.Data(), nstart);
848 command += conditions;
849 TGridResult *res = gGrid->Command(command);
851 // Write standard output to file
852 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
853 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
854 Bool_t nullFile = kFALSE;
856 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
858 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
860 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
861 gSystem->Exec("rm -f __tmp*");
// NOTE(review): ReplaceAll removes every occurrence of this run string, also
// inside longer run strings that contain it -- confirm this cannot corrupt
// other entries of fRunNumbers.
862 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
870 gSystem->Exec("rm -f __tmp__");
871 ncount = line.Atoi();
875 if (ncount == gMaxEntries) {
876 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
// Staged accumulation and merging of several runs per master job both rely
// on cbase, so the combination is rejected.
877 if (fNrunsPerMaster > 1) {
878 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
879 file.Data(),gMaxEntries);
882 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
883 if (!cbase) cbase = cadd;
890 if (cbase && fNrunsPerMaster<2) {
891 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
892 printf("... please wait - TAlienCollection::Add() scales badly...\n");
895 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
896 delete cbase; cbase = 0;
898 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
900 gSystem->Exec("rm -f __tmp*");
901 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
// In test mode only the first usable run is processed.
905 if (TestBit(AliAnalysisGrid::kTest)) break;
906 // Check if there is one run per master job.
907 if (fNrunsPerMaster<2) {
908 if (FileExists(file)) {
909 if (fOverwriteMode) gGrid->Rm(file);
911 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
915 // Copy xml file to alien space
916 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
917 if (!FileExists(file)) {
918 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk starts a new base
// collection, subsequent runs are added to it.
924 if (((nruns-1)%fNrunsPerMaster) == 0) {
925 schunk = os->GetString();
926 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
928 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
929 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
933 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
936 schunk += Form("_%s.xml", os->GetString().Data());
937 if (FileExists(schunk)) {
// NOTE(review): the existence check is on schunk but `file` is removed here;
// the run-range branch further below removes schunk in the same situation --
// likely a bug, confirm.
938 if (fOverwriteMode) gGrid->Rm(file);
940 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
944 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
945 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
946 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
947 if (!FileExists(schunk)) {
948 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
956 Error("CreateDataset", "No valid dataset corresponding to the query!");
// Case 3: run range.
960 // Process a full run range.
961 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
962 format = Form("%%s/%s ", fRunPrefix.Data());
965 path = Form(format.Data(), fGridDataDir.Data(), irun);
966 if (!DirectoryExists(path)) continue;
968 format = Form("%s.xml", fRunPrefix.Data());
969 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
970 else file = Form(format.Data(), irun);
971 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
972 if (fOverwriteMode) gGrid->Rm(file);
974 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
978 // If local collection file does not exist, create it via 'find' command.
982 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
984 command += Form("%s -o %d ",options.Data(), nstart);
987 command += conditions;
988 TGridResult *res = gGrid->Command(command);
990 // Write standard output to file
991 gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
992 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
993 Bool_t nullFile = kFALSE;
995 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
997 nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
999 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
1000 gSystem->Exec("rm -f __tmp*");
1008 gSystem->Exec("rm -f __tmp__");
1009 ncount = line.Atoi();
1011 nullResult = kFALSE;
1013 if (ncount == gMaxEntries) {
1014 Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
1015 if (fNrunsPerMaster > 1) {
1016 Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
1017 file.Data(),gMaxEntries);
1020 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1021 if (!cbase) cbase = cadd;
1028 if (cbase && fNrunsPerMaster<2) {
1029 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
1030 printf("... please wait - TAlienCollection::Add() scales badly...\n");
1033 cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
1034 delete cbase; cbase = 0;
1036 TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
1038 Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
1042 if (TestBit(AliAnalysisGrid::kTest)) break;
1043 // Check if there is one run per master job.
1044 if (fNrunsPerMaster<2) {
1045 if (FileExists(file)) {
1046 if (fOverwriteMode) gGrid->Rm(file);
1048 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
1052 // Copy xml file to alien space
1053 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
1054 if (!FileExists(file)) {
1055 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
1060 // Check if the collection for the chunk exist locally.
// The chunk index selects the entry of fInputFiles that belongs to this chunk.
1061 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
1062 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
1063 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
1066 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
1067 if (((nruns-1)%fNrunsPerMaster) == 0) {
1068 schunk = Form(fRunPrefix.Data(), irun);
1069 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1071 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
1075 format = Form("%%s_%s.xml", fRunPrefix.Data());
1076 schunk2 = Form(format.Data(), schunk.Data(), irun);
1077 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
1081 if (FileExists(schunk)) {
1082 if (fOverwriteMode) gGrid->Rm(schunk);
1084 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
1088 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
1089 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
1090 if (FileExists(schunk)) {
1091 if (fOverwriteMode) gGrid->Rm(schunk);
1093 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
1097 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
1098 if (!FileExists(schunk)) {
1099 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
1105 Error("CreateDataset", "No valid dataset corresponding to the query!");
1112 //______________________________________________________________________________
1113 Bool_t AliAnalysisAlien::CreateJDL()
1115 // Generate a JDL file according to current settings. The name of the file is
1116 // specified by fJDLName.
// Both the analysis JDL (fGridJDL) and the merging JDL (fMergingJDL) are
// filled here from the plugin settings: user, executable, arguments, TTL,
// splitting, packages, input sandbox, output archive/sandbox, price and
// validation command.
// Returns kFALSE if the 'error' flag is set once the validity checks are done.
1117 Bool_t error = kFALSE;
// 'copy' is switched off in production, offline and test modes;
// 'generate' is switched off in test and submit modes.
1119 Bool_t copy = kTRUE;
1120 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
1121 Bool_t generate = kTRUE;
1122 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
1124 Error("CreateJDL", "Alien connection required");
1127 // Check validity of alien workspace
// Outside production mode, a working directory that does not start with
// "/alice" is taken relative to the grid home directory.
1129 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1130 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
1131 workdir += fGridWorkingDir;
1135 Error("CreateJDL()", "Define some input files for your analysis.");
1138 // Compose list of input files
1139 // Check if output files were defined
1140 if (!fOutputFiles.Length()) {
1141 Error("CreateJDL", "You must define at least one output file");
1144 // Check if an output directory was defined and valid
1145 if (!fGridOutputDir.Length()) {
1146 Error("CreateJDL", "You must define AliEn output directory");
1149 if (!fProductionMode) {
// An output directory given without any '/' is placed under workdir.
1150 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1151 if (!DirectoryExists(fGridOutputDir)) {
1152 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1153 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1155 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1159 Warning("CreateJDL", "#### Output directory %s exists! If this contains old data, jobs will fail with ERROR_SV !!! ###", fGridOutputDir.Data());
1164 // Exit if any error up to now
1165 if (error) return kFALSE;
// The user field is written to both JDLs only when fUser is set.
1167 if (!fUser.IsNull()) {
1168 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
1169 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
// Executables: the merging JDL uses <executable>_merge.sh, and the macro
// <executable>_merge.C is added to its input sandbox. Both names are derived
// from fExecutable by suffix replacement.
1171 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
1172 TString mergeExec = fExecutable;
1173 mergeExec.ReplaceAll(".sh", "_merge.sh");
1174 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
1175 mergeExec.ReplaceAll(".sh", ".C");
1176 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
1177 if (!fArguments.IsNull())
1178 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// Merging arguments: the grid output directory for one-stage merging; the
// "wn.xml <stage>" form takes the stage from $4 in production mode and from
// $2 otherwise.
1179 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
1181 if (fProductionMode) fMergingJDL->SetArguments("wn.xml $4"); // xml, stage
1182 else fMergingJDL->SetArguments("wn.xml $2"); // xml, stage
// fTTL is written unchanged as the TTL value; the description reports
// fTTL/60 as minutes.
1185 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1186 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1187 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
1188 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
// Optional limits are only written when they are positive.
1190 if (fMaxInitFailed > 0) {
1191 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1192 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1194 if (fSplitMaxInputFileNumber > 0) {
1195 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1196 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
// When merging is not done in one stage, fMaxMergeFiles bounds the number of
// inputs per merging job.
1198 if (!IsOneStageMerging()) {
1199 fMergingJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"",fMaxMergeFiles));
1200 fMergingJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be merged in one go");
1202 if (fSplitMode.Length()) {
1203 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1204 fGridJDL->SetDescription("Split", "We split per SE or file");
1206 fMergingJDL->SetValue("Split", "\"se\"");
1207 fMergingJDL->SetDescription("Split", "We split per SE for merging in stages");
// Requested software packages are added to both JDLs.
1208 if (!fAliROOTVersion.IsNull()) {
1209 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1210 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1212 if (!fROOTVersion.IsNull()) {
1213 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1214 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1216 if (!fAPIVersion.IsNull()) {
1217 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1218 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
// External packages: blank-separated tokens, each split at "::" into a
// package name and a version.
// NOTE(review): the result of Index("::") is used without checking that the
// separator was found - confirm every token contains "::".
1220 if (!fExternalPackages.IsNull()) {
1221 arr = fExternalPackages.Tokenize(" ");
1223 while ((os=(TObjString*)next())) {
1224 TString pkgname = os->GetString();
1225 Int_t index = pkgname.Index("::");
1226 TString pkgversion = pkgname(index+2, pkgname.Length());
1227 pkgname.Remove(index);
1228 fGridJDL->AddToPackages(pkgname, pkgversion);
1229 fMergingJDL->AddToPackages(pkgname, pkgversion);
// Input data list ("wn.xml") and input sandbox content for both JDLs.
1233 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1234 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1235 fMergingJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1236 fMergingJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1237 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
// The file <executable>.root (name derived from fExecutable) is shipped with
// both jobs.
1238 TString analysisFile = fExecutable;
1239 analysisFile.ReplaceAll(".sh", ".root");
1240 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1241 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// ConfigureCuts.C is added only when tags are used and
// gSystem->AccessPathName() returns false for it.
1242 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1243 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
// Additional libs: entries containing ".so" are skipped, the remaining
// entries are added to both input sandboxes.
1244 if (fAdditionalLibs.Length()) {
1245 arr = fAdditionalLibs.Tokenize(" ");
1247 while ((os=(TObjString*)next())) {
1248 if (os->GetString().Contains(".so")) continue;
1249 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1250 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
// Every entry of fPackages is added to both input sandboxes.
1255 TIter next(fPackages);
1257 while ((obj=next())) {
1258 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1259 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
// Output archive of the analysis JDL: entries without an explicit "@" get
// "@<fCloseSE>" appended when fCloseSE is set.
1262 if (fOutputArchive.Length()) {
1263 arr = fOutputArchive.Tokenize(" ");
1265 Bool_t first = kTRUE;
1266 const char *comment = "Files to be archived";
1267 const char *comment1 = comment;
1268 while ((os=(TObjString*)next())) {
// The descriptive comment is replaced by NULL once 'first' is false.
1269 if (!first) comment = NULL;
1270 if (!os->GetString().Contains("@") && fCloseSE.Length())
1271 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1273 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1277 // Output archive for the merging jdl
1278 TString outputArchive;
1279 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1280 outputArchive = "log_archive.zip:std*@disk=1 ";
1281 // Add normal output files, extra files + terminate files
1282 TString files = GetListOfFiles("outextter");
1283 // Do not register merge excludes
1284 if (!fMergeExcludes.IsNull()) {
1285 arr = fMergeExcludes.Tokenize(" ");
1287 while ((os=(TObjString*)next1())) {
// Remove the excluded name together with a following comma first, then any
// remaining occurrence of the bare name.
1288 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1289 files.ReplaceAll(os->GetString(),"");
1293 files.ReplaceAll(".root", "*.root");
1294 outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
1296 TString files = fOutputArchive;
1297 files.ReplaceAll(".root", "*.root"); // nreplicas etc. should already be attached by the user
1298 outputArchive = files;
1300 arr = outputArchive.Tokenize(" ");
1304 while ((os=(TObjString*)next2())) {
1305 if (!first) comment = NULL;
1306 TString currentfile = os->GetString();
1307 if (!currentfile.Contains("@") && fCloseSE.Length())
1308 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1310 fMergingJDL->AddToOutputArchive(currentfile, comment);
// Output files (comma-separated) are added via AddToOutputSandbox().
1315 arr = fOutputFiles.Tokenize(",");
1317 Bool_t first = kTRUE;
1318 const char *comment = "Files to be saved";
1319 while ((os=(TObjString*)next())) {
1320 // Ignore outputs in jdl that are also in outputarchive
// 'sout' is the output name with '*', ".root" and any "@..." suffix removed;
// it is matched as a substring of fOutputArchive and fMergeExcludes.
1321 TString sout = os->GetString();
1322 sout.ReplaceAll("*", "");
1323 sout.ReplaceAll(".root", "");
1324 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1325 if (fOutputArchive.Contains(sout)) continue;
1326 if (!first) comment = NULL;
1327 if (!os->GetString().Contains("@") && fCloseSE.Length())
1328 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1330 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
// Outputs listed in fMergeExcludes are not added to the merging JDL.
1332 if (fMergeExcludes.Contains(sout)) continue;
1333 if (!os->GetString().Contains("@") && fCloseSE.Length())
1334 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1336 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1339 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1340 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
// Validation scripts are referenced under workdir; the merging JDL uses the
// "_merge.sh" variant of the script name.
1341 TString validationScript = fValidationScript;
1342 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1343 validationScript.ReplaceAll(".sh", "_merge.sh");
1344 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1345 if (fMasterResubmitThreshold) {
1346 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1347 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1349 // Write a jdl with 2 input parameters: collection name and output dir name.
1352 // Copy jdl to grid workspace
1354 // Check if an output directory was defined and valid
// NOTE(review): this repeats the output directory check and creation already
// done near the top of this function - confirm both are needed.
1355 if (!fGridOutputDir.Length()) {
1356 Error("CreateJDL", "You must define AliEn output directory");
1359 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1360 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1361 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1362 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1364 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
// Submit mode: JDL copies already on the grid are removed and the local JDL
// files are uploaded again. The target is the grid output directory, or
// workdir in production mode.
1370 if (TestBit(AliAnalysisGrid::kSubmit)) {
1371 TString mergeJDLName = fExecutable;
1372 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1373 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1374 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1375 if (fProductionMode) {
1376 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1377 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1379 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1380 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1381 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1382 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1384 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1385 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
// Upload the additional (non-".so") dependencies to workdir, removing an
// existing grid copy first.
1388 if (fAdditionalLibs.Length()) {
1389 arr = fAdditionalLibs.Tokenize(" ");
1392 while ((os=(TObjString*)next())) {
1393 if (os->GetString().Contains(".so")) continue;
1394 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1395 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1396 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
// Upload every entry of fPackages to workdir in the same way.
1401 TIter next(fPackages);
1403 while ((obj=next())) {
1404 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1405 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1406 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1413 //______________________________________________________________________________
1414 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1416 // Builds the analysis JDL text from fGridJDL and two merging JDL texts from
1417 // fMergingJDL (tagged "_Merging" and "_FinalMerging"), writes them to local
1418 // files and copies them to the grid. Returns kFALSE if fInputFiles is not set.
// NOTE(review): the previous description referred to a "findex" argument,
// which is not part of this signature.
// NOTE(review): 'copy' presumably selects whether the generated JDL files are
// uploaded to the grid - confirm against the code guarding the copy section.
1419 if (!fInputFiles) return kFALSE;
// Outside production mode, a working directory that does not start with
// "/alice" is taken relative to the grid home directory.
1422 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1423 workdir += fGridWorkingDir;
// The merging stage is JDL argument $2, or $4 in production mode.
1424 TString stageName = "$2";
1425 if (fProductionMode) stageName = "$4";
// Staged merging reads the collection Stage_<stage>.xml below $1 (optionally
// inside fMergeDirName) and writes to a counter subdirectory of
// Stage_<stage>.
1426 if (!fMergeDirName.IsNull()) {
1427 fMergingJDL->AddToInputDataCollection(Form("LF:$1/%s/Stage_%s.xml,nodownload",fMergeDirName.Data(),stageName.Data()), "Collection of files to be merged for current stage");
1428 fMergingJDL->SetOutputDirectory(Form("$1/%s/Stage_%s/#alien_counter_03i#",fMergeDirName.Data(),stageName.Data()), "Output directory");
1430 fMergingJDL->AddToInputDataCollection(Form("LF:$1/Stage_%s.xml,nodownload",stageName.Data()), "Collection of files to be merged for current stage");
1431 fMergingJDL->SetOutputDirectory(Form("$1/Stage_%s/#alien_counter_03i#",stageName.Data()), "Output directory");
// Production mode: every entry of fInputFiles becomes an input collection of
// the analysis JDL.
1433 if (fProductionMode) {
1434 TIter next(fInputFiles);
1435 while ((os=next())) {
1436 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1438 if (!fOutputToRunNo)
1439 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1441 fGridJDL->SetOutputDirectory(fGridOutputDir);
1443 if (!fRunNumbers.Length() && !fRunRange[0]) {
1444 // One jdl with no parameters in case input data is specified by name.
1445 TIter next(fInputFiles);
1447 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1448 if (!fOutputSingle.IsNull())
1449 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1451 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1452 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1455 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1456 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1457 if (!fOutputSingle.IsNull()) {
1458 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1459 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1461 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1466 // Generate the JDL as a string
1467 TString sjdl = fGridJDL->Generate();
1468 TString sjdl1 = fMergingJDL->Generate();
// fMergingJDL is reconfigured before the text tagged "_FinalMerging" (sjdl2)
// is generated: the output goes directly to $1 (optionally inside
// fMergeDirName) and the stage collection is added to the input sandbox.
1470 if (!fMergeDirName.IsNull()) {
1471 fMergingJDL->SetOutputDirectory(Form("$1/%s",fMergeDirName.Data()), "Output directory");
1472 fMergingJDL->AddToInputSandbox(Form("LF:$1/%s/Stage_%s.xml",fMergeDirName.Data(),stageName.Data()));
1474 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1475 fMergingJDL->AddToInputSandbox(Form("LF:$1/Stage_%s.xml",stageName.Data()));
1477 TString sjdl2 = fMergingJDL->Generate();
1478 Int_t index, index1;
// The same text substitutions are applied to the three JDL strings: line
// breaks are inserted around list entries and braces, and the field name
// "OutputDirectory" is replaced by "OutputDir".
1479 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1480 sjdl.ReplaceAll("(member", "\n (member");
1481 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1482 sjdl.ReplaceAll("{", "{\n ");
1483 sjdl.ReplaceAll("};", "\n};");
1484 sjdl.ReplaceAll("{\n \n", "{\n");
1485 sjdl.ReplaceAll("\n\n", "\n");
1486 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1487 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1488 sjdl1.ReplaceAll("(member", "\n (member");
1489 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1490 sjdl1.ReplaceAll("{", "{\n ");
1491 sjdl1.ReplaceAll("};", "\n};");
1492 sjdl1.ReplaceAll("{\n \n", "{\n");
1493 sjdl1.ReplaceAll("\n\n", "\n");
1494 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1495 sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
1496 sjdl2.ReplaceAll("(member", "\n (member");
1497 sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1498 sjdl2.ReplaceAll("{", "{\n ");
1499 sjdl2.ReplaceAll("};", "\n};");
1500 sjdl2.ReplaceAll("{\n \n", "{\n");
1501 sjdl2.ReplaceAll("\n\n", "\n");
1502 sjdl2.ReplaceAll("OutputDirectory", "OutputDir");
// Append the JDLVariables section and prepend the job tag to the analysis JDL.
1503 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1504 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1505 index = sjdl.Index("JDLVariables");
1506 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
// NOTE(review): "Workdirectorysize" is appended to sjdl1 here and again
// further below, so the merging JDL text carries the entry twice - confirm
// whether that is intended.
1507 sjdl += "Workdirectorysize = {\"5000MB\"};";
1508 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1509 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
// In production mode "_Stage$4" is inserted into the job tag before the first
// ':' (or at its end when there is no ':').
1510 index = fJobTag.Index(":");
1511 if (index < 0) index = fJobTag.Length();
1512 TString jobTag = fJobTag;
1513 if (fProductionMode) jobTag.Insert(index,"_Stage$4");
1514 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
// Header comments documenting the JDL arguments are prepended; the argument
// list differs between production and non-production mode.
1515 if (fProductionMode) {
1516 sjdl1.Prepend("# Generated merging jdl (production mode) \
1517 \n# $1 = full alien path to output directory to be merged \
1518 \n# $2 = train number \
1519 \n# $3 = production (like LHC10b) \
1520 \n# $4 = merging stage \
1521 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1522 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1523 sjdl2.Prepend("# Generated merging jdl \
1524 \n# $1 = full alien path to output directory to be merged \
1525 \n# $2 = train number \
1526 \n# $3 = production (like LHC10b) \
1527 \n# $4 = merging stage \
1528 \n# Stage_<n>.xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1530 sjdl1.Prepend("# Generated merging jdl \
1531 \n# $1 = full alien path to output directory to be merged \
1532 \n# $2 = merging stage \
1533 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1534 sjdl2.Prepend(Form("Jobtag = {\n \"comment:%s_FinalMerging\"\n};\n", jobTag.Data()));
1535 sjdl2.Prepend("# Generated merging jdl \
1536 \n# $1 = full alien path to output directory to be merged \
1537 \n# $2 = merging stage \
1538 \n# xml made via: find <OutputDir> *Stage<n-1>/*root_archive.zip\n");
1540 index = sjdl1.Index("JDLVariables");
1541 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1542 index = sjdl2.Index("JDLVariables");
1543 if (index >= 0) sjdl2.Insert(index, "\n# JDL variables\n");
1544 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1545 sjdl2 += "Workdirectorysize = {\"5000MB\"};";
// The "_FinalMerging" JDL text has the Split, SplitMaxInputFileNumber,
// InputDataCollection, InputDataListFormat and InputDataList entries cut
// out. Each cut runs from the field name to the next newline (to the next
// ';' for InputDataCollection).
1546 index = sjdl2.Index("Split =");
1548 index1 = sjdl2.Index("\n", index);
1549 sjdl2.Remove(index, index1-index+1);
1551 index = sjdl2.Index("SplitMaxInputFileNumber");
1553 index1 = sjdl2.Index("\n", index);
1554 sjdl2.Remove(index, index1-index+1);
1556 index = sjdl2.Index("InputDataCollection");
1558 index1 = sjdl2.Index(";", index);
1559 sjdl2.Remove(index, index1-index+1);
1561 index = sjdl2.Index("InputDataListFormat");
1563 index1 = sjdl2.Index("\n", index);
1564 sjdl2.Remove(index, index1-index+1);
1566 index = sjdl2.Index("InputDataList");
1568 index1 = sjdl2.Index("\n", index);
1569 sjdl2.Remove(index, index1-index+1);
// Remaining references to "wn.xml" are replaced by the stage collection name.
1571 sjdl2.ReplaceAll("wn.xml", Form("Stage_%s.xml",stageName.Data()));
1572 // Write jdl to file
// Local file names: fJDLName for the analysis JDL, <executable>_merge.jdl for
// the merging JDL and <executable>_merge_final.jdl for the final merging JDL.
1574 out.open(fJDLName.Data(), ios::out);
1576 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1579 out << sjdl << endl;
1581 TString mergeJDLName = fExecutable;
1582 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1585 out1.open(mergeJDLName.Data(), ios::out);
1587 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1590 out1 << sjdl1 << endl;
1593 TString finalJDL = mergeJDLName;
1594 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1595 out2.open(finalJDL.Data(), ios::out);
1597 Error("WriteJDL", "Bad file name: %s", finalJDL.Data());
1600 out2 << sjdl2 << endl;
1604 // Copy jdl to grid workspace
1606 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Grid destinations: the grid output directory, or workdir in production
// mode. Existing grid copies are removed before the upload.
1608 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1609 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1610 TString finalJDL = mergeJDLName;
1611 finalJDL.ReplaceAll(".jdl", "_final.jdl");
1612 TString locjdl2 = Form("%s/%s", fGridOutputDir.Data(),finalJDL.Data());
1613 if (fProductionMode) {
1614 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1615 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1616 locjdl2 = Form("%s/%s", workdir.Data(),finalJDL.Data());
1618 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1619 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1620 if (FileExists(locjdl2)) gGrid->Rm(locjdl2);
1621 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1622 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1624 Info("WriteJDL", "\n##### Copying merging JDL files <%s> to your AliEn output directory", mergeJDLName.Data());
1625 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1626 TFile::Cp(Form("file:%s",finalJDL.Data()), Form("alien://%s", locjdl2.Data()));
1632 //______________________________________________________________________________
1633 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1635 // Returns true if file exists.
// lfn: logical file name on the grid; an "alien://" prefix is removed before
// the name is passed to gGrid->Ls().
// Returns kFALSE when there is no grid connection (gGrid is null) or when
// Ls() returns no result.
1636 if (!gGrid) return kFALSE;
1638 slfn.ReplaceAll("alien://","");
1639 TGridResult *res = gGrid->Ls(slfn);
1640 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
1641 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry must carry a non-empty "name" value.
1646 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1647 if (!objs || !objs->GetString().Length()) {
1655 //______________________________________________________________________________
1656 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1658 // Returns true if directory exists. Can be also a path.
// dirname: directory name or path on the grid.
// Returns kFALSE when there is no grid connection (gGrid is null) or when the
// listing of the parent directory returns no result.
1659 if (!gGrid) return kFALSE;
1660 // Check if dirname is a path
// Strip() is applied with its default arguments first, then trailing '/'
// characters are removed.
1661 TString dirstripped = dirname;
1662 dirstripped = dirstripped.Strip();
1663 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Split into the last path component (dir) and its parent (path).
1664 TString dir = gSystem->BaseName(dirstripped);
1666 TString path = gSystem->DirName(dirstripped);
// List the parent with option "-F" and compare every entry's "name" value
// with dir.
1667 TGridResult *res = gGrid->Ls(path, "-F");
1668 if (!res) return kFALSE;
1672 while ((map=dynamic_cast<TMap*>(next()))) {
1673 obj = map->GetValue("name");
1675 if (dir == obj->GetName()) {
1684 //______________________________________________________________________________
1685 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1687 // Check input data type.
// lfn: logical file name on the grid.
// isCollection (out): result of IsCollection(lfn).
// isXml (out): set from whether the file name contains ".xml".
// useTags (out): set from whether the inspected file name contains ".tag".
// A summary message is reported through Info().
1688 isCollection = kFALSE;
1692 Error("CheckDataType", "No connection to grid");
1695 isCollection = IsCollection(lfn);
1696 TString msg = "\n##### file: ";
1699 msg += " type: raw_collection;";
1700 // special treatment for collections
1702 // check for tag files in the collection
// The "origLFN" key of the first listed entry decides whether tags are used.
1703 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1705 msg += " using_tags: No (unknown)";
1706 Info("CheckDataType", "%s", msg.Data());
1709 const char* typeStr = res->GetKey(0, "origLFN");
1710 if (!typeStr || !strlen(typeStr)) {
1711 msg += " using_tags: No (unknown)";
1712 Info("CheckDataType", "%s", msg.Data());
1715 TString file = typeStr;
1716 useTags = file.Contains(".tag");
1717 if (useTags) msg += " using_tags: Yes";
1718 else msg += " using_tags: No";
1719 Info("CheckDataType", "%s", msg.Data());
1724 isXml = slfn.Contains(".xml");
1726 // Open xml collection and check if there are tag files inside
1727 msg += " type: xml_collection;";
// The collection is opened through the interpreter (gROOT->ProcessLine).
1728 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1730 msg += " using_tags: No (unknown)";
1731 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the collection is inspected.
1734 TMap *map = coll->Next();
1736 msg += " using_tags: No (unknown)";
1737 Info("CheckDataType", "%s", msg.Data());
1740 map = (TMap*)map->GetValue("");
1742 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1743 useTags = file.Contains(".tag");
1745 if (useTags) msg += " using_tags: Yes";
1746 else msg += " using_tags: No";
1747 Info("CheckDataType", "%s", msg.Data());
// Plain file: the type is decided from the file name alone.
1750 useTags = slfn.Contains(".tag");
1751 if (slfn.Contains(".root")) msg += " type: root file;";
1752 else msg += " type: unknown file;";
1753 if (useTags) msg += " using_tags: Yes";
1754 else msg += " using_tags: No";
1755 Info("CheckDataType", "%s", msg.Data());
1758 //______________________________________________________________________________
1759 void AliAnalysisAlien::EnablePackage(const char *package)
1761 // Enables a par file supposed to exist in the current directory.
// package: package name; every ".par" occurrence is removed from it first.
// NOTE(review): presumably the ".par" suffix is appended again before the
// path check - confirm.
// Calls Fatal() when gSystem->AccessPathName() returns true for the package
// path.
1762 TString pkg(package);
1763 pkg.ReplaceAll(".par", "");
1765 if (gSystem->AccessPathName(pkg)) {
1766 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// The Info() message is guarded by the kUsePars bit not being set yet; the
// bit is then set.
1769 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1770 Info("EnablePackage", "AliEn plugin will use .par packages");
1771 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns its entries (SetOwner); the package name is stored as a
// TObjString.
1773 fPackages = new TObjArray();
1774 fPackages->SetOwner();
1776 fPackages->Add(new TObjString(pkg));
1779 //______________________________________________________________________________
1780 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1782 // Make a tree from files having the location specified in fFileForTestMode.
1783 // Inspired from JF's CreateESDChain.
// treeName: name given to the TChain that is created.
// fFileForTestMode must be set and accessible; it is read as a list of data
// file locations. Errors are reported when it is missing, when a listed file
// cannot be opened and when no listed file could be opened at all.
1784 if (fFileForTestMode.IsNull()) {
1785 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1788 if (gSystem->AccessPathName(fFileForTestMode)) {
1789 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1794 in.open(fFileForTestMode);
1796 // Read the input list of files and add them to the chain
1798 TChain *chain = new TChain(treeName);
// Empty lines are skipped without being counted; reading stops once
// fNtestFiles non-empty lines have been handled.
1802 if (line.IsNull()) continue;
1803 if (count++ == fNtestFiles) break;
1804 TString esdFile(line);
// Each file is opened once to check it; only non-zombie files are added.
1805 TFile *file = TFile::Open(esdFile);
1807 if (!file->IsZombie()) chain->Add(esdFile);
1810 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1814 if (!chain->GetListOfFiles()->GetEntries()) {
1815 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1823 //______________________________________________________________________________
1824 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1826 // Get job status for all jobs with jobid>=jobidstart.
// jobidstart: jobs with a smaller queue id are skipped.
// lastid: queue id of the job whose status text is written into the returned
// buffer.
// nrunning, nwaiting, nerror, ndone (out): NOTE(review) presumably counters
// of jobs per state, updated in the switch below - confirm.
// Returns a pointer to a function-local static buffer shared by all calls.
1827 static char mstatus[20];
1833 TGridJobStatusList *list = gGrid->Ps("");
1834 if (!list) return mstatus;
1835 Int_t nentries = list->GetSize();
1836 TGridJobStatus *status;
1838 for (Int_t ijob=0; ijob<nentries; ijob++) {
1839 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, using the object's address.
1840 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)%p)->GetKey(\"queueId\"));", status));
1841 if (pid<jobidstart) continue;
1842 if (pid == lastid) {
// NOTE(review): the status text is passed to sprintf as the format string and
// written into the 20-byte mstatus buffer without a length bound - confirm
// status strings are short and contain no '%'.
1843 gROOT->ProcessLine(Form("sprintf((char*)%p,((TAlienJobStatus*)%p)->GetKey(\"status\"));",mstatus, status));
// kABORTED, kFAIL and kUNKNOWN share one case group.
1845 switch (status->GetStatus()) {
1846 case TGridJobStatus::kWAITING:
1848 case TGridJobStatus::kRUNNING:
1850 case TGridJobStatus::kABORTED:
1851 case TGridJobStatus::kFAIL:
1852 case TGridJobStatus::kUNKNOWN:
1854 case TGridJobStatus::kDONE:
1863 //______________________________________________________________________________
1864 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1866 // Returns true if file is a collection. Functionality duplicated from
1867 // TAlien::Type() because we don't want to directly depend on TAlien.
// lfn: logical file name passed to the grid command "type -z".
// Returns kFALSE when the command gives no result or an empty "type" value,
// and kTRUE when the reported type is exactly "collection".
1869 Error("IsCollection", "No connection to grid");
1872 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1873 if (!res) return kFALSE;
// Only the "type" key of the first result entry is inspected.
1874 const char* typeStr = res->GetKey(0, "type");
1875 if (!typeStr || !strlen(typeStr)) return kFALSE;
1876 if (!strcmp(typeStr, "collection")) return kTRUE;
1881 //______________________________________________________________________________
1882 Bool_t AliAnalysisAlien::IsSingleOutput() const
1884 // Check if single-output option is on.
// Returns kTRUE when fOutputSingle is not empty.
1885 return (!fOutputSingle.IsNull());
1888 //______________________________________________________________________________
1889 void AliAnalysisAlien::Print(Option_t *) const
1891 // Print current plugin settings.
1892 printf("### AliEn analysis plugin current settings ###\n");
1893 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1894 if (mgr && mgr->IsProofMode()) {
1895 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1896 if (TestBit(AliAnalysisGrid::kTest))
1897 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1898 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1899 if (!fProofDataSet.IsNull())
1900 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1902 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1904 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1905 if (!fRootVersionForProof.IsNull())
1906 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1908 printf("= ROOT version requested________________________ default\n");
1909 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1910 if (!fAliRootMode.IsNull())
1911 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1913 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1914 if (fNproofWorkersPerSlave)
1915 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1916 if (TestSpecialBit(kClearPackages))
1917 printf("= ClearPackages requested...\n");
1918 if (fIncludePath.Data())
1919 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1920 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1921 if (fPackages && fPackages->GetEntries()) {
1922 TIter next(fPackages);
1925 while ((obj=next())) list += obj->GetName();
1926 printf("= Par files to be used: ________________________ %s\n", list.Data());
1928 if (TestSpecialBit(kProofConnectGrid))
1929 printf("= Requested PROOF connection to grid\n");
1932 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1933 if (fOverwriteMode) {
1934 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1935 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1937 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1938 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1939 printf("= Production mode:______________________________ %d\n", fProductionMode);
1940 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1941 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1942 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1944 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1945 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1946 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1947 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1948 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1949 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1950 if (fRunNumbers.Length())
1951 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1953 printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
1954 if (!fRunRange[0] && !fRunNumbers.Length()) {
1955 TIter next(fInputFiles);
1958 while ((obj=next())) list += obj->GetName();
1959 printf("= Input files to be processed: _________________ %s\n", list.Data());
1961 if (TestBit(AliAnalysisGrid::kTest))
1962 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1963 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1964 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1965 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1966 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1967 printf("=====================================================================\n");
1968 printf("= Job price: ___________________________________ %d\n", fPrice);
1969 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1970 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1971 if (fMaxInitFailed>0)
1972 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1973 if (fMasterResubmitThreshold>0)
1974 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1975 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1976 if (fNrunsPerMaster>0)
1977 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1978 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1979 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1980 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1981 if (fArguments.Length())
1982 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1983 if (fExecutableArgs.Length())
1984 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1985 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1986 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1987 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1988 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1990 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1991 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
1992 if (fIncludePath.Data())
1993 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1994 if (fCloseSE.Length())
1995 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1996 if (fFriendChainName.Length())
1997 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1998 if (fPackages && fPackages->GetEntries()) {
1999 TIter next(fPackages);
2002 while ((obj=next())) list += obj->GetName();
2003 printf("= Par files to be used: ________________________ %s\n", list.Data());
2007 //______________________________________________________________________________
2008 void AliAnalysisAlien::SetDefaults()
2010 // Set default values for everything. What cannot be filled will be left empty.
// Re-creates the two JDL helper objects through the interpreter and resets the
// configuration strings and limits to the defaults assigned below.
// NOTE(review): the embedded line numbering has gaps (e.g. 2014-2015, 2019-2023,
// 2034-2036), so some statements of this function are not visible in this listing.
// Only fGridJDL is deleted before being re-created. In the lines shown fMergingJDL
// is overwritten without a matching delete - NOTE(review): confirm whether it can
// already be allocated when this is called (would leak the previous object).
2011 if (fGridJDL) delete fGridJDL;
2012 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2013 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
2016 fSplitMaxInputFileNumber = 100;
2018 fMasterResubmitThreshold = 0;
2024 fNrunsPerMaster = 1;
2025 fMaxMergeFiles = 100;
2027 fExecutable = "analysis.sh";
2028 fExecutableCommand = "root -b -q";
2030 fExecutableArgs = "";
2031 fAnalysisMacro = "myAnalysis.C";
2032 fAnalysisSource = "";
2033 fAdditionalLibs = "";
2037 fAliROOTVersion = "";
2038 fUser = ""; // Your alien user name
2039 fGridWorkingDir = "";
2040 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
2041 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
2042 fFriendChainName = "";
2043 fGridOutputDir = "output";
// Default archive layout: logs with one replica, all root files with two.
2044 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
2045 fOutputFiles = ""; // Like "AliAODs.root histos.root"
2046 fInputFormat = "xml-single";
2047 fJDLName = "analysis.jdl";
2048 fJobTag = "Automatically generated analysis JDL";
2049 fMergeExcludes = "";
// Copy checking and manager-driven output files are both switched on by default.
2052 SetCheckCopy(kTRUE);
2053 SetDefaultOutputs(kTRUE);
2057 //______________________________________________________________________________
2058 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
2060 // Checks current merge stage, makes xml for the next stage, counts number of files, submits next stage.
// Parameters, as used by the lines shown:
//   filename  - name of the final merged file looked up inside aliendir
//   aliendir  - grid directory holding the outputs and the Stage_<n>.xml collections
//   nperchunk - the stage is treated as the last one when fewer files than this were found
//   jdl       - name of the merging JDL; ".jdl" is replaced by "_final.jdl" for the last stage
// Returns kFALSE when the submitted job id is 0. The other return statements fall
// into numbering gaps of this listing and are not visible here.
2061 // First check if the result is already in the output directory.
2062 if (FileExists(Form("%s/%s",aliendir,filename))) {
2063 printf("Final merged results found. Not merging again.\n");
2066 // Now check the last stage done.
// The scan stops at the first stage whose successor collection Stage_<stage+1>.xml is missing.
2069 if (!FileExists(Form("%s/Stage_%d.xml",aliendir, stage+1))) break;
2072 // Next stage of merging
// The first stage searches the archives directly under aliendir; later stages
// search the archives produced by the previous one (Stage_<stage-1>/).
2074 TString pattern = "*root_archive.zip";
2075 if (stage>1) pattern = Form("Stage_%d/*root_archive.zip", stage-1);
2076 TGridResult *res = gGrid->Command(Form("find -x Stage_%d %s %s", stage, aliendir, pattern.Data()));
2077 if (res) delete res;
2078 // Write standard output to file
2079 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", Form("Stage_%d.xml",stage)));
2080 // Count the number of files inside
2082 ifile.open(Form("Stage_%d.xml",stage));
2083 if (!ifile.good()) {
2084 ::Error("CheckMergedFiles", "Could not redirect result of the find command to file %s", Form("Stage_%d.xml",stage));
// Every line of the collection containing "/event" is counted as one file.
2089 while (!ifile.eof()) {
2091 if (line.Contains("/event")) nfiles++;
2095 ::Error("CheckMergedFiles", "Cannot start Stage_%d merging since Stage_%d did not produced yet output", stage, stage-1);
2098 printf("=== Stage_%d produced %d files\n", stage-1, nfiles);
2100 // Copy the file in the output directory
2101 printf("===> Copying collection %s in the output directory %s\n", Form("Stage_%d.xml",stage), aliendir);
2102 TFile::Cp(Form("Stage_%d.xml",stage), Form("alien://%s/Stage_%d.xml",aliendir,stage));
2103 // Check if this is the last stage to be done.
2104 Bool_t laststage = (nfiles<nperchunk);
// A non-zero fMaxMergeStages caps the number of stages regardless of the file count.
2105 if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
2107 printf("### Submiting final merging stage %d\n", stage);
2108 TString finalJDL = jdl;
2109 finalJDL.ReplaceAll(".jdl", "_final.jdl");
2110 TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
2111 Int_t jobId = SubmitSingleJob(query);
2112 if (!jobId) return kFALSE;
2114 printf("### Submiting merging stage %d\n", stage);
2115 TString query = Form("submit %s %s %d", jdl, aliendir, stage);
2116 Int_t jobId = SubmitSingleJob(query);
2117 if (!jobId) return kFALSE;
2122 //______________________________________________________________________________
2123 AliAnalysisManager *AliAnalysisAlien::LoadAnalysisManager(const char *fname)
2125 // Load the analysis manager from a file.
// Opens fname with TFile::Open and walks its key list, reading every key whose
// class name is exactly "AliAnalysisManager". No break is visible in the loop,
// so with several matching keys the last one read appears to win.
// An error is reported when the file cannot be opened or no manager was found.
// NOTE(review): the return statements fall into numbering gaps of this listing
// (presumably 0 on open failure, mgr otherwise), and no Close() of the opened
// file is visible here - confirm against the full source.
2126 TFile *file = TFile::Open(fname);
2128 ::Error("LoadAnalysisManager", "Cannot open file %s", fname);
2131 TIter nextkey(file->GetListOfKeys());
2132 AliAnalysisManager *mgr = 0;
2134 while ((key=(TKey*)nextkey())) {
2135 if (!strcmp(key->GetClassName(), "AliAnalysisManager"))
2136 mgr = (AliAnalysisManager*)file->Get(key->GetName());
2139 ::Error("LoadAnalysisManager", "No analysis manager found in file %s", fname);
2143 //______________________________________________________________________________
2144 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
2146 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// query is passed unchanged to gGrid->Command() and echoed on stdout first.
// The job id is read as the "jobId" key of the first entry of the command result.
// NOTE(review): the handling of a null result, the release of res and the final
// conversion of the id string to Int_t fall into numbering gaps of this listing.
// Without a grid connection nothing can be submitted.
2147 if (!gGrid) return 0;
2148 printf("=> %s ------> ",query);
2149 TGridResult *res = gGrid->Command(query);
2151 TString jobId = res->GetKey(0,"jobId");
// An empty id means the grid rejected the query.
2153 if (jobId.IsNull()) {
2154 printf("submission failed. Reason:\n");
2157 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
2160 printf(" Job id: %s\n", jobId.Data());
2164 //______________________________________________________________________________
2165 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage)
2167 // Merge given output files from basedir. Basedir can be an alien output directory
2168 // but also an xml file with root_archive.zip locations. The file merger will merge nmaxmerge
2169 // files in a group (ignored for xml input). Merging can be done in stages:
2170 // stage=0 : will merge all existing files in a single stage, supporting resume if run locally
2171 // stage=1 : works with an xml of all root_archive.zip in the output directory
2172 // stage>1 : works with an xml of all root_archive.zip in the Stage_<n-1> directory
// Parameters, as used by the lines shown:
//   output    - name of the file to produce; anything from the first '@' on is dropped
//   basedir   - used as an xml collection when it contains ".xml", otherwise as the
//               directory argument of a grid 'find' command
//   nmaxmerge - number of inputs per chunk; overridden with 9999999 for xml input
//   stage     - selects the chunked path or the single-pass path
// NOTE(review): the embedded numbering has many gaps in this function (branch
// conditions, return statements, closing braces), so the exact control flow and
// return values must be confirmed against the full source.
2173 TString outputFile = output;
2175 TString outputChunk;
2176 TString previousChunk = "";
// The list is created non-owning (the SetOwner call below is commented out).
2177 TObjArray *listoffiles = new TObjArray();
2178 // listoffiles->SetOwner();
2179 Int_t countChunk = 0;
2180 Int_t countZero = nmaxmerge;
2181 Bool_t merged = kTRUE;
// Strip a trailing "@..." specification from the output name.
2182 Int_t index = outputFile.Index("@");
2183 if (index > 0) outputFile.Remove(index);
2184 TString inputFile = outputFile;
2185 TString sbasedir = basedir;
2186 if (sbasedir.Contains(".xml")) {
2187 // Merge files pointed by the xml - ignore nmaxmerge and set ichunk to 0
2188 nmaxmerge = 9999999;
2189 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", basedir));
2191 ::Error("MergeOutput", "Input XML collection empty.");
2194 // Iterate grid collection
2195 while (coll->Next()) {
// Start from the directory part of the collection entry URL.
2196 TString fname = gSystem->DirName(coll->GetTURL());
2199 listoffiles->Add(new TNamed(fname.Data(),""));
// Directory input: locate the files with a grid 'find' on basedir.
2202 command = Form("find %s/ *%s", basedir, inputFile.Data());
2203 printf("command: %s\n", command.Data());
2204 TGridResult *res = gGrid->Command(command);
2206 ::Error("MergeOutput","No result for the find command\n");
2212 while ((map=(TMap*)nextmap())) {
2213 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2214 if (!objs || !objs->GetString().Length()) {
2215 // Nothing found - skip this output
2220 listoffiles->Add(new TNamed(objs->GetName(),""));
2224 if (!listoffiles->GetEntries()) {
2225 ::Error("MergeOutput","No result for the find command\n");
2230 TFileMerger *fm = 0;
2231 TIter next0(listoffiles);
2232 TObjArray *listoffilestmp = new TObjArray();
2233 listoffilestmp->SetOwner();
2236 // Keep only the files at upper level
// First pass: find the smallest number of '/' characters among all entries.
2237 Int_t countChar = 0;
2238 while ((nextfile=next0())) {
2239 snextfile = nextfile->GetName();
2240 Int_t crtCount = snextfile.CountChar('/');
2241 if (nextfile == listoffiles->First()) countChar = crtCount;
2242 if (crtCount < countChar) countChar = crtCount;
// Second pass: entries deeper than that minimum are filtered out, the rest
// move to the owning temporary list.
2245 while ((nextfile=next0())) {
2246 snextfile = nextfile->GetName();
2247 Int_t crtCount = snextfile.CountChar('/');
2248 if (crtCount > countChar) {
2252 listoffilestmp->Add(nextfile);
// NOTE(review): no delete of the original array object is visible before the
// pointer is re-assigned - confirm against the full source.
2255 listoffiles = listoffilestmp; // Now contains 'good' files
2256 listoffiles->Print();
2257 TIter next(listoffiles);
2258 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
2259 outputChunk = outputFile;
2260 outputChunk.ReplaceAll(".root", "_*.root");
2261 // Check for existent temporary merge files
2262 // Check overwrite mode and remove previous partial results if needed
2263 // Preserve old merging functionality for stage 0.
// 'ls' exiting with status 0 means at least one partial chunk file exists locally.
2265 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2267 // Skip as many input files as in a chunk
2268 for (Int_t counter=0; counter<nmaxmerge; counter++) {
2271 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
2275 snextfile = nextfile->GetName();
2277 outputChunk = outputFile;
2278 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns true when the path is NOT accessible.
2280 if (gSystem->AccessPathName(outputChunk)) continue;
2281 // Merged file with chunks up to <countChunk> found
2282 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
2283 previousChunk = outputChunk;
2287 countZero = nmaxmerge;
2289 while ((nextfile=next())) {
2290 snextfile = nextfile->GetName();
2291 // Loop 'find' results and get next LFN
2292 if (countZero == nmaxmerge) {
2293 // First file in chunk - create file merger and add previous chunk if any.
2294 fm = new TFileMerger(kFALSE);
2295 fm->SetFastMethod(kTRUE);
2296 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2297 outputChunk = outputFile;
2298 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2300 // If last file found, put merged results in the output file
2301 if (nextfile == listoffiles->Last()) outputChunk = outputFile;
2302 // Add file to be merged and decrement chunk counter.
2303 fm->AddFile(snextfile);
// A chunk is flushed when its counter reaches zero or the last input was added.
2305 if (countZero==0 || nextfile == listoffiles->Last()) {
2306 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2307 // Nothing found - skip this output
2308 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2312 fm->OutputFile(outputChunk);
2313 // Merge the outputs, then go to next chunk
2315 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): the location string below reads "MergeOutputs" although this is MergeOutput.
2319 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial chunk has been folded into the new one and is removed.
2320 gSystem->Unlink(previousChunk);
2322 if (nextfile == listoffiles->Last()) break;
2324 countZero = nmaxmerge;
2325 previousChunk = outputChunk;
2332 // Merging stage different than 0.
2333 // Move to the beginning of the requested chunk.
2334 fm = new TFileMerger(kFALSE);
2335 fm->SetFastMethod(kTRUE);
// All remaining inputs go into a single merger, without chunking.
2336 while ((nextfile=next())) fm->AddFile(nextfile->GetName());
2338 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2339 // Nothing found - skip this output
2340 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2344 fm->OutputFile(outputFile);
2345 // Merge the outputs
2347 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2351 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputFile.Data());
2357 //______________________________________________________________________________
2358 Bool_t AliAnalysisAlien::MergeOutputs()
2360 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE right away in test mode and kFALSE in offline mode. Otherwise it
// either submits merging jobs (SubmitMerging) or merges locally every entry of
// the comma separated fOutputFiles list through MergeOutput().
// NOTE(review): several branch conditions and return statements fall into
// numbering gaps of this listing; confirm the exact flow against the full source.
2361 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2362 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2364 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2368 if (!TestBit(AliAnalysisGrid::kMerge)) {
2369 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2372 if (fProductionMode) {
2373 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2376 Info("MergeOutputs", "Submitting merging JDL");
2377 if (!SubmitMerging()) return kFALSE;
2378 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2379 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2382 // Get the output path
// An output dir without any '/' is taken relative to <grid home>/<working dir>.
2383 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2384 if (!DirectoryExists(fGridOutputDir)) {
2385 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2388 if (!fOutputFiles.Length()) {
2389 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2392 // Check if fast read option was requested
2393 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2394 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2395 if (fFastReadOption) {
2396 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2397 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2398 gEnv->SetValue("XNet.ConnectTimeout",50);
2399 gEnv->SetValue("XNet.RequestTimeout",50);
2400 gEnv->SetValue("XNet.MaxRedirectCount",2);
2401 gEnv->SetValue("XNet.ReconnectTimeout",50);
2402 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2404 // Make sure we change the temporary directory
2405 gSystem->Setenv("TMPDIR", gSystem->pwd());
2406 // Set temporary compilation directory to current one
2407 gSystem->SetBuildDir(gSystem->pwd(), kTRUE);
2408 TObjArray *list = fOutputFiles.Tokenize(",");
2412 Bool_t merged = kTRUE;
2413 while((str=(TObjString*)next())) {
2414 outputFile = str->GetString();
// Drop a trailing "@..." specification from the file name.
2415 Int_t index = outputFile.Index("@");
2416 if (index > 0) outputFile.Remove(index);
// Shell glob matching the partial chunk files of this output.
2417 TString outputChunk = outputFile;
2418 outputChunk.ReplaceAll(".root", "_*.root");
2419 // Skip already merged outputs
// AccessPathName() returns false when the file exists.
2420 if (!gSystem->AccessPathName(outputFile)) {
2421 if (fOverwriteMode) {
2422 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2423 gSystem->Unlink(outputFile);
2424 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2425 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2426 outputChunk.Data());
2427 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2430 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2434 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2435 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2436 outputChunk.Data());
2437 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// NOTE(review): the exclusion test is a substring match on the whole exclude
// string, so a name contained in a longer excluded name is skipped as well.
2440 if (fMergeExcludes.Length() &&
2441 fMergeExcludes.Contains(outputFile.Data())) continue;
2442 // Perform a 'find' command in the output directory, looking for registered outputs
2443 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2445 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in the global list of open files.
2448 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2449 if (fileOpened) fileOpened->Close();
2454 //______________________________________________________________________________
2455 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2457 // Use the output files connected to output containers from the analysis manager
2458 // rather than the files defined by SetOutputFiles
// flag is stored in the kDefaultOutputs bit. The informational message is printed
// only when the bit goes from unset to set.
2459 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2460 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2461 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2464 //______________________________________________________________________________
2465 void AliAnalysisAlien::SetOutputFiles(const char *list)
2467 // Manually set the output files list.
2468 // Removes duplicates. Not allowed if default outputs are not disabled.
// list is split on blanks; the accepted names are appended to fOutputFiles
// separated by commas. Any "@..." suffix of a name is removed.
// NOTE(review): some lines (e.g. 2471, 2474, 2478-2480, 2488) fall into numbering
// gaps of this listing and are not shown here.
2469 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2470 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2473 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2475 TString slist = list;
2476 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2477 TObjArray *arr = slist.Tokenize(" ");
2481 while ((os=(TObjString*)next())) {
2482 sout = os->GetString();
2483 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): duplicate detection is a substring test on the accumulated list,
// so a name that is part of an already added name (e.g. "histos.root" after
// "myhistos.root") is dropped too - confirm this is intended.
2484 if (fOutputFiles.Contains(sout)) continue;
2485 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2486 fOutputFiles += sout;
2491 //______________________________________________________________________________
2492 void AliAnalysisAlien::SetOutputArchive(const char *list)
2494 // Manually set the output archive list. Free text - you are on your own...
2495 // Not allowed if default outputs are not disabled.
// While the kDefaultOutputs bit is set the call ends in Fatal(); otherwise list
// is stored unchanged in fOutputArchive.
2496 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2497 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2500 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2501 fOutputArchive = list;
2504 //______________________________________________________________________________
2505 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2507 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; the call only prints a warning pointing to
// SetNumberOfReplicas() and SetDefaultOutputs().
2508 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2511 //______________________________________________________________________________
2512 void AliAnalysisAlien::SetProofParameter(const char *pname, const char *value)
2514 // Set some PROOF special parameter.
// Stores the pair (pname, value) in fProofParam as two TObjString objects.
// An entry already registered under pname is removed from the map first.
// NOTE(review): lines 2516 and 2520-2522 fall into numbering gaps of this listing;
// presumably they guard on a found pair and delete the old key and value -
// confirm against the full source.
2515 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2517 TObject *old = pair->Key();
2518 TObject *val = pair->Value();
2519 fProofParam.Remove(old);
2523 fProofParam.Add(new TObjString(pname), new TObjString(value));
2526 //______________________________________________________________________________
2527 const char *AliAnalysisAlien::GetProofParameter(const char *pname) const
2529 // Returns a special PROOF parameter.
// Looks pname up in fProofParam. Returns 0 when no entry exists, otherwise the
// name of the stored value object. The pointer refers to storage of that object,
// so it is only usable while the entry stays in the map.
// NOTE(review): pair->Value() is dereferenced without a null check.
2530 TPair *pair = dynamic_cast<TPair*>(fProofParam.FindObject(pname));
2531 if (!pair) return 0;
2532 return pair->Value()->GetName();
2535 //______________________________________________________________________________
2536 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2538 // Start remote grid analysis.
2539 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2540 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2541 if (!mgr || !mgr->IsInitialized()) {
2542 Error("StartAnalysis", "You need an initialized analysis manager for this");
2545 // Are we in PROOF mode ?
2546 if (mgr->IsProofMode()) {
2547 if (testMode) Info("StartAnalysis", "##### Starting PROOF analysis with Proof Lite via the plugin #####");
2548 else Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2549 if (fProofCluster.IsNull()) {
2550 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2553 if (fProofDataSet.IsNull() && !testMode) {
2554 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2557 // Set the needed environment
2558 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2559 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2560 if (fProofReset && !testMode) {
2561 if (fProofReset==1) {
2562 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2563 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2565 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2566 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2568 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2573 // Check if there is an old active session
2574 Long_t nsessions = gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->QuerySessions(\"\")->GetEntries();", fProofCluster.Data()));
2576 Error("StartAnalysis","You have to reset your old session first\n");
2580 // Do we need to change the ROOT version ? The success of this cannot be checked.
2581 if (!fRootVersionForProof.IsNull() && !testMode) {
2582 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2583 fProofCluster.Data(), fRootVersionForProof.Data()));
2585 // Connect to PROOF and check the status
2588 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2589 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2591 if (!sworkers.IsNull())
2592 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2594 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2596 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2598 Error("StartAnalysis", "Could not start PROOF in test mode");
2603 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2606 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2607 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2608 // Set proof special parameters if any
2609 TIter nextpp(&fProofParam);
2610 TObject *proofparam;
2611 while ((proofparam=nextpp())) {
2612 TString svalue = GetProofParameter(proofparam->GetName());
2613 gROOT->ProcessLine(Form("gProof->SetParameter(\"%s\",%s);", proofparam->GetName(), svalue.Data()));
2615 // Is dataset existing ?
2617 TString dataset = fProofDataSet;
2618 Int_t index = dataset.Index("#");
2619 if (index>=0) dataset.Remove(index);
2620 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2621 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2624 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2626 // Is ClearPackages() needed ?
2627 if (TestSpecialBit(kClearPackages)) {
2628 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2629 gROOT->ProcessLine("gProof->ClearPackages();");
2631 // Is a given aliroot mode requested ?
2634 if (!fAliRootMode.IsNull()) {
2635 TString alirootMode = fAliRootMode;
2636 if (alirootMode == "default") alirootMode = "";
2637 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2638 optionsList.SetOwner();
2639 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2640 // Check the additional libs to be loaded
2642 Bool_t parMode = kFALSE;
2643 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
2644 // Parse the extra libs for .so
2645 if (fAdditionalLibs.Length()) {
2646 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2649 while((str=(TObjString*)next())) {
2650 if (str->GetString().Contains(".so")) {
2652 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
2655 TString stmp = str->GetName();
2656 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2657 stmp.ReplaceAll(".so","");
2658 if (!extraLibs.IsNull()) extraLibs += ":";
2662 if (str->GetString().Contains(".par")) {
2663 // The first par file found in the list will not allow any further .so
2665 if (!parLibs.IsNull()) parLibs += ":";
2666 parLibs += str->GetName();
2670 if (list) delete list;
2672 if (!extraLibs.IsNull()) {
2673 Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
2674 optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2676 // Check extra includes
2677 if (!fIncludePath.IsNull()) {
2678 TString includePath = fIncludePath;
2679 includePath.ReplaceAll(" ",":");
2680 includePath.ReplaceAll("$ALICE_ROOT/","");
2681 includePath.ReplaceAll("${ALICE_ROOT}/","");
2682 includePath.ReplaceAll("-I","");
2683 includePath.Remove(TString::kTrailing, ':');
2684 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2685 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2687 // Check if connection to grid is requested
2688 if (TestSpecialBit(kProofConnectGrid))
2689 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2690 // Enable AliRoot par
2692 // Enable proof lite package
2693 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2694 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2695 TNamed *obj = (TNamed*)optionsList.At(i);
2696 printf("%s %s\n", obj->GetName(), obj->GetTitle());
2698 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2699 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)%p);",alirootLite.Data(),&optionsList))) {
2700 Info("StartAnalysis", "AliRootProofLite enabled");
2702 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2706 if ( ! fAliROOTVersion.IsNull() ) {
2707 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)%p, kTRUE);",
2708 fAliROOTVersion.Data(), &optionsList))) {
2709 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2714 // Enable first par files from fAdditionalLibs
2715 if (!parLibs.IsNull()) {
2716 TObjArray *list = parLibs.Tokenize(":");
2718 TObjString *package;
2719 while((package=(TObjString*)next())) {
2720 TString spkg = package->GetName();
2721 spkg.ReplaceAll(".par", "");
2722 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2723 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2724 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
2725 if (gROOT->ProcessLine(enablePackage)) {
2726 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2730 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2734 if (list) delete list;
2737 if (fAdditionalLibs.Contains(".so") && !testMode) {
2738 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2739 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2743 // Enable par files if requested
2744 if (fPackages && fPackages->GetEntries()) {
2745 TIter next(fPackages);
2747 while ((package=next())) {
2748 // Skip packages already enabled
2749 if (parLibs.Contains(package->GetName())) continue;
2750 TString spkg = package->GetName();
2751 spkg.ReplaceAll(".par", "");
2752 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2753 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2754 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2755 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2759 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2764 // Do we need to load analysis source files ?
2765 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2766 if (fAnalysisSource.Length()) {
2767 TObjArray *list = fAnalysisSource.Tokenize(" ");
2770 while((str=(TObjString*)next())) {
2771 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2773 if (list) delete list;
2776 // Register dataset to proof lite.
2777 if (fFileForTestMode.IsNull()) {
2778 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2781 if (gSystem->AccessPathName(fFileForTestMode)) {
2782 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2785 TFileCollection *coll = new TFileCollection();
2786 coll->AddFromFile(fFileForTestMode);
2787 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)%p, \"OV\");", coll));
2788 gROOT->ProcessLine("gProof->ShowDataSets()");
2793 // Check if output files have to be taken from the analysis manager
2794 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2795 // Add output files and AOD files
2796 fOutputFiles = GetListOfFiles("outaod");
2797 // Add extra files registered to the analysis manager
2798 TString extra = GetListOfFiles("ext");
2799 if (!extra.IsNull()) {
2800 extra.ReplaceAll(".root", "*.root");
2801 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2802 fOutputFiles += extra;
2804 // Compose the output archive.
2805 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2806 fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
2808 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2809 if (TestBit(AliAnalysisGrid::kOffline)) {
2810 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2811 \n there nor any job run. You can revise the JDL and analysis \
2812 \n macro then run the same in \"submit\" mode.");
2813 } else if (TestBit(AliAnalysisGrid::kTest)) {
2814 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2816 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2817 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2818 \n space and job submitted.");
2819 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2820 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2821 if (fMergeViaJDL) CheckInputData();
2824 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2829 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2832 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2833 if (!CheckInputData()) {
2834 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2837 if (!CreateDataset(fDataPattern)) {
2839 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2840 if (fRunNumbers.Length()) serror = "run numbers";
2841 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2842 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2843 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2846 WriteAnalysisFile();
2847 WriteAnalysisMacro();
2849 WriteValidationScript();
2851 WriteMergingMacro();
2852 WriteMergeExecutable();
2853 WriteValidationScript(kTRUE);
2855 if (!CreateJDL()) return kFALSE;
2856 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2858 // Locally testing the analysis
2859 Info("StartAnalysis", "\n_______________________________________________________________________ \
2860 \n Running analysis script in a daughter shell as on a worker node \
2861 \n_______________________________________________________________________");
2862 TObjArray *list = fOutputFiles.Tokenize(",");
2866 while((str=(TObjString*)next())) {
2867 outputFile = str->GetString();
2868 Int_t index = outputFile.Index("@");
2869 if (index > 0) outputFile.Remove(index);
2870 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2873 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2874 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2875 // gSystem->Exec("cat stdout");
2878 // Check if submitting is managed by LPM manager
2879 if (fProductionMode) {
2880 TString prodfile = fJDLName;
2881 prodfile.ReplaceAll(".jdl", ".prod");
2882 WriteProductionFile(prodfile);
2883 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2886 // Submit AliEn job(s)
2887 gGrid->Cd(fGridOutputDir);
2890 if (!fRunNumbers.Length() && !fRunRange[0]) {
2891 // Submit a given xml or a set of runs
2892 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2893 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2895 const char *cjobId = res->GetKey(0,"jobId");
2899 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2902 Info("StartAnalysis", "\n_______________________________________________________________________ \
2903 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2904 \n_______________________________________________________________________",
2905 fJDLName.Data(), cjobId);
2910 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2914 // Submit for a range of enumeration of runs.
2915 if (!Submit()) return kFALSE;
2918 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2919 \n You may exit at any time and terminate the job later using the option <terminate> \
2920 \n ##################################################################################", jobID.Data());
2921 gSystem->Exec("aliensh");
2925 //______________________________________________________________________________
2926 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2928 // Get a comma-separated list of output files of the requested type.
2929 // Type can be (case insensitive):
2930 // aod - list of aod files (std, extensions and filters)
2931 // out - list of output files connected to containers (but not aod's or extras)
2932 // ext - list of extra files registered to the manager
2933 // ter - list of files produced in terminate
// The keywords are matched with TString::Contains(), so several of them can be
// combined in one string (for example "outaod"). A string equal to exactly one
// keyword returns as soon as that category has been added.
// The returned pointer refers to the function-local static string declared
// below, so the caller should copy the result before calling this method again.
// NOTE(review): this listing omits some source lines (braces, a few
// declarations), so the comments below describe only the visible statements.
2934 static TString files;
2936 TString stype = type;
2938 TString aodfiles, extra;
2939 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
// Error path for a missing analysis manager (see the message): the static list
// is returned without being modified here.
2941 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2942 return files.Data();
// Collect the AOD output file name and any extra AOD outputs declared on the
// output event handler. aodfiles is also used further down to filter duplicates.
2944 if (mgr->GetOutputEventHandler()) {
2945 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2946 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2947 if (!extraaod.IsNull()) {
2949 aodfiles += extraaod;
2952 if (stype.Contains("aod")) {
2954 if (stype == "aod") return files.Data();
2956 // Add output files that are not in the list of AOD files
2957 TString outputfiles = "";
2958 TIter next(mgr->GetOutputs());
2959 AliAnalysisDataContainer *output;
2960 const char *filename = 0;
2961 while ((output=(AliAnalysisDataContainer*)next())) {
2962 filename = output->GetFileName();
// Skip containers whose file name is "default", and names already present in
// outputfiles or aodfiles. Contains() is a substring test, not an exact match.
2963 if (!(strcmp(filename, "default"))) continue;
2964 if (outputfiles.Contains(filename)) continue;
2965 if (aodfiles.Contains(filename)) continue;
2966 if (!outputfiles.IsNull()) outputfiles += ",";
2967 outputfiles += filename;
2969 if (stype.Contains("out")) {
// NOTE(review): the separator is added whenever files is non-empty, even if
// outputfiles is empty; the "ter" branch below also checks its own list first.
2970 if (!files.IsNull()) files += ",";
2971 files += outputfiles;
2972 if (stype == "out") return files.Data();
2974 // Add extra files registered to the analysis manager
2976 extra = mgr->GetExtraFiles();
2977 if (!extra.IsNull()) {
// Blanks are turned into commas so that both act as separators for Tokenize().
2979 extra.ReplaceAll(" ", ",");
// NOTE(review): TString::Tokenize() hands back a new TObjArray; no delete of
// fextra is visible in this listing - confirm that it is released.
2980 TObjArray *fextra = extra.Tokenize(",");
2981 TIter nextx(fextra);
// Keep only names not already found in aodfiles, outputfiles or sextra.
2983 while ((obj=nextx())) {
2984 if (aodfiles.Contains(obj->GetName())) continue;
2985 if (outputfiles.Contains(obj->GetName())) continue;
2986 if (sextra.Contains(obj->GetName())) continue;
2987 if (!sextra.IsNull()) sextra += ",";
2988 sextra += obj->GetName();
2991 if (stype.Contains("ext")) {
2992 if (!files.IsNull()) files += ",";
2996 if (stype == "ext") return files.Data();
// Files produced in Terminate: same filtering as for the extra files, applied
// to the fTerminateFiles member.
2998 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() returns the stripped substring and does not
// modify the object, so this call appears to have no effect - confirm intent.
2999 fTerminateFiles.Strip();
// This rewrites the fTerminateFiles member itself, not a local copy.
3000 fTerminateFiles.ReplaceAll(" ",",");
// NOTE(review): as above, no delete of this Tokenize() result is visible here.
3001 TObjArray *fextra = fTerminateFiles.Tokenize(",");
3002 TIter nextx(fextra);
3004 while ((obj=nextx())) {
3005 if (aodfiles.Contains(obj->GetName())) continue;
3006 if (outputfiles.Contains(obj->GetName())) continue;
3007 if (termfiles.Contains(obj->GetName())) continue;
3008 if (sextra.Contains(obj->GetName())) continue;
3009 if (!termfiles.IsNull()) termfiles += ",";
3010 termfiles += obj->GetName();
3014 if (stype.Contains("ter")) {
// A separator is only needed when both the current list and termfiles are non-empty.
3015 if (!files.IsNull() && !termfiles.IsNull()) {
3020 return files.Data();
3023 //______________________________________________________________________________
3024 Bool_t AliAnalysisAlien::Submit()
3026 // Submit all master jobs.
// The number of master jobs is the number of entries in fInputFiles. Returns
// kFALSE as soon as a call to SubmitNext() fails.
// NOTE(review): this listing omits some source lines, so only the visible
// statements are described.
3027 Int_t nmasterjobs = fInputFiles->GetEntries();
// Reference time for the periodic re-submission check in the loop below.
3028 Long_t tshoot = gSystem->Now();
// Nothing submitted so far: call SubmitNext() right away.
3029 if (!fNsubmitted && !SubmitNext()) return kFALSE;
// Loop until fNsubmitted has reached the number of master jobs.
3030 while (fNsubmitted < nmasterjobs) {
3031 Long_t now = gSystem->Now();
// Try again only after more than 30000 time units have elapsed since tshoot
// (presumably milliseconds, i.e. 30 s - TODO confirm against TSystem::Now()).
3032 if ((now-tshoot)>30000) {
3034 if (!SubmitNext()) return kFALSE;
3040 //______________________________________________________________________________
3041 Bool_t AliAnalysisAlien::SubmitMerging()
3043 // Submit all merging jobs.
// For each entry of fInputFiles an output directory is chosen, the first output
// file not listed in fMergeExcludes is picked, and CheckMergedFiles() is called
// for it with the merge JDL. An aliensh shell is started at the end.
// NOTE(review): this listing omits some source lines, so only the visible
// statements are described.
// An output directory without any "/" is taken as relative and expanded to
// <grid home>/<fGridWorkingDir>/<fGridOutputDir>; the member is overwritten.
3044 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3045 gGrid->Cd(fGridOutputDir);
// The merge JDL name is derived from the executable name: ".sh" becomes "_merge.jdl".
3046 TString mergeJDLName = fExecutable;
3047 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
// Error path; the condition that triggers it is not visible in this listing.
3049 Error("SubmitMerging", "You have to use explicit run numbers or run range to merge via JDL!");
3052 Int_t ntosubmit = fInputFiles->GetEntries();
3053 for (Int_t i=0; i<ntosubmit; i++) {
// Start from the base name of the input collection with the ".xml" suffix removed.
3054 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3055 runOutDir.ReplaceAll(".xml", "");
3056 if (fOutputToRunNo) {
3057 // The output directory is the run number
3058 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
3059 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
3061 if (!fRunNumbers.Length() && !fRunRange[0]) {
3062 // The output directory is the grid outdir
3063 printf("### Submitting merging job for the full output directory %s.\n", fGridOutputDir.Data());
3064 runOutDir = fGridOutputDir;
3066 // The output directory is the master number in 3 digits format
3067 printf("### Submitting merging job for master <%03d>\n", i);
3068 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
3071 // Check now the number of merging stages.
3072 TObjArray *list = fOutputFiles.Tokenize(",");
// Select the first output file that is not in fMergeExcludes; everything from
// the "@" character onwards is removed from each name before the test.
3076 while((str=(TObjString*)next())) {
3077 outputFile = str->GetString();
3078 Int_t index = outputFile.Index("@");
3079 if (index > 0) outputFile.Remove(index);
3080 if (!fMergeExcludes.Contains(outputFile)) break;
3083 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, mergeJDLName);
// Give up with kFALSE only when the last entry was reached and it was not done.
3084 if (!done && (i==ntosubmit-1)) return kFALSE;
// Without run numbers or a run range one pass over the full output directory is enough.
3085 if (!fRunNumbers.Length() && !fRunRange[0]) break;
3087 if (!ntosubmit) return kTRUE;
// NOTE(review): the message below is tagged "StartAnalysis" although it is
// emitted from SubmitMerging - confirm whether that is intended.
3088 Info("StartAnalysis", "\n #### STARTING AN ALIEN SHELL FOR YOU. You can exit any time or inspect your jobs in a different shell.##########\
3089 \n Make sure your jobs are in a final state (you can resubmit failed ones via 'masterjob <id> resubmit ERROR_ALL')\
3090 \n Rerun in 'terminate' mode to submit all merging stages, each AFTER the previous one completed. The final merged \
3091 \n output will be written to your alien output directory, while separate stages in <Stage_n>. \
3092 \n ################################################################################################################");
3093 gSystem->Exec("aliensh");
3097 //______________________________________________________________________________
3098 Bool_t AliAnalysisAlien::SubmitNext()
3100 // Submit next bunch of master jobs if the queue is free. The first master job is
3101 // submitted right away, while the next will not be unless the previous was split.
3102 // The plugin will not submit new master jobs if there are more than 500 jobs in
// the waiting state (see the nwaiting>500 check below).
// NOTE(review): this listing omits some source lines, so only the visible
// statements are described.
// The function-local statics below keep their values between calls.
// iscalled works as a guard: while it is set, further calls return kTRUE
// immediately; the early-exit paths visible below reset it to kFALSE.
3104 static Bool_t iscalled = kFALSE;
// Job ids of the first and of the most recently submitted master job.
3105 static Int_t firstmaster = 0;
3106 static Int_t lastmaster = 0;
// Jobs counted per submitted master (integer division, computed below).
3107 static Int_t npermaster = 0;
3108 if (iscalled) return kTRUE;
3110 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
3111 Int_t ntosubmit = 0;
3114 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without the submit policy all master jobs are submitted in one go.
3117 if (!IsUseSubmitPolicy()) {
3119 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
3120 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
3121 ntosubmit = nmasterjobs;
// Query the job states for the master jobs between firstmaster and lastmaster;
// the four counters are passed in and used in the computation below.
3124 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
3125 printf("=== master %d: %s\n", lastmaster, status.Data());
3126 // If last master not split, just return
3127 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
3128 // No more than 500 waiting jobs
3129 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// NOTE(review): this divides by fNsubmitted; the guard that keeps it non-zero
// on this path is not visible in this listing - confirm.
3130 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Number of further masters that fit under the limit of 500 waiting jobs, at least one.
3131 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
3132 if (!ntosubmit) ntosubmit = 1;
3133 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
3134 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
3136 for (Int_t i=0; i<ntosubmit; i++) {
3137 // Submit for a range of enumeration of runs.
// Stop as soon as every master job has been submitted.
3138 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
3140 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
3141 runOutDir.ReplaceAll(".xml", "");
// Two forms of the submit command are visible: the last argument is either the
// collection base name or the 3-digit master index. The condition selecting
// between them is not visible in this listing.
3143 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
3145 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
3146 printf("********* %s\n",query.Data());
3147 res = gGrid->Command(query);
// An empty jobId key in the grid result is handled as a failed submission.
3149 TString cjobId1 = res->GetKey(0,"jobId");
3150 if (!cjobId1.Length()) {
// NOTE(review): the messages below are tagged "StartAnalysis" although they are
// emitted from SubmitNext - confirm whether that is intended.
3154 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
3157 Info("StartAnalysis", "\n_______________________________________________________________________ \
3158 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
3159 \n_______________________________________________________________________",
3160 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id of this master; firstmaster is only set while it is still zero.
3163 lastmaster = cjobId1.Atoi();
3164 if (!firstmaster) firstmaster = lastmaster;
3169 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
3177 //______________________________________________________________________________
3178 void AliAnalysisAlien::WriteAnalysisFile()
3180 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root". The
// visible code also sets the kUseMC/kUseESD/kUseAOD bits on this object from
// the handlers of the analysis manager, and copies the file to the alien
// workspace unless production, offline or test mode is active.
// NOTE(review): this listing omits some source lines, so only the visible
// statements are described.
3181 TString analysisFile = fExecutable;
3182 analysisFile.ReplaceAll(".sh", ".root");
// This part runs only when the kSubmit bit is not set.
3183 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3184 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3185 if (!mgr || !mgr->IsInitialized()) {
3186 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
3189 // Check analysis type
3191 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
3192 handler = (TObject*)mgr->GetInputEventHandler();
// For a multi-input handler the type is taken from its first input event
// handler; the two direct tests on handler further down are also visible.
3194 if (handler->InheritsFrom("AliMultiInputEventHandler")) {
3195 AliMultiInputEventHandler *multiIH = (AliMultiInputEventHandler*)handler;
3196 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3197 if (multiIH->GetFirstInputEventHandler()->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
3199 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
3200 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory; it is restored below after the file is handled.
3203 TDirectory *cdir = gDirectory;
// "RECREATE" replaces any existing file of the same name.
3204 TFile *file = TFile::Open(analysisFile, "RECREATE");
3206 // Skip task Terminate calls for the grid job (but not in test mode, where we also want to check Terminate)
3207 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
3208 // Unless merging makes no sense
3209 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
3212 // Enable termination for local jobs
3213 mgr->SetSkipTerminate(kFALSE);
3215 if (cdir) cdir->cd();
3216 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// The copy flag is cleared in production, offline and test mode.
3218 Bool_t copy = kTRUE;
3219 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// The target directory is the grid home directory followed by fGridWorkingDir.
3222 TString workdir = gGrid->GetHomeDirectory();
3223 workdir += fGridWorkingDir;
3224 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove an existing file of the same name on the grid before copying the local one.
3225 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
3226 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
3230 //______________________________________________________________________________
3231 void AliAnalysisAlien::WriteAnalysisMacro()
3233 // Write the analysis macro that will steer the analysis in grid mode.
3234 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3236 out.open(fAnalysisMacro.Data(), ios::out);
3238 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3241 Bool_t hasSTEERBase = kFALSE;
3242 Bool_t hasESD = kFALSE;
3243 Bool_t hasAOD = kFALSE;
3244 Bool_t hasANALYSIS = kFALSE;
3245 Bool_t hasOADB = kFALSE;
3246 Bool_t hasANALYSISalice = kFALSE;
3247 Bool_t hasCORRFW = kFALSE;
3248 TString func = fAnalysisMacro;
3249 TString type = "ESD";
3250 TString comment = "// Analysis using ";
3251 if (IsUseMCchain()) {
3255 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
3256 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
3261 if (type!="AOD" && fFriendChainName!="") {
3262 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
3265 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
3266 else comment += " data";
3267 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
3268 func.ReplaceAll(".C", "");
3269 out << "void " << func.Data() << "()" << endl;
3271 out << comment.Data() << endl;
3272 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
3273 out << " TStopwatch timer;" << endl;
3274 out << " timer.Start();" << endl << endl;
3275 // Change temp directory to current one
3276 out << "// Set temporary merging directory to current one" << endl;
3277 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3278 out << "// Set temporary compilation directory to current one" << endl;
3279 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3280 // Reset existing include path
3281 out << "// Reset existing include path and add current directory first in the search" << endl;
3282 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3283 if (!fExecutableCommand.Contains("aliroot")) {
3284 out << "// load base root libraries" << endl;
3285 out << " gSystem->Load(\"libTree\");" << endl;
3286 out << " gSystem->Load(\"libGeom\");" << endl;
3287 out << " gSystem->Load(\"libVMC\");" << endl;
3288 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3289 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3291 if (fAdditionalRootLibs.Length()) {
3292 // in principle libtree /lib geom libvmc etc. can go into this list, too
3293 out << "// Add aditional libraries" << endl;
3294 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3297 while((str=(TObjString*)next())) {
3298 if (str->GetString().Contains(".so"))
3299 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3301 if (list) delete list;
3303 out << "// Load analysis framework libraries" << endl;
3304 TString setupPar = "AliAnalysisAlien::SetupPar";
3306 if (!fExecutableCommand.Contains("aliroot")) {
3307 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3308 out << " gSystem->Load(\"libESD\");" << endl;
3309 out << " gSystem->Load(\"libAOD\");" << endl;
3311 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3312 out << " gSystem->Load(\"libOADB\");" << endl;
3313 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3314 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3316 TIter next(fPackages);
3319 while ((obj=next())) {
3320 pkgname = obj->GetName();
3321 if (pkgname == "STEERBase" ||
3322 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3323 if (pkgname == "ESD" ||
3324 pkgname == "ESD.par") hasESD = kTRUE;
3325 if (pkgname == "AOD" ||
3326 pkgname == "AOD.par") hasAOD = kTRUE;
3327 if (pkgname == "ANALYSIS" ||
3328 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3329 if (pkgname == "OADB" ||
3330 pkgname == "OADB.par") hasOADB = kTRUE;
3331 if (pkgname == "ANALYSISalice" ||
3332 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3333 if (pkgname == "CORRFW" ||
3334 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3336 if (hasANALYSISalice) setupPar = "SetupPar";
3337 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3338 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3339 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3340 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3341 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3342 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3343 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3344 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3345 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3346 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3347 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3348 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3349 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3350 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3351 out << "// Compile other par packages" << endl;
3353 while ((obj=next())) {
3354 pkgname = obj->GetName();
3355 if (pkgname == "STEERBase" ||
3356 pkgname == "STEERBase.par" ||
3358 pkgname == "ESD.par" ||
3360 pkgname == "AOD.par" ||
3361 pkgname == "ANALYSIS" ||
3362 pkgname == "ANALYSIS.par" ||
3363 pkgname == "OADB" ||
3364 pkgname == "OADB.par" ||
3365 pkgname == "ANALYSISalice" ||
3366 pkgname == "ANALYSISalice.par" ||
3367 pkgname == "CORRFW" ||
3368 pkgname == "CORRFW.par") continue;
3369 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3372 out << "// include path" << endl;
3373 // Get the include path from the interpreter and remove entries pointing to AliRoot
3374 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3375 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3376 out << " TIter nextpath(listpaths);" << endl;
3377 out << " TObjString *pname;" << endl;
3378 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3379 out << " TString current = pname->GetName();" << endl;
3380 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3381 out << " gSystem->AddIncludePath(current);" << endl;
3382 out << " }" << endl;
3383 out << " if (listpaths) delete listpaths;" << endl;
3384 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3385 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3386 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3387 if (fAdditionalLibs.Length()) {
3388 out << "// Add aditional AliRoot libraries" << endl;
3389 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3392 while((str=(TObjString*)next())) {
3393 if (str->GetString().Contains(".so"))
3394 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3395 if (str->GetString().Contains(".par"))
3396 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3398 if (list) delete list;
3401 out << "// analysis source to be compiled at runtime (if any)" << endl;
3402 if (fAnalysisSource.Length()) {
3403 TObjArray *list = fAnalysisSource.Tokenize(" ");
3406 while((str=(TObjString*)next())) {
3407 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3409 if (list) delete list;
3412 // out << " printf(\"Currently load libraries:\\n\");" << endl;
3413 // out << " printf(\"%s\\n\", gSystem->GetLibraries());" << endl;
3414 if (fFastReadOption) {
3415 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3416 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3417 out << "// fast xrootd reading enabled" << endl;
3418 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3419 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3420 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3421 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3422 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3423 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3425 out << "// connect to AliEn and make the chain" << endl;
3426 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3427 out << "// read the analysis manager from file" << endl;
3428 TString analysisFile = fExecutable;
3429 analysisFile.ReplaceAll(".sh", ".root");
3430 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
3431 << analysisFile << "\");" << endl;
3432 out << " if (!mgr) return;" << endl;
3433 out << " mgr->PrintStatus();" << endl;
3434 if (AliAnalysisManager::GetAnalysisManager()) {
3435 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3436 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3438 if (TestBit(AliAnalysisGrid::kTest))
3439 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3441 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3444 if (IsUsingTags()) {
3445 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3447 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3449 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3450 out << " timer.Stop();" << endl;
3451 out << " timer.Print();" << endl;
3452 out << "}" << endl << endl;
3453 if (IsUsingTags()) {
3454 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3456 out << "// Create a chain using tags from the xml file." << endl;
3457 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3458 out << " if (!coll) {" << endl;
3459 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3460 out << " return NULL;" << endl;
3461 out << " }" << endl;
3462 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3463 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3464 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3465 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3466 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3467 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3468 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3469 out << " // Check if the cuts configuration file was provided" << endl;
3470 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3471 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3472 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3473 out << " }" << endl;
3474 if (fFriendChainName=="") {
3475 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3477 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3478 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3479 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3481 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3482 out << " chain->ls();" << endl;
3483 out << " return chain;" << endl;
3484 out << "}" << endl << endl;
3485 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3486 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3487 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3488 msg += " AliLHCTagCuts *lhcCuts,\n";
3489 msg += " AliDetectorTagCuts *detCuts,\n";
3490 msg += " AliEventTagCuts *evCuts)";
3491 Info("WriteAnalysisMacro", "%s", msg.Data());
3494 if (!IsUsingTags() || fFriendChainName!="") {
3495 out <<"//________________________________________________________________________________" << endl;
3496 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3498 out << "// Create a chain using url's from xml file" << endl;
3499 out << " TString filename;" << endl;
3500 out << " Int_t run = 0;" << endl;
3501 if (IsUseMCchain()) {
3502 out << " TString treename = \"TE\";" << endl;
3504 out << " TString treename = type;" << endl;
3505 out << " treename.ToLower();" << endl;
3506 out << " treename += \"Tree\";" << endl;
3508 out << " printf(\"***************************************\\n\");" << endl;
3509 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3510 out << " printf(\"***************************************\\n\");" << endl;
3511 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3512 out << " if (!coll) {" << endl;
3513 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3514 out << " return NULL;" << endl;
3515 out << " }" << endl;
3516 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3517 out << " TChain *chain = new TChain(treename);" << endl;
3518 if(fFriendChainName!="") {
3519 out << " TChain *chainFriend = new TChain(treename);" << endl;
3521 out << " coll->Reset();" << endl;
3522 out << " while (coll->Next()) {" << endl;
3523 out << " filename = coll->GetTURL("");" << endl;
3524 out << " if (mgr) {" << endl;
3525 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3526 out << " if (nrun && nrun != run) {" << endl;
3527 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3528 out << " mgr->SetRunFromPath(nrun);" << endl;
3529 out << " run = nrun;" << endl;
3530 out << " }" << endl;
3531 out << " }" << endl;
3532 out << " chain->Add(filename);" << endl;
3533 if(fFriendChainName!="") {
3534 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3535 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3536 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3537 out << " chainFriend->Add(fileFriend.Data());" << endl;
3539 out << " }" << endl;
3540 out << " if (!chain->GetNtrees()) {" << endl;
3541 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3542 out << " return NULL;" << endl;
3543 out << " }" << endl;
3544 if(fFriendChainName!="") {
3545 out << " chain->AddFriend(chainFriend);" << endl;
3547 out << " return chain;" << endl;
3548 out << "}" << endl << endl;
3550 if (hasANALYSISalice) {
3551 out <<"//________________________________________________________________________________" << endl;
3552 out << "Bool_t SetupPar(const char *package) {" << endl;
3553 out << "// Compile the package and set it up." << endl;
3554 out << " TString pkgdir = package;" << endl;
3555 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3556 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3557 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3558 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3559 out << " // Check for BUILD.sh and execute" << endl;
3560 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3561 out << " printf(\"*******************************\\n\");" << endl;
3562 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3563 out << " printf(\"*******************************\\n\");" << endl;
3564 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3565 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3566 out << " gSystem->ChangeDirectory(cdir);" << endl;
3567 out << " return kFALSE;" << endl;
3568 out << " }" << endl;
3569 out << " } else {" << endl;
3570 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3571 out << " gSystem->ChangeDirectory(cdir);" << endl;
3572 out << " return kFALSE;" << endl;
3573 out << " }" << endl;
3574 out << " // Check for SETUP.C and execute" << endl;
3575 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3576 out << " printf(\"*******************************\\n\");" << endl;
3577 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3578 out << " printf(\"*******************************\\n\");" << endl;
3579 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3580 out << " } else {" << endl;
3581 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3582 out << " gSystem->ChangeDirectory(cdir);" << endl;
3583 out << " return kFALSE;" << endl;
3584 out << " }" << endl;
3585 out << " // Restore original workdir" << endl;
3586 out << " gSystem->ChangeDirectory(cdir);" << endl;
3587 out << " return kTRUE;" << endl;
3590 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
3592 Bool_t copy = kTRUE;
3593 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3596 TString workdir = gGrid->GetHomeDirectory();
3597 workdir += fGridWorkingDir;
3598 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3599 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3600 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3601 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3602 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3604 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3605 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3609 //______________________________________________________________________________
3610 void AliAnalysisAlien::WriteMergingMacro()
3612 // Write a macro to merge the outputs per master job.
3613 if (!fMergeViaJDL) return;
3614 if (!fOutputFiles.Length()) {
3615 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3618 TString mergingMacro = fExecutable;
3619 mergingMacro.ReplaceAll(".sh","_merge.C");
3620 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3621 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3623 out.open(mergingMacro.Data(), ios::out);
3625 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3628 Bool_t hasSTEERBase = kFALSE;
3629 Bool_t hasESD = kFALSE;
3630 Bool_t hasAOD = kFALSE;
3631 Bool_t hasANALYSIS = kFALSE;
3632 Bool_t hasOADB = kFALSE;
3633 Bool_t hasANALYSISalice = kFALSE;
3634 Bool_t hasCORRFW = kFALSE;
3635 TString func = mergingMacro;
3637 func.ReplaceAll(".C", "");
3638 out << "void " << func.Data() << "(const char *dir, Int_t stage=0)" << endl;
3640 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3641 out << " TStopwatch timer;" << endl;
3642 out << " timer.Start();" << endl << endl;
3643 // Reset existing include path
3644 out << "// Reset existing include path and add current directory first in the search" << endl;
3645 out << " gSystem->SetIncludePath(\"-I.\");" << endl;
3646 if (!fExecutableCommand.Contains("aliroot")) {
3647 out << "// load base root libraries" << endl;
3648 out << " gSystem->Load(\"libTree\");" << endl;
3649 out << " gSystem->Load(\"libGeom\");" << endl;
3650 out << " gSystem->Load(\"libVMC\");" << endl;
3651 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3652 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3654 if (fAdditionalRootLibs.Length()) {
3655 // in principle libtree /lib geom libvmc etc. can go into this list, too
3656 out << "// Add aditional libraries" << endl;
3657 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3660 while((str=(TObjString*)next())) {
3661 if (str->GetString().Contains(".so"))
3662 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3664 if (list) delete list;
3666 out << "// Load analysis framework libraries" << endl;
3668 if (!fExecutableCommand.Contains("aliroot")) {
3669 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3670 out << " gSystem->Load(\"libESD\");" << endl;
3671 out << " gSystem->Load(\"libAOD\");" << endl;
3673 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3674 out << " gSystem->Load(\"libOADB\");" << endl;
3675 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3676 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3678 TIter next(fPackages);
3681 TString setupPar = "AliAnalysisAlien::SetupPar";
3682 while ((obj=next())) {
3683 pkgname = obj->GetName();
3684 if (pkgname == "STEERBase" ||
3685 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3686 if (pkgname == "ESD" ||
3687 pkgname == "ESD.par") hasESD = kTRUE;
3688 if (pkgname == "AOD" ||
3689 pkgname == "AOD.par") hasAOD = kTRUE;
3690 if (pkgname == "ANALYSIS" ||
3691 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3692 if (pkgname == "OADB" ||
3693 pkgname == "OADB.par") hasOADB = kTRUE;
3694 if (pkgname == "ANALYSISalice" ||
3695 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3696 if (pkgname == "CORRFW" ||
3697 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3699 if (hasANALYSISalice) setupPar = "SetupPar";
3700 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3701 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3702 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3703 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3704 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3705 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3706 out << " gSystem->Load(\"libOADB\");" << endl;
3707 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3708 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3709 if (!hasOADB) out << " gSystem->Load(\"libOADB\");" << endl;
3710 else out << " if (!" << setupPar << "(\"OADB\")) return;" << endl;
3711 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3712 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3713 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3714 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3715 out << "// Compile other par packages" << endl;
3717 while ((obj=next())) {
3718 pkgname = obj->GetName();
3719 if (pkgname == "STEERBase" ||
3720 pkgname == "STEERBase.par" ||
3722 pkgname == "ESD.par" ||
3724 pkgname == "AOD.par" ||
3725 pkgname == "ANALYSIS" ||
3726 pkgname == "ANALYSIS.par" ||
3727 pkgname == "OADB" ||
3728 pkgname == "OADB.par" ||
3729 pkgname == "ANALYSISalice" ||
3730 pkgname == "ANALYSISalice.par" ||
3731 pkgname == "CORRFW" ||
3732 pkgname == "CORRFW.par") continue;
3733 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3736 out << "// include path" << endl;
3737 // Get the include path from the interpreter and remove entries pointing to AliRoot
3738 out << " TString intPath = gInterpreter->GetIncludePath();" << endl;
3739 out << " TObjArray *listpaths = intPath.Tokenize(\" \");" << endl;
3740 out << " TIter nextpath(listpaths);" << endl;
3741 out << " TObjString *pname;" << endl;
3742 out << " while ((pname=(TObjString*)nextpath())) {" << endl;
3743 out << " TString current = pname->GetName();" << endl;
3744 out << " if (current.Contains(\"AliRoot\") || current.Contains(\"ALICE_ROOT\")) continue;" << endl;
3745 out << " gSystem->AddIncludePath(current);" << endl;
3746 out << " }" << endl;
3747 out << " if (listpaths) delete listpaths;" << endl;
3748 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3749 out << " gROOT->ProcessLine(\".include $ALICE_ROOT/include\");" << endl;
3750 out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
3751 if (fAdditionalLibs.Length()) {
3752 out << "// Add aditional AliRoot libraries" << endl;
3753 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3756 while((str=(TObjString*)next())) {
3757 if (str->GetString().Contains(".so"))
3758 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3760 if (list) delete list;
3763 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3764 if (fAnalysisSource.Length()) {
3765 TObjArray *list = fAnalysisSource.Tokenize(" ");
3768 while((str=(TObjString*)next())) {
3769 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3771 if (list) delete list;
3775 if (fFastReadOption) {
3776 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3777 out << "// fast xrootd reading enabled" << endl;
3778 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3779 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",50);" << endl;
3780 out << " gEnv->SetValue(\"XNet.RequestTimeout\",50);" << endl;
3781 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3782 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3783 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3785 // Change temp directory to current one
3786 out << "// Set temporary merging directory to current one" << endl;
3787 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3788 out << "// Set temporary compilation directory to current one" << endl;
3789 out << " gSystem->SetBuildDir(gSystem->pwd(), kTRUE);" << endl << endl;
3790 out << "// Connect to AliEn" << endl;
3791 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3792 out << " TString outputDir = dir;" << endl;
3793 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3794 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3795 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3796 out << " TIter *iter = new TIter(list);" << endl;
3797 out << " TObjString *str;" << endl;
3798 out << " TString outputFile;" << endl;
3799 out << " Bool_t merged = kTRUE;" << endl;
3800 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3801 out << " outputFile = str->GetString();" << endl;
3802 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3803 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3804 out << " if (index > 0) outputFile.Remove(index);" << endl;
3805 out << " // Skip already merged outputs" << endl;
3806 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3807 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3808 out << " continue;" << endl;
3809 out << " }" << endl;
3810 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3811 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage);" << endl;
3812 out << " if (!merged) {" << endl;
3813 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3814 out << " return;" << endl;
3815 out << " }" << endl;
3816 out << " }" << endl;
3817 out << " // all outputs merged, validate" << endl;
3818 out << " ofstream out;" << endl;
3819 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3820 out << " out.close();" << endl;
3821 out << " // read the analysis manager from file" << endl;
3822 TString analysisFile = fExecutable;
3823 analysisFile.ReplaceAll(".sh", ".root");
3824 out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
3825 out << " AliAnalysisManager *mgr = AliAnalysisAlien::LoadAnalysisManager(\""
3826 << analysisFile << "\");" << endl;
3827 out << " if (!mgr) return;" << endl;
3828 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3829 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3830 out << " mgr->PrintStatus();" << endl;
3831 if (AliAnalysisManager::GetAnalysisManager()) {
3832 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3833 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3835 if (TestBit(AliAnalysisGrid::kTest))
3836 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3838 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3841 out << " TTree *tree = NULL;" << endl;
3842 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3843 out << "}" << endl << endl;
3844 if (hasANALYSISalice) {
3845 out <<"//________________________________________________________________________________" << endl;
3846 out << "Bool_t SetupPar(const char *package) {" << endl;
3847 out << "// Compile the package and set it up." << endl;
3848 out << " TString pkgdir = package;" << endl;
3849 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3850 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3851 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3852 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3853 out << " // Check for BUILD.sh and execute" << endl;
3854 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3855 out << " printf(\"*******************************\\n\");" << endl;
3856 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3857 out << " printf(\"*******************************\\n\");" << endl;
3858 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3859 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3860 out << " gSystem->ChangeDirectory(cdir);" << endl;
3861 out << " return kFALSE;" << endl;
3862 out << " }" << endl;
3863 out << " } else {" << endl;
3864 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3865 out << " gSystem->ChangeDirectory(cdir);" << endl;
3866 out << " return kFALSE;" << endl;
3867 out << " }" << endl;
3868 out << " // Check for SETUP.C and execute" << endl;
3869 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3870 out << " printf(\"*******************************\\n\");" << endl;
3871 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3872 out << " printf(\"*******************************\\n\");" << endl;
3873 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3874 out << " } else {" << endl;
3875 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3876 out << " gSystem->ChangeDirectory(cdir);" << endl;
3877 out << " return kFALSE;" << endl;
3878 out << " }" << endl;
3879 out << " // Restore original workdir" << endl;
3880 out << " gSystem->ChangeDirectory(cdir);" << endl;
3881 out << " return kTRUE;" << endl;
3885 Bool_t copy = kTRUE;
3886 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3889 TString workdir = gGrid->GetHomeDirectory();
3890 workdir += fGridWorkingDir;
3891 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3892 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3893 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3897 //______________________________________________________________________________
3898 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3900 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3901 // Note that for loading the compiled library. The current directory should have precedence in
3903 TString pkgdir = package;
3904 pkgdir.ReplaceAll(".par","");
3905 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
3906 TString cdir = gSystem->WorkingDirectory();
3907 gSystem->ChangeDirectory(pkgdir);
3908 // Check for BUILD.sh and execute
3909 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3910 printf("**************************************************\n");
3911 printf("*** Building PAR archive %s\n", package);
3912 printf("**************************************************\n");
3913 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3914 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3915 gSystem->ChangeDirectory(cdir);
3919 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3920 gSystem->ChangeDirectory(cdir);
3923 // Check for SETUP.C and execute
3924 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3925 printf("**************************************************\n");
3926 printf("*** Setup PAR archive %s\n", package);
3927 printf("**************************************************\n");
3928 gROOT->Macro("PROOF-INF/SETUP.C");
3929 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3931 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3932 gSystem->ChangeDirectory(cdir);
3935 // Restore original workdir
3936 gSystem->ChangeDirectory(cdir);
3940 //______________________________________________________________________________
3941 void AliAnalysisAlien::WriteExecutable()
3943 // Generate the alien executable script.
3944 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3946 out.open(fExecutable.Data(), ios::out);
3948 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3951 out << "#!/bin/bash" << endl;
3952 // Make sure we can properly compile par files
3953 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3954 out << "echo \"=========================================\"" << endl;
3955 out << "echo \"############## PATH : ##############\"" << endl;
3956 out << "echo $PATH" << endl;
3957 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3958 out << "echo $LD_LIBRARY_PATH" << endl;
3959 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3960 out << "echo $ROOTSYS" << endl;
3961 out << "echo \"############## which root : ##############\"" << endl;
3962 out << "which root" << endl;
3963 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3964 out << "echo $ALICE_ROOT" << endl;
3965 out << "echo \"############## which aliroot : ##############\"" << endl;
3966 out << "which aliroot" << endl;
3967 out << "echo \"############## system limits : ##############\"" << endl;
3968 out << "ulimit -a" << endl;
3969 out << "echo \"############## memory : ##############\"" << endl;
3970 out << "free -m" << endl;
3971 out << "echo \"=========================================\"" << endl << endl;
3972 out << fExecutableCommand << " ";
3973 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3974 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3975 out << "echo \"############## memory after: ##############\"" << endl;
3976 out << "free -m" << endl;
3978 Bool_t copy = kTRUE;
3979 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3982 TString workdir = gGrid->GetHomeDirectory();
3983 TString bindir = Form("%s/bin", workdir.Data());
3984 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3985 workdir += fGridWorkingDir;
3986 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3987 if (FileExists(executable)) gGrid->Rm(executable);
3988 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3989 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3993 //______________________________________________________________________________
3994 void AliAnalysisAlien::WriteMergeExecutable()
3996 // Generate the alien executable script for the merging job.
3997 if (!fMergeViaJDL) return;
3998 TString mergeExec = fExecutable;
3999 mergeExec.ReplaceAll(".sh", "_merge.sh");
4000 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4002 out.open(mergeExec.Data(), ios::out);
4004 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
4007 out << "#!/bin/bash" << endl;
4008 // Make sure we can properly compile par files
4009 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
4010 out << "echo \"=========================================\"" << endl;
4011 out << "echo \"############## PATH : ##############\"" << endl;
4012 out << "echo $PATH" << endl;
4013 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
4014 out << "echo $LD_LIBRARY_PATH" << endl;
4015 out << "echo \"############## ROOTSYS : ##############\"" << endl;
4016 out << "echo $ROOTSYS" << endl;
4017 out << "echo \"############## which root : ##############\"" << endl;
4018 out << "which root" << endl;
4019 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
4020 out << "echo $ALICE_ROOT" << endl;
4021 out << "echo \"############## which aliroot : ##############\"" << endl;
4022 out << "which aliroot" << endl;
4023 out << "echo \"############## system limits : ##############\"" << endl;
4024 out << "ulimit -a" << endl;
4025 out << "echo \"############## memory : ##############\"" << endl;
4026 out << "free -m" << endl;
4027 out << "echo \"=========================================\"" << endl << endl;
4028 TString mergeMacro = fExecutable;
4029 mergeMacro.ReplaceAll(".sh", "_merge.C");
4030 if (IsOneStageMerging())
4031 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
4033 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2)\"" << endl;
4034 out << fExecutableCommand << " " << "$ARG" << endl;
4035 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
4036 out << "echo \"############## memory after: ##############\"" << endl;
4037 out << "free -m" << endl;
4039 Bool_t copy = kTRUE;
4040 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4043 TString workdir = gGrid->GetHomeDirectory();
4044 TString bindir = Form("%s/bin", workdir.Data());
4045 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
4046 workdir += fGridWorkingDir;
4047 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
4048 if (FileExists(executable)) gGrid->Rm(executable);
4049 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
4050 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
4054 //______________________________________________________________________________
4055 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
4057 // Write the production file to be submitted by LPM manager. The format is:
4058 // First line: full_path_to_jdl estimated_no_subjobs_per_master
4059 // Next lines: full_path_to_dataset XXX (XXX is a string)
4060 // To submit, one has to: submit jdl XXX for all lines
4062 out.open(filename, ios::out);
4064 Error("WriteProductionFile", "Bad file name: %s", filename);
4068 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
4069 workdir = gGrid->GetHomeDirectory();
4070 workdir += fGridWorkingDir;
4071 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
4072 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
4073 out << locjdl << " " << njobspermaster << endl;
4074 Int_t nmasterjobs = fInputFiles->GetEntries();
4075 for (Int_t i=0; i<nmasterjobs; i++) {
4076 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
4077 runOutDir.ReplaceAll(".xml", "");
4079 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
4081 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
4084 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
4085 if (FileExists(filename)) gGrid->Rm(filename);
4086 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
4090 //______________________________________________________________________________
4091 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
4093 // Generate the alien validation script.
4094 // Generate the validation script
4096 if (fValidationScript.IsNull()) {
4097 fValidationScript = fExecutable;
4098 fValidationScript.ReplaceAll(".sh", "_validation.sh");
4100 TString validationScript = fValidationScript;
4101 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
4103 Error("WriteValidationScript", "Alien connection required");
4106 if (!fTerminateFiles.IsNull()) {
4107 fTerminateFiles.Strip();
4108 fTerminateFiles.ReplaceAll(" ",",");
4110 TString outStream = "";
4111 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
4112 if (!TestBit(AliAnalysisGrid::kSubmit)) {
4114 out.open(validationScript, ios::out);
4115 out << "#!/bin/bash" << endl;
4116 out << "##################################################" << endl;
4117 out << "validateout=`dirname $0`" << endl;
4118 out << "validatetime=`date`" << endl;
4119 out << "validated=\"0\";" << endl;
4120 out << "error=0" << endl;
4121 out << "if [ -z $validateout ]" << endl;
4122 out << "then" << endl;
4123 out << " validateout=\".\"" << endl;
4124 out << "fi" << endl << endl;
4125 out << "cd $validateout;" << endl;
4126 out << "validateworkdir=`pwd`;" << endl << endl;
4127 out << "echo \"*******************************************************\"" << outStream << endl;
4128 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
4130 out << "echo \"* Time: $validatetime \"" << outStream << endl;
4131 out << "echo \"* Dir: $validateout\"" << outStream << endl;
4132 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
4133 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4134 out << "ls -la ./" << outStream << endl;
4135 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
4136 out << "##################################################" << endl;
4139 out << "if [ ! -f stderr ] ; then" << endl;
4140 out << " error=1" << endl;
4141 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
4142 out << " echo \"Error = $error\" " << outStream << endl;
4143 out << "fi" << endl;
4145 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
4146 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
4147 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
4148 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
4151 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
4152 out << " error=1" << endl;
4153 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
4154 out << " echo \"$parArch\" " << outStream << endl;
4155 out << " echo \"Error = $error\" " << outStream << endl;
4156 out << "fi" << endl;
4158 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
4159 out << " error=1" << endl;
4160 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
4161 out << " echo \"$segViol\" " << outStream << endl;
4162 out << " echo \"Error = $error\" " << outStream << endl;
4163 out << "fi" << endl;
4165 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
4166 out << " error=1" << endl;
4167 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
4168 out << " echo \"$segFault\" " << outStream << endl;
4169 out << " echo \"Error = $error\" " << outStream << endl;
4170 out << "fi" << endl;
4172 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
4173 out << " error=1" << endl;
4174 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
4175 out << " echo \"$glibcErr\" " << outStream << endl;
4176 out << " echo \"Error = $error\" " << outStream << endl;
4177 out << "fi" << endl;
4179 // Part dedicated to the specific analyses running into the train
4181 TString outputFiles = fOutputFiles;
4182 if (merge && !fTerminateFiles.IsNull()) {
4184 outputFiles += fTerminateFiles;
4186 TObjArray *arr = outputFiles.Tokenize(",");
4189 while (!merge && (os=(TObjString*)next1())) {
4190 // No need to validate outputs produced by merging since the merging macro does this
4191 outputFile = os->GetString();
4192 Int_t index = outputFile.Index("@");
4193 if (index > 0) outputFile.Remove(index);
4194 if (fTerminateFiles.Contains(outputFile)) continue;
4195 if (outputFile.Contains("*")) continue;
4196 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
4197 out << " error=1" << endl;
4198 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
4199 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
4200 out << "fi" << endl;
4203 out << "if ! [ -f outputs_valid ] ; then" << endl;
4204 out << " error=1" << endl;
4205 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
4206 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
4207 out << "fi" << endl;
4209 out << "if [ $error = 0 ] ; then" << endl;
4210 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
4211 if (!IsKeepLogs()) {
4212 out << " echo \"* === Logs std* will be deleted === \"" << endl;
4214 out << " rm -f std*" << endl;
4216 out << "fi" << endl;
4218 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
4219 out << "echo \"*******************************************************\"" << outStream << endl;
4220 out << "cd -" << endl;
4221 out << "exit $error" << endl;
4223 Bool_t copy = kTRUE;
4224 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
4227 TString workdir = gGrid->GetHomeDirectory();
4228 workdir += fGridWorkingDir;
4229 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
4230 if (FileExists(validationScript)) gGrid->Rm(validationScript);
4231 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));