1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TObjString.h"
31 #include "TObjArray.h"
33 #include "TGridResult.h"
34 #include "TGridCollection.h"
36 #include "TGridJobStatusList.h"
37 #include "TGridJobStatus.h"
38 #include "TFileMerger.h"
39 #include "AliAnalysisManager.h"
40 #include "AliVEventHandler.h"
41 #include "AliAnalysisDataContainer.h"
42 #include "AliAnalysisAlien.h"
44 ClassImp(AliAnalysisAlien)
46 //______________________________________________________________________________
// Default constructor (takes no arguments).
47 AliAnalysisAlien::AliAnalysisAlien()
// fSplitMaxInputFileNumber and fMasterResubmitThreshold start at 0;
// fAdditionalRootLibs is default-constructed.
53 fSplitMaxInputFileNumber(0),
55 fMasterResubmitThreshold(0),
73 fAdditionalRootLibs(),
104 //______________________________________________________________________________
// Named constructor: 'name' is forwarded to the AliAnalysisGrid base class.
105 AliAnalysisAlien::AliAnalysisAlien(const char *name)
106 :AliAnalysisGrid(name),
// Same starting values as in the default constructor for the members below.
111 fSplitMaxInputFileNumber(0),
113 fMasterResubmitThreshold(0),
126 fExecutableCommand(),
131 fAdditionalRootLibs(),
162 //______________________________________________________________________________
// Copy constructor.
// - The base class and the plain/string members are copy-initialized from 'other'.
// - fGridJDL and fMergingJDL are NOT copied: new TAlienJDL objects are created
//   through gROOT->ProcessLine().
// - fInputFiles and fPackages are deep-copied into new owning arrays of TObjString
//   (only when the corresponding array of 'other' is non-null).
163 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
164 :AliAnalysisGrid(other),
167 fPrice(other.fPrice),
169 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
170 fMaxInitFailed(other.fMaxInitFailed),
171 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
172 fNtestFiles(other.fNtestFiles),
173 fNrunsPerMaster(other.fNrunsPerMaster),
174 fMaxMergeFiles(other.fMaxMergeFiles),
175 fNsubmitted(other.fNsubmitted),
176 fProductionMode(other.fProductionMode),
177 fOutputToRunNo(other.fOutputToRunNo),
178 fMergeViaJDL(other.fMergeViaJDL),
179 fFastReadOption(other.fFastReadOption),
180 fOverwriteMode(other.fOverwriteMode),
181 fNreplicas(other.fNreplicas),
182 fRunNumbers(other.fRunNumbers),
183 fExecutable(other.fExecutable),
184 fExecutableCommand(other.fExecutableCommand),
185 fArguments(other.fArguments),
186 fExecutableArgs(other.fExecutableArgs),
187 fAnalysisMacro(other.fAnalysisMacro),
188 fAnalysisSource(other.fAnalysisSource),
189 fAdditionalRootLibs(other.fAdditionalRootLibs),
190 fAdditionalLibs(other.fAdditionalLibs),
191 fSplitMode(other.fSplitMode),
192 fAPIVersion(other.fAPIVersion),
193 fROOTVersion(other.fROOTVersion),
194 fAliROOTVersion(other.fAliROOTVersion),
195 fExternalPackages(other.fExternalPackages),
197 fGridWorkingDir(other.fGridWorkingDir),
198 fGridDataDir(other.fGridDataDir),
199 fDataPattern(other.fDataPattern),
200 fGridOutputDir(other.fGridOutputDir),
201 fOutputArchive(other.fOutputArchive),
202 fOutputFiles(other.fOutputFiles),
203 fInputFormat(other.fInputFormat),
204 fDatasetName(other.fDatasetName),
205 fJDLName(other.fJDLName),
206 fMergeExcludes(other.fMergeExcludes),
207 fIncludePath(other.fIncludePath),
208 fCloseSE(other.fCloseSE),
209 fFriendChainName(other.fFriendChainName),
210 fJobTag(other.fJobTag),
211 fOutputSingle(other.fOutputSingle),
212 fRunPrefix(other.fRunPrefix),
217 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
218 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
219 fRunRange[0] = other.fRunRange[0];
220 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: each entry is re-created from its name.
221 if (other.fInputFiles) {
222 fInputFiles = new TObjArray();
223 TIter next(other.fInputFiles);
225 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
226 fInputFiles->SetOwner();
// Same deep copy for the package list.
228 if (other.fPackages) {
229 fPackages = new TObjArray();
230 TIter next(other.fPackages);
232 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
233 fPackages->SetOwner();
237 //______________________________________________________________________________
// Destructor: deletes the two JDL objects and the input-file and package arrays.
238 AliAnalysisAlien::~AliAnalysisAlien()
// The null checks are redundant (deleting a null pointer is a no-op) but harmless.
241 if (fGridJDL) delete fGridJDL;
242 if (fMergingJDL) delete fMergingJDL;
243 if (fInputFiles) delete fInputFiles;
244 if (fPackages) delete fPackages;
247 //______________________________________________________________________________
// Assignment operator. Mirrors the copy constructor: base-class assignment,
// member-wise copy of the plain/string members, fresh TAlienJDL objects and
// deep copies of fInputFiles and fPackages.
248 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
// Guard against self-assignment.
251 if (this != &other) {
252 AliAnalysisGrid::operator=(other);
// NOTE(review): fGridJDL and fMergingJDL are overwritten with new objects here and
// no delete of the previous objects is visible in this function - looks like a
// leak when assigning to an already constructed object; confirm.
253 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
254 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
255 fPrice = other.fPrice;
257 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
258 fMaxInitFailed = other.fMaxInitFailed;
259 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
260 fNtestFiles = other.fNtestFiles;
261 fNrunsPerMaster = other.fNrunsPerMaster;
262 fMaxMergeFiles = other.fMaxMergeFiles;
263 fNsubmitted = other.fNsubmitted;
264 fProductionMode = other.fProductionMode;
265 fOutputToRunNo = other.fOutputToRunNo;
266 fMergeViaJDL = other.fMergeViaJDL;
267 fFastReadOption = other.fFastReadOption;
268 fOverwriteMode = other.fOverwriteMode;
269 fNreplicas = other.fNreplicas;
270 fRunNumbers = other.fRunNumbers;
271 fExecutable = other.fExecutable;
272 fExecutableCommand = other.fExecutableCommand;
273 fArguments = other.fArguments;
274 fExecutableArgs = other.fExecutableArgs;
275 fAnalysisMacro = other.fAnalysisMacro;
276 fAnalysisSource = other.fAnalysisSource;
277 fAdditionalRootLibs = other.fAdditionalRootLibs;
278 fAdditionalLibs = other.fAdditionalLibs;
279 fSplitMode = other.fSplitMode;
280 fAPIVersion = other.fAPIVersion;
281 fROOTVersion = other.fROOTVersion;
282 fAliROOTVersion = other.fAliROOTVersion;
283 fExternalPackages = other.fExternalPackages;
285 fGridWorkingDir = other.fGridWorkingDir;
286 fGridDataDir = other.fGridDataDir;
287 fDataPattern = other.fDataPattern;
288 fGridOutputDir = other.fGridOutputDir;
289 fOutputArchive = other.fOutputArchive;
290 fOutputFiles = other.fOutputFiles;
291 fInputFormat = other.fInputFormat;
292 fDatasetName = other.fDatasetName;
293 fJDLName = other.fJDLName;
294 fMergeExcludes = other.fMergeExcludes;
295 fIncludePath = other.fIncludePath;
296 fCloseSE = other.fCloseSE;
297 fFriendChainName = other.fFriendChainName;
298 fJobTag = other.fJobTag;
299 fOutputSingle = other.fOutputSingle;
300 fRunPrefix = other.fRunPrefix;
// Deep copy of the input file list.
// NOTE(review): the previous fInputFiles array is not deleted before being replaced,
// and it is left untouched when other.fInputFiles is null; confirm this is intended.
301 if (other.fInputFiles) {
302 fInputFiles = new TObjArray();
303 TIter next(other.fInputFiles);
305 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
306 fInputFiles->SetOwner();
// Deep copy of the package list (same remarks as for fInputFiles).
308 if (other.fPackages) {
309 fPackages = new TObjArray();
310 TIter next(other.fPackages);
312 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
313 fPackages->SetOwner();
319 //______________________________________________________________________________
320 void AliAnalysisAlien::AddIncludePath(const char *path)
322 // Add include path in the remote analysis macro.
// The path is appended to fIncludePath followed by a space. "-I" is prepended
// unless 'p' already contains it.
// NOTE(review): 'p' is presumably a TString built from 'path'; confirm.
324 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
325 else fIncludePath += Form("-I%s ", path);
328 //______________________________________________________________________________
329 void AliAnalysisAlien::AddRunNumber(Int_t run)
331 // Add a run number to the list of runs to be processed.
// Entries of fRunNumbers are separated by a single space; each entry is
// fRunPrefix immediately followed by the decimal run number.
332 if (fRunNumbers.Length()) fRunNumbers += " ";
333 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
336 //______________________________________________________________________________
337 void AliAnalysisAlien::AddRunNumber(const char* run)
339 // Add a run number to the list of runs to be processed.
// A space separator is inserted when fRunNumbers already holds at least one entry.
340 if (fRunNumbers.Length()) fRunNumbers += " ";
344 //______________________________________________________________________________
345 void AliAnalysisAlien::AddDataFile(const char *lfn)
347 // Adds a data file to the input to be analysed. The file should be a valid LFN
348 // or point to an existing file in the alien workdir.
// The list is created on first use; 'lfn' is stored as a new TObjString.
// NOTE(review): no SetOwner() call is visible on the array created here, unlike in
// the copy constructor; confirm who owns the TObjString entries.
349 if (!fInputFiles) fInputFiles = new TObjArray();
350 fInputFiles->Add(new TObjString(lfn));
353 //______________________________________________________________________________
354 void AliAnalysisAlien::AddExternalPackage(const char *package)
356 // Adds external packages with respect to the default ones (root, aliroot and gapi)
// Packages are appended to fExternalPackages separated by spaces.
// NOTE(review): the condition below tests the TString object itself, not its length
// (AddRunNumber tests fRunNumbers.Length() for the same purpose). If this converts
// to a non-null pointer the separator is also added before the first package;
// confirm whether fExternalPackages.Length() was intended.
357 if (fExternalPackages) fExternalPackages += " ";
358 fExternalPackages += package;
361 //______________________________________________________________________________
362 Bool_t AliAnalysisAlien::Connect()
364 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Returns kTRUE immediately when a connected gGrid already exists.
365 if (gGrid && gGrid->IsConnected()) return kTRUE;
367 Info("Connect", "Trying to connect to AliEn ...");
368 TGrid::Connect("alien://");
// The result is checked through the global gGrid, not through the return value
// of TGrid::Connect().
370 if (!gGrid || !gGrid->IsConnected()) {
371 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// On success the analysis user (fUser) is set to the user reported by the grid.
374 fUser = gGrid->GetUser();
375 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
379 //______________________________________________________________________________
380 void AliAnalysisAlien::CdWork()
382 // Check validity of alien workspace. Create directory if possible.
384 Error("CdWork", "Alien connection required");
// The work directory is the grid home directory with fGridWorkingDir appended.
387 TString homedir = gGrid->GetHomeDirectory();
388 TString workdir = homedir + fGridWorkingDir;
389 if (DirectoryExists(workdir)) {
393 // Work directory not existing - create it
// NOTE(review): the two messages below are tagged "CreateJDL" although they are
// emitted from CdWork; confirm whether the tag should be "CdWork".
395 if (gGrid->Mkdir(workdir, "-p")) {
396 gGrid->Cd(fGridWorkingDir);
397 Info("CreateJDL", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
399 Warning("CreateJDL", "Working directory %s cannot be created.\n Using %s instead.",
400 workdir.Data(), homedir.Data());
// Fall back to the home directory by clearing the relative work directory.
401 fGridWorkingDir = "";
405 //______________________________________________________________________________
// Tests whether files can be copied to 'alienpath': a local file named
// "plugin_test_copy" is created, copied to alien://<alienpath>/ and then both
// copies are removed. Diagnostics are printed when a step fails.
406 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
408 // Check if file copying is possible.
410 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
413 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
414 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
415 // Check if alien_CLOSE_SE is defined
416 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
417 if (!closeSE.IsNull()) {
418 Info("CheckFileCopy", "Your current close storage is pointing to: \
419 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
421 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
423 // Check if grid directory exists.
424 if (!DirectoryExists(alienpath)) {
425 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Local test file, created in the current working directory.
428 TFile f("plugin_test_copy", "RECREATE");
429 // User may not have write permissions to current directory
431 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
432 gSystem->WorkingDirectory());
// Remove a stale remote copy (if any) before attempting the copy.
436 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
437 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
438 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
439 \n# 1. Make sure you have write permissions there. If this is the case: \
440 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
441 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
442 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
443 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
// The local test file is removed on both the failure and the success path.
444 gSystem->Unlink(f.GetName());
447 gSystem->Unlink(f.GetName());
// NOTE(review): this path is built as "%s%s" (no '/' between directory and file
// name, no "alien://" prefix), unlike the paths used for the check and the copy
// above; confirm the remote test file is really removed.
448 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
449 Info("CheckFileCopy", "### ...SUCCESS ###");
453 //______________________________________________________________________________
// Validates the declared input. Input can be given as explicit files
// (fInputFiles), as a list of run numbers (fRunNumbers) or as a run range
// (fRunRange). For run-based input the corresponding xml collection names are
// registered through AddDataFile().
454 Bool_t AliAnalysisAlien::CheckInputData()
456 // Check validity of input data. If necessary, create xml files.
// Nothing declared explicitly: the base data directory must then be set.
457 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
458 if (!fGridDataDir.Length()) {
// NOTE(review): the location tag "CkeckInputData" is misspelled (also further below).
459 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
462 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
463 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
464 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
467 // Process declared files
468 Bool_t isCollection = kFALSE;
469 Bool_t isXml = kFALSE;
470 Bool_t useTags = kFALSE;
471 Bool_t checked = kFALSE;
474 TString workdir = gGrid->GetHomeDirectory();
475 workdir += fGridWorkingDir;
478 TIter next(fInputFiles);
479 while ((objstr=(TObjString*)next())) {
482 file += objstr->GetString();
483 // Store full lfn path
484 if (FileExists(file)) objstr->SetString(file);
486 file = objstr->GetName();
487 if (!FileExists(objstr->GetName())) {
488 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
489 objstr->GetName(), workdir.Data());
// Determine the type of this input and compare it with the type recorded so far:
// all declared inputs must agree on collection / xml / tag usage.
493 Bool_t iscoll, isxml, usetags;
494 CheckDataType(file, iscoll, isxml, usetags);
497 isCollection = iscoll;
500 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
502 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
503 Error("CheckInputData", "Some conflict was found in the types of inputs");
509 // Process requested run numbers
// No run-based input requested: nothing more to check.
510 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
511 // Check validity of alien data directory
512 if (!fGridDataDir.Length()) {
513 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
516 if (!DirectoryExists(fGridDataDir)) {
517 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
521 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
525 if (checked && !isXml) {
526 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
529 // Check validity of run number(s)
533 TString schunk, schunk2;
// Tag usage for run-based input is derived from the data pattern and must agree
// with what was found for explicitly declared files.
537 useTags = fDataPattern.Contains("tag");
538 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
540 if (useTags != fDataPattern.Contains("tag")) {
541 Error("CheckInputData", "Cannot mix input files using/not using tags");
// An explicit run list takes precedence over the run range.
// NOTE(review): the Info() messages in this part are tagged "CheckDataType"
// although they are emitted from CheckInputData; confirm the intended tag.
544 if (fRunNumbers.Length()) {
545 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
546 arr = fRunNumbers.Tokenize(" ");
548 while ((os=(TObjString*)next())) {
549 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
550 if (!DirectoryExists(path)) {
551 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
554 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
555 TString msg = "\n##### file: ";
557 msg += " type: xml_collection;";
558 if (useTags) msg += " using_tags: Yes";
559 else msg += " using_tags: No";
560 Info("CheckDataType", "%s", msg.Data());
// One run per master job: the input is simply <run>.xml.
561 if (fNrunsPerMaster<2) {
562 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Several runs per master job: the chunk name starts with the first run of the
// group and is completed with "_<last run>.xml" once the group is full or the
// last run has been reached.
565 if (((nruns-1)%fNrunsPerMaster) == 0) {
566 schunk = os->GetString();
568 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
569 schunk += Form("_%s.xml", os->GetString().Data());
// Same procedure for a run range, looping over every run number in
// [fRunRange[0], fRunRange[1]].
575 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
576 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
577 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
578 if (!DirectoryExists(path)) {
579 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
582 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
583 TString msg = "\n##### file: ";
585 msg += " type: xml_collection;";
586 if (useTags) msg += " using_tags: Yes";
587 else msg += " using_tags: No";
588 Info("CheckDataType", "%s", msg.Data());
589 if (fNrunsPerMaster<2) {
590 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
593 if (((nruns-1)%fNrunsPerMaster) == 0) {
594 schunk = Form("%s%d", fRunPrefix.Data(),irun);
596 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
597 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
610 //______________________________________________________________________________
// Creates the xml collection(s) describing the input data. Depending on the
// configuration one collection is made for the whole data directory, one per
// run, or one per group of fNrunsPerMaster runs. Collections are written to a
// local file first and then copied to the alien work directory.
611 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
613 // Create dataset for the grid data directory + run number.
// Nothing to do in offline mode.
614 if (TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
616 Error("CreateDataset", "Cannot create dataset with no grid connection");
622 TString workdir = gGrid->GetHomeDirectory();
623 workdir += fGridWorkingDir;
625 // Compose the 'find' command arguments
627 TString options = "-x collection ";
// In test mode the number of files is limited to fNtestFiles.
628 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
629 TString conditions = "";
634 TString schunk, schunk2;
635 TGridCollection *cbase=0, *cadd=0;
// Case 1: no run numbers and no run range.
636 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Explicit input files were declared: no collection needs to be created.
637 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
638 // Make a single data collection from data directory.
640 if (!DirectoryExists(path)) {
641 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// Test mode always uses the local file name wn.xml.
645 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
646 else file = Form("%s.xml", gSystem->BaseName(path));
647 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
653 command += conditions;
654 printf("command: %s\n", command.Data());
655 TGridResult *res = gGrid->Command(command);
657 // Write standard output to file
658 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
// The result is validated by looking for "/event" in the produced file with grep,
// when grep is available.
659 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
660 Bool_t nullFile = kFALSE;
662 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
664 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
666 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
671 Bool_t fileExists = FileExists(file);
672 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
673 // Copy xml file to alien space
674 if (fileExists) gGrid->Rm(file);
675 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
676 if (!FileExists(file)) {
677 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
680 // Update list of files to be processed.
682 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
686 Bool_t nullResult = kTRUE;
// Case 2: explicit list of run numbers.
687 if (fRunNumbers.Length()) {
688 TObjArray *arr = fRunNumbers.Tokenize(" ");
691 while ((os=(TObjString*)next())) {
692 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
693 if (!DirectoryExists(path)) continue;
695 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
696 else file = Form("%s.xml", os->GetString().Data());
697 // If local collection file does not exist, create it via 'find' command.
698 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
703 command += conditions;
704 TGridResult *res = gGrid->Command(command);
706 // Write standard output to file
707 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
708 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
709 Bool_t nullFile = kFALSE;
711 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
713 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
// An empty dataset is only a warning here: the run is dropped from fRunNumbers.
// NOTE(review): ReplaceAll removes every occurrence of the run string, including
// occurrences inside other (longer) entries; confirm this cannot happen.
715 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
716 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
// In test mode only the first valid run is used.
722 if (TestBit(AliAnalysisGrid::kTest)) break;
723 // Check if there is one run per master job.
724 if (fNrunsPerMaster<2) {
725 if (FileExists(file)) {
726 if (fOverwriteMode) gGrid->Rm(file);
728 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
732 // Copy xml file to alien space
733 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
734 if (!FileExists(file)) {
735 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a group opens the base collection
// (cbase); collections of the following runs are opened as cadd.
741 if (((nruns-1)%fNrunsPerMaster) == 0) {
742 schunk = os->GetString();
743 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
745 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
746 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
750 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
753 schunk += Form("_%s.xml", os->GetString().Data());
754 if (FileExists(schunk)) {
// NOTE(review): the existence test is on 'schunk' but the removal is on 'file';
// the run-range branch below removes 'schunk' in the same situation. Confirm
// whether gGrid->Rm(schunk) was intended.
755 if (fOverwriteMode) gGrid->Rm(file);
757 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
761 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
762 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
763 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
764 if (!FileExists(schunk)) {
765 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
773 Error("CreateDataset", "No valid dataset corresponding to the query!");
777 // Process a full run range.
778 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
779 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
780 if (!DirectoryExists(path)) continue;
782 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
783 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
784 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
785 if (fOverwriteMode) gGrid->Rm(file);
787 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
791 // If local collection file does not exist, create it via 'find' command.
792 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
797 command += conditions;
798 TGridResult *res = gGrid->Command(command);
800 // Write standard output to file
801 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
802 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
803 Bool_t nullFile = kFALSE;
805 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
807 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
809 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// In test mode only the first valid run is used.
815 if (TestBit(AliAnalysisGrid::kTest)) break;
816 // Check if there is one run per master job.
817 if (fNrunsPerMaster<2) {
818 if (FileExists(file)) {
819 if (fOverwriteMode) gGrid->Rm(file);
821 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
825 // Copy xml file to alien space
826 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
827 if (!FileExists(file)) {
828 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
833 // Check if the collection for the chunk exist locally.
// The chunk index selects the matching entry of fInputFiles.
// NOTE(review): this assumes fInputFiles already holds one entry per chunk
// (presumably filled by CheckInputData); confirm.
834 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
835 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
836 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
839 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
840 if (((nruns-1)%fNrunsPerMaster) == 0) {
841 schunk = Form("%s%d", fRunPrefix.Data(), irun);
842 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
844 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
848 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
849 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
853 if (FileExists(schunk)) {
854 if (fOverwriteMode) gGrid->Rm(schunk);
856 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
860 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
861 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
862 if (FileExists(schunk)) {
863 if (fOverwriteMode) gGrid->Rm(schunk);
865 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
869 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
870 if (!FileExists(schunk)) {
871 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
877 Error("CreateDataset", "No valid dataset corresponding to the query!");
884 //______________________________________________________________________________
885 Bool_t AliAnalysisAlien::CreateJDL()
887 // Generate a JDL file according to current settings. The name of the file is
888 // specified by fJDLName.
889 Bool_t error = kFALSE;
892 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
893 Bool_t generate = kTRUE;
894 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
896 Error("CreateJDL", "Alien connection required");
899 // Check validity of alien workspace
901 TString workdir = gGrid->GetHomeDirectory();
902 workdir += fGridWorkingDir;
906 Error("CreateJDL()", "Define some input files for your analysis.");
909 // Compose list of input files
910 // Check if output files were defined
911 if (!fOutputFiles.Length()) {
912 Error("CreateJDL", "You must define at least one output file");
915 // Check if an output directory was defined and valid
916 if (!fGridOutputDir.Length()) {
917 Error("CreateJDL", "You must define AliEn output directory");
920 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
921 if (!DirectoryExists(fGridOutputDir)) {
922 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
923 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
925 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
931 // Exit if any error up to now
932 if (error) return kFALSE;
934 if (!fUser.IsNull()) {
935 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
936 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
938 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
939 TString mergeExec = fExecutable;
940 mergeExec.ReplaceAll(".sh", "_merge.sh");
941 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
942 mergeExec.ReplaceAll(".sh", ".C");
943 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
944 if (!fArguments.IsNull())
945 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
946 fMergingJDL->SetArguments("$1 $2 $3");
947 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
948 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
949 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
950 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
952 if (fMaxInitFailed > 0) {
953 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
954 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
956 if (fSplitMaxInputFileNumber > 0) {
957 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
958 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
960 if (fSplitMode.Length()) {
961 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
962 fGridJDL->SetDescription("Split", "We split per SE or file");
964 if (!fAliROOTVersion.IsNull()) {
965 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
966 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
968 if (!fROOTVersion.IsNull()) {
969 fGridJDL->AddToPackages("ROOT", fROOTVersion);
970 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
972 if (!fAPIVersion.IsNull()) {
973 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
974 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
976 if (!fExternalPackages.IsNull()) {
977 arr = fExternalPackages.Tokenize(" ");
979 while ((os=(TObjString*)next())) {
980 TString pkgname = os->GetString();
981 Int_t index = pkgname.Index("::");
982 TString pkgversion = pkgname(index+2, pkgname.Length());
983 pkgname.Remove(index);
984 fGridJDL->AddToPackages(pkgname, pkgversion);
985 fMergingJDL->AddToPackages(pkgname, pkgversion);
989 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
990 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
991 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
992 TString analysisFile = fExecutable;
993 analysisFile.ReplaceAll(".sh", ".root");
994 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
995 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
996 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
997 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
998 if (fAdditionalLibs.Length()) {
999 arr = fAdditionalLibs.Tokenize(" ");
1001 while ((os=(TObjString*)next())) {
1002 if (os->GetString().Contains(".so")) continue;
1003 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1004 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1009 TIter next(fPackages);
1011 while ((obj=next())) {
1012 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1013 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1016 if (fOutputArchive.Length()) {
1017 arr = fOutputArchive.Tokenize(" ");
1019 Bool_t first = kTRUE;
1020 const char *comment = "Files to be archived";
1021 const char *comment1 = comment;
1022 while ((os=(TObjString*)next())) {
1023 if (!first) comment = NULL;
1024 if (!os->GetString().Contains("@") && fCloseSE.Length())
1025 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1027 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1031 TString outputArchive = fOutputArchive;
1032 if (!fMergeExcludes.IsNull()) {
1033 arr = fMergeExcludes.Tokenize(" ");
1035 while ((os=(TObjString*)next1())) {
1036 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1037 outputArchive.ReplaceAll(os->GetString(),"");
1041 arr = outputArchive.Tokenize(" ");
1045 while ((os=(TObjString*)next2())) {
1046 if (!first) comment = NULL;
1047 TString currentfile = os->GetString();
1048 currentfile.ReplaceAll(".root", "*.root");
1049 currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1050 if (!currentfile.Contains("@") && fCloseSE.Length())
1051 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1053 fMergingJDL->AddToOutputArchive(currentfile, comment);
1058 arr = fOutputFiles.Tokenize(",");
1060 Bool_t first = kTRUE;
1061 const char *comment = "Files to be archived";
1062 const char *comment1 = comment;
1063 while ((os=(TObjString*)next())) {
1064 // Ignore ouputs in jdl that are also in outputarchive
1065 TString sout = os->GetString();
1066 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1067 if (fOutputArchive.Contains(sout)) continue;
1068 if (!first) comment = NULL;
1069 if (!os->GetString().Contains("@") && fCloseSE.Length())
1070 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1072 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1076 if (fOutputFiles.Length()) {
1077 TString outputFiles = fOutputFiles;
1078 if (!fMergeExcludes.IsNull()) {
1079 arr = fMergeExcludes.Tokenize(" ");
1081 while ((os=(TObjString*)next1())) {
1082 outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1083 outputFiles.ReplaceAll(os->GetString(),"");
1087 arr = outputFiles.Tokenize(" ");
1091 while ((os=(TObjString*)next2())) {
1092 // Ignore ouputs in jdl that are also in outputarchive
1093 TString sout = os->GetString();
1094 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1095 if (fOutputArchive.Contains(sout)) continue;
1096 if (!first) comment = NULL;
1097 if (!os->GetString().Contains("@") && fCloseSE.Length())
1098 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1100 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1105 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1106 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1107 TString validationScript = fExecutable;
1108 validationScript.ReplaceAll(".sh", "_validation.sh");
1109 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1110 validationScript = fExecutable;
1111 validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
1112 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1113 if (fMasterResubmitThreshold) {
1114 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1115 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1117 // Write a jdl with 2 input parameters: collection name and output dir name.
1120 // Copy jdl to grid workspace
1122 // Check if an output directory was defined and valid
1123 if (!fGridOutputDir.Length()) {
1124 Error("CreateJDL", "You must define AliEn output directory");
1127 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1128 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1129 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1130 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1132 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1138 if (TestBit(AliAnalysisGrid::kSubmit)) {
1139 TString mergeJDLName = fExecutable;
1140 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1141 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1142 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1143 if (fProductionMode) {
1144 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1145 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1147 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1148 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1149 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1150 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1152 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1153 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1156 if (fAdditionalLibs.Length()) {
1157 arr = fAdditionalLibs.Tokenize(" ");
1160 while ((os=(TObjString*)next())) {
1161 if (os->GetString().Contains(".so")) continue;
1162 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1163 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1164 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1169 TIter next(fPackages);
1171 while ((obj=next())) {
1172 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1173 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1174 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1181 //______________________________________________________________________________
// Completes and writes the two job descriptions held in fGridJDL (analysis
// jobs) and fMergingJDL (merging jobs).
// Visible steps: add the input data collection(s) and the output directory to
// the JDL objects, generate both as text, re-flow that text, add the Jobtag,
// JDLVariables and Workdirectorysize entries, write the results to the local
// files fJDLName and <fExecutable with ".sh" replaced by "_merge.jdl">, then
// copy both files to AliEn with TFile::Cp.
// Returns kFALSE if fInputFiles is not set.
// NOTE(review): the use of the `copy` argument is not visible in this excerpt
// (presumably it guards the upload at the end) - confirm.
// NOTE(review): the original header comment below refers to "findex", which is
// not a parameter of this method; it looks stale.
1182 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1184 // Writes one or more JDL's corresponding to findex. If findex is negative,
1185 // all run numbers are considered in one go (jdl). For non-negative indices
1186 // they correspond to the indices in the array fInputFiles.
1187 if (!fInputFiles) return kFALSE;
// Grid work directory = AliEn home directory followed by fGridWorkingDir.
// NOTE(review): no gGrid null check is visible before this dereference.
1189 TString workdir = gGrid->GetHomeDirectory();
1190 workdir += fGridWorkingDir;
1192 if (!fRunNumbers.Length() && !fRunRange[0]) {
1193 // One jdl with no parameters in case input data is specified by name.
1194 TIter next(fInputFiles);
1195 while ((os=(TObjString*)next()))
1196 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data()), "Input xml collections");
1197 if (!fOutputSingle.IsNull())
1198 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1200 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1201 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1204 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1205 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1206 if (!fOutputSingle.IsNull()) {
1207 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1208 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1210 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1211 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1216 // Generate the JDL as a string
1217 TString sjdl = fGridJDL->Generate();
1218 TString sjdl1 = fMergingJDL->Generate();
// Re-flow the generated text: put each "LF:" entry, each "(member" clause and
// each "VO_" package on its own line, break after '{' and before "};",
// collapse the empty lines this creates, and rename the key "OutputDirectory"
// to "OutputDir". The same sequence is applied to both JDL strings.
1220 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1221 sjdl.ReplaceAll("(member", "\n (member");
1222 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1223 sjdl.ReplaceAll("{", "{\n ");
1224 sjdl.ReplaceAll("};", "\n};");
1225 sjdl.ReplaceAll("{\n \n", "{\n");
1226 sjdl.ReplaceAll("\n\n", "\n");
1227 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1228 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1229 sjdl1.ReplaceAll("(member", "\n (member");
1230 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1231 sjdl1.ReplaceAll("{", "{\n ");
1232 sjdl1.ReplaceAll("};", "\n};");
1233 sjdl1.ReplaceAll("{\n \n", "{\n");
1234 sjdl1.ReplaceAll("\n\n", "\n");
1235 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Analysis JDL: append the JDLVariables list, prepend the job tag built from
// fJobTag, insert a comment line in front of "JDLVariables" and append the
// work directory size entry.
1236 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1237 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1238 index = sjdl.Index("JDLVariables");
1239 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1240 sjdl += "Workdirectorysize = {\"5000MB\"};";
// Merging JDL: same additions, with "_Merging" appended to the job tag and a
// header describing the three positional arguments $1, $2 and $3.
1241 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1242 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", fJobTag.Data()));
1243 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1244 index = sjdl1.Index("JDLVariables");
1245 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1246 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1247 // Write jdl to file
// NOTE(review): the Error() calls in this method are tagged "CreateJDL"
// although they are issued from WriteJDL.
1249 out.open(fJDLName.Data(), ios::out);
1251 Error("CreateJDL", "Bad file name: %s", fJDLName.Data());
1254 out << sjdl << endl;
// The merging JDL file name is derived from the executable script name.
1255 TString mergeJDLName = fExecutable;
1256 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1259 out1.open(mergeJDLName.Data(), ios::out);
1261 Error("CreateJDL", "Bad file name: %s", mergeJDLName.Data());
1264 out1 << sjdl1 << endl;
1267 // Copy jdl to grid workspace
1269 Info("CreateJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination on the grid: the output directory by default, the work
// directory when fProductionMode is set.
1271 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1272 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1273 if (fProductionMode) {
1274 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1275 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove any copy already present on the grid before uploading the new one.
1277 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1278 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1279 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1280 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1282 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1283 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1289 //______________________________________________________________________________
// Checks whether the logical file name `lfn` exists on the grid.
// Returns kFALSE when there is no grid connection (gGrid is null) or when
// gGrid->Ls(lfn) gives no result. Otherwise the decision is based on the
// "name" entry of the first element of the listing.
1290 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1292 // Returns true if file exists.
1293 if (!gGrid) return kFALSE;
1294 TGridResult *res = gGrid->Ls(lfn);
1295 if (!res) return kFALSE;
// The first listing entry is read as a TMap.
1296 TMap *map = dynamic_cast<TMap*>(res->At(0));
// A missing or empty "name" value is handled by the branch opened below.
1301 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1302 if (!objs || !objs->GetString().Length()) {
1310 //______________________________________________________________________________
// Checks whether `dirname` (a plain directory name or a path) exists on the
// grid. The name is cleaned with TString::Strip() and of trailing '/', then
// split into its parent path and last component. The parent is listed with
// gGrid->Ls(path, "-F") and the "name" of each listed entry is compared with
// the last component.
// Returns kFALSE when there is no grid connection or the listing fails.
1311 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1313 // Returns true if directory exists. Can be also a path.
1314 if (!gGrid) return kFALSE;
1315 // Check if dirname is a path
1316 TString dirstripped = dirname;
1317 dirstripped = dirstripped.Strip();
1318 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Last path component: the directory being looked for.
1319 TString dir = gSystem->BaseName(dirstripped);
// Parent directory: the one that gets listed.
1321 TString path = gSystem->DirName(dirstripped);
1322 TGridResult *res = gGrid->Ls(path, "-F");
1323 if (!res) return kFALSE;
// Walk the listing; the loop ends at the first entry that is not a TMap.
1327 while ((map=dynamic_cast<TMap*>(next()))) {
1328 obj = map->GetValue("name");
1330 if (dir == obj->GetName()) {
1339 //______________________________________________________________________________
// Classifies the grid input `lfn` and reports the outcome through the three
// reference arguments:
//   isCollection - result of IsCollection(lfn)
//   isXml        - whether the file name contains ".xml"
//   useTags      - whether the first file found in the collection (or the
//                  name itself, for a plain file) contains ".tag"
// A summary line is accumulated in `msg` and printed with Info().
1340 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1342 // Check input data type.
1343 isCollection = kFALSE;
1347 Error("CheckDataType", "No connection to grid");
1350 isCollection = IsCollection(lfn);
1351 TString msg = "\n##### file: ";
1354 msg += " type: raw_collection;";
1355 // special treatment for collections
1357 // check for tag files in the collection
// Ask AliEn for the files in the collection; only the first entry's "origLFN"
// is inspected.
1358 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1360 msg += " using_tags: No (unknown)";
1361 Info("CheckDataType", "%s", msg.Data());
1364 const char* typeStr = res->GetKey(0, "origLFN");
1365 if (!typeStr || !strlen(typeStr)) {
1366 msg += " using_tags: No (unknown)";
1367 Info("CheckDataType", "%s", msg.Data());
1370 TString file = typeStr;
1371 useTags = file.Contains(".tag");
1372 if (useTags) msg += " using_tags: Yes";
1373 else msg += " using_tags: No";
1374 Info("CheckDataType", "%s", msg.Data());
// Not a catalogue collection: decide from the file name.
1379 isXml = slfn.Contains(".xml");
1381 // Open xml collection and check if there are tag files inside
1382 msg += " type: xml_collection;";
// The collection is opened through the interpreter, so that TAlienCollection
// is only referenced by name in a string here.
1383 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1385 msg += " using_tags: No (unknown)";
1386 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the xml collection is inspected.
1389 TMap *map = coll->Next();
1391 msg += " using_tags: No (unknown)";
1392 Info("CheckDataType", "%s", msg.Data());
1395 map = (TMap*)map->GetValue("");
1397 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1398 useTags = file.Contains(".tag");
1400 if (useTags) msg += " using_tags: Yes";
1401 else msg += " using_tags: No";
1402 Info("CheckDataType", "%s", msg.Data());
// Plain file: tags are detected from the name itself.
1405 useTags = slfn.Contains(".tag");
1406 if (slfn.Contains(".root")) msg += " type: root file;";
1407 else msg += " type: unknown file;";
1408 if (useTags) msg += " using_tags: Yes";
1409 else msg += " using_tags: No";
1410 Info("CheckDataType", "%s", msg.Data());
1413 //______________________________________________________________________________
// Registers a .par package with the plugin: checks that it can be accessed
// locally, switches the plugin to par-file mode (kUsePars bit) and stores the
// package name in fPackages.
// Calls Fatal() if the package cannot be accessed.
1414 void AliAnalysisAlien::EnablePackage(const char *package)
1416 // Enables a par file supposed to exist in the current directory.
1417 TString pkg(package);
// Remove any ".par" in the given name.
// NOTE(review): a statement between this line and the access check is not
// visible in this excerpt - confirm what name is actually tested.
1418 pkg.ReplaceAll(".par", "");
// TSystem::AccessPathName() returns true when the path can NOT be accessed.
1420 if (gSystem->AccessPathName(pkg)) {
1421 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce par-file mode only the first time the bit gets set.
1424 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1425 Info("EnablePackage", "AliEn plugin will use .par packages");
1426 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns the TObjString entries added to it.
1428 fPackages = new TObjArray();
1429 fPackages->SetOwner();
1431 fPackages->Add(new TObjString(pkg));
1434 //______________________________________________________________________________
// Scans the job list returned by gGrid->Ps("") and classifies the jobs by
// status.
//   jobidstart - jobs whose queue id is smaller than this are skipped
//   lastid     - the job whose status text is copied into the returned buffer
//   nrunning, nwaiting, nerror, ndone - reference arguments for the per-status
//                counts (the statements updating them are not all visible in
//                this excerpt)
// Returns a pointer to a function-static 20-character buffer, so the text is
// shared between calls and overwritten by a later call.
1435 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1437 // Get job status for all jobs with jobid>=jobidstart.
1438 static char mstatus[20];
1444 TGridJobStatusList *list = gGrid->Ps("");
1445 if (!list) return mstatus;
1446 Int_t nentries = list->GetSize();
1447 TGridJobStatus *status;
1449 for (Int_t ijob=0; ijob<nentries; ijob++) {
1450 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, with the object address
// embedded in the command string, so TAlienJobStatus is only named in a string.
1451 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1452 if (pid<jobidstart) continue;
1453 if (pid == lastid) {
// NOTE(review): the status text is passed to sprintf as the format string and
// written into the 20-byte buffer without a length limit - confirm that status
// strings are always short and never contain '%'.
1454 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
// kABORTED, kFAIL and kUNKNOWN share one case body.
1456 switch (status->GetStatus()) {
1457 case TGridJobStatus::kWAITING:
1459 case TGridJobStatus::kRUNNING:
1461 case TGridJobStatus::kABORTED:
1462 case TGridJobStatus::kFAIL:
1463 case TGridJobStatus::kUNKNOWN:
1465 case TGridJobStatus::kDONE:
1474 //______________________________________________________________________________
// Asks AliEn for the type of `lfn` with the "type -z" command and returns
// kTRUE when the reported type is exactly "collection".
// Returns kFALSE when the command gives no result or an empty "type" key.
1475 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1477 // Returns true if file is a collection. Functionality duplicated from
1478 // TAlien::Type() because we don't want to directly depend on TAlien.
1480 Error("IsCollection", "No connection to grid");
1483 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1484 if (!res) return kFALSE;
// Only the first result entry is consulted.
1485 const char* typeStr = res->GetKey(0, "type");
1486 if (!typeStr || !strlen(typeStr)) return kFALSE;
1487 if (!strcmp(typeStr, "collection")) return kTRUE;
1492 //______________________________________________________________________________
// Returns kTRUE when fOutputSingle is non-empty, i.e. when a single-output
// directory name has been set.
1493 Bool_t AliAnalysisAlien::IsSingleOutput() const
1495 // Check if single-output option is on.
1496 return (!fOutputSingle.IsNull());
1499 //______________________________________________________________________________
// Prints the current plugin configuration to stdout with printf, one setting
// per line. Several settings are printed only when they are set (non-empty
// string or positive value). The Option_t argument is unnamed and unused.
1500 void AliAnalysisAlien::Print(Option_t *) const
1502 // Print current plugin settings.
1503 printf("### AliEn analysis plugin current settings ###\n");
1504 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1505 if (fOverwriteMode) {
1506 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1507 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1509 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1510 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1511 printf("= Production mode:______________________________ %d\n", fProductionMode);
1512 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1513 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1514 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1516 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1517 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1518 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1519 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1520 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1521 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or named input files.
1522 if (fRunNumbers.Length())
1523 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1525 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
1526 if (!fRunRange[0] && !fRunNumbers.Length()) {
1527 TIter next(fInputFiles);
// The names are concatenated into `list` without any separator added here.
1530 while ((obj=next())) list += obj->GetName();
1531 printf("= Input files to be processed: _________________ %s\n", list.Data());
1533 if (TestBit(AliAnalysisGrid::kTest))
1534 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1535 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1536 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1537 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1538 printf("=====================================================================\n");
1539 printf("= Job price: ___________________________________ %d\n", fPrice);
1540 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1541 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1542 if (fMaxInitFailed>0)
1543 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1544 if (fMasterResubmitThreshold>0)
1545 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1546 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1547 if (fNrunsPerMaster>0)
1548 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1549 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1550 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1551 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1552 if (fArguments.Length())
1553 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1554 if (fExecutableArgs.Length())
1555 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1556 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1557 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1558 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1559 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1561 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1562 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): this tests the pointer returned by Data(), whereas the
// neighbouring optional settings test Length() - confirm whether Length() was
// intended, otherwise an empty include path is still printed.
1563 if (fIncludePath.Data())
1564 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1565 if (fCloseSE.Length())
1566 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1567 if (fFriendChainName.Length())
1568 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1570 TIter next(fPackages);
// As above, the package names are concatenated without a separator added here.
1573 while ((obj=next())) list += obj->GetName();
1574 printf("= Par files to be used: ________________________ %s\n", list.Data());
1578 //______________________________________________________________________________
// Resets the plugin configuration to its default values: creates fresh JDL
// objects for the analysis and merging jobs and assigns defaults to the
// numeric and string settings listed below.
1579 void AliAnalysisAlien::SetDefaults()
1581 // Set default values for everything. What cannot be filled will be left empty.
// The JDL objects are created through the interpreter, so TAlienJDL is only
// named in a string here.
// NOTE(review): the previous fGridJDL is deleted before being replaced, but no
// matching delete of the previous fMergingJDL is visible - possible leak when
// SetDefaults() is called on an already initialized object; confirm.
1582 if (fGridJDL) delete fGridJDL;
1583 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1584 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1587 fSplitMaxInputFileNumber = 100;
1589 fMasterResubmitThreshold = 0;
1594 fNrunsPerMaster = 1;
1595 fMaxMergeFiles = 100;
1597 fExecutable = "analysis.sh";
1598 fExecutableCommand = "root -b -q";
1600 fExecutableArgs = "";
1601 fAnalysisMacro = "myAnalysis.C";
1602 fAnalysisSource = "";
1603 fAdditionalLibs = "";
1607 fAliROOTVersion = "";
1608 fUser = ""; // Your alien user name
1609 fGridWorkingDir = "";
1610 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1611 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1612 fFriendChainName = "";
1613 fGridOutputDir = "output";
// Default archives: one zip for the std* files and one for the *.root files.
1614 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1615 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1616 fInputFormat = "xml-single";
1617 fJDLName = "analysis.jdl";
1618 fJobTag = "Automatically generated analysis JDL";
1619 fMergeExcludes = "";
1622 SetCheckCopy(kTRUE);
1623 SetDefaultOutputs(kTRUE);
1627 //______________________________________________________________________________
// Inspects the grid directory `aliendir` for the files produced by staged
// merging of `filename` and, when `submit` is true, submits merging jobs.
//   filename  - name of the output file being merged
//   aliendir  - grid directory holding the original and merged outputs
//   nperchunk - number of files merged by one job
//   submit    - kFALSE: only report; kTRUE: submit jobs using `jdl`
//   jdl       - JDL passed to the "submit" command together with aliendir,
//               the stage number and the chunk number
// NOTE(review): the Error() calls below are tagged "GetNregisteredFiles"
// although they are issued from CheckMergedFiles.
1628 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1630 // Static method that checks the status of merging. This can submit merging jobs that did not produced the expected
1631 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1632 // true if the jobs were successfully submitted.
1633 Int_t countOrig = 0;
1634 Int_t countStage = 0;
1637 Bool_t doneFinal = kFALSE;
// Normalized copy of the directory, used for the find command and for
// stripping the directory from the returned turls.
1639 TString saliendir(aliendir);
1640 TString sfilename, stmp;
1641 saliendir.ReplaceAll("//","/");
1642 saliendir = saliendir.Strip(TString::kTrailing, '/');
1644 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
// Search pattern: "name.root" becomes "name*.root".
1647 sfilename = filename;
1648 sfilename.ReplaceAll(".root", "*.root");
1649 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1650 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1651 TGridResult *res = gGrid->Command(command);
1653 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1658 while ((map=(TMap*)nextmap())) {
// NOTE(review): GetValue("turl") is dereferenced without a null check.
1659 TString turl = map->GetValue("turl")->GetName();
1660 if (!turl.Length()) {
// Reduce the turl to a path relative to the searched directory.
1665 turl.ReplaceAll("alien://", "");
1666 turl.ReplaceAll(saliendir, "");
1667 sfilename = gSystem->BaseName(turl);
1668 turl = turl.Strip(TString::kLeading, '/');
1669 // Now check to what the file corresponds to:
1670 // original output - aliendir/%03d/filename
1671 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1672 // final merged file - aliendir/filename
1673 if (sfilename == turl) {
1674 if (sfilename == filename) {
// Parse "Stage<2 digits>_<4 digits>" from the file name: two characters after
// "Stage" give the stage, four characters after the separator give the job.
1678 Int_t index = sfilename.Index("Stage");
1679 if (index<0) continue;
1680 stmp = sfilename(index+5,2);
1681 Int_t istage = atoi(stmp);
1682 stmp = sfilename(index+8,4);
1683 Int_t ijob = atoi(stmp);
1684 if (istage<stage) continue; // Ignore lower stages
1687 chunksDone.ResetAllBits();
1691 chunksDone.SetBitNumber(ijob);
1698 printf("=> Removing files from previous stages...\n");
1699 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1700 for (i=1; i<stage; i++)
1701 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1706 // Compute number of jobs that were submitted for the current stage
// One rounded-up division by nperchunk per stage, starting from countOrig.
1707 Int_t ntotstage = countOrig;
1708 for (i=1; i<=stage; i++) {
1709 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1710 else ntotstage = (ntotstage/nperchunk);
1712 // Now compare with the number of set bits in the chunksDone array
1713 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1715 printf("*** Found %d original files\n", countOrig);
1716 if (stage==0) printf("*** No merging completed so far.\n");
1717 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1718 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
1719 if (!submit) return doneFinal;
1720 // Submit merging jobs for all missing chunks for the current stage.
1721 TString query = Form("submit %s %s", jdl, aliendir);
1724 for (i=0; i<nmissing; i++) {
// Next chunk number whose bit is not set in chunksDone.
1725 ichunk = chunksDone.FirstNullBit(ichunk+1);
1726 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1727 if (!jobId) return kFALSE;
1731 // Submit next stage of merging
// With no merging stage found yet, the inputs are the original files.
1732 if (stage==0) countStage = countOrig;
1733 Int_t nchunks = (countStage/nperchunk);
1734 if (countStage%nperchunk) nchunks += 1;
1735 for (i=0; i<nchunks; i++) {
1736 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1737 if (!jobId) return kFALSE;
1742 //______________________________________________________________________________
// Sends `query` to the grid with gGrid->Command() and reads the "jobId" key of
// the first result entry. Progress and the outcome are printed to stdout.
// Returns 0 when there is no grid connection.
1743 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1745 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
1746 if (!gGrid) return 0;
1747 printf("=> %s ------> ",query);
1748 TGridResult *res = gGrid->Command(query);
1750 TString jobId = res->GetKey(0,"jobId");
// An empty job id is treated as a failed submission.
1752 if (jobId.IsNull()) {
1753 printf("submission failed. Reason:\n");
1756 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1759 printf(" Job id: %s\n", jobId.Data());
1763 //______________________________________________________________________________
// Merges the files named `output` found under the grid directory `basedir`
// using TFileMerger.
//   output    - output file name; anything from '@' onwards is removed
//   basedir   - grid directory searched with the "find" command
//   nmaxmerge - maximum number of files merged in one group
//   stage     - merging stage (see the description below)
//   ichunk    - chunk number to merge when stage is not 0
// NOTE(review): the range "ichunk*nmaxmerge to ichunk*(nmaxmerge+1)-1" quoted
// below reads like it should be "(ichunk+1)*nmaxmerge-1": the code skips
// ichunk*nmaxmerge entries and starts its counter at nmaxmerge - confirm.
1764 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1766 // Merge given output files from basedir. The file merger will merge nmaxmerge
1767 // files in a group. Merging can be done in stages:
1768 // stage=0 : will merge all existing files in a single stage
1769 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1770 // If their number is bigger than nmaxmerge, only the files from
1771 // ichunk*nmaxmerge to ichunk*(nmaxmerge+1)-1 will get merged as output_stage_<ichunk>
1772 // stage=n : does a find command for files named <output>Stage<stage-1>_*. If their number is bigger than
1773 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
1775 TString outputFile = output;
1777 TString outputChunk;
1778 TString previousChunk = "";
1779 Int_t countChunk = 0;
1780 Int_t countZero = nmaxmerge;
1781 Bool_t merged = kTRUE;
// Drop everything from '@' onwards in the output name.
1782 Int_t index = outputFile.Index("@");
1783 if (index > 0) outputFile.Remove(index);
// For stages above 1 the inputs are the products of the previous stage.
1784 TString inputFile = outputFile;
1785 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1786 command = Form("find %s/ *%s", basedir, inputFile.Data());
1787 printf("command: %s\n", command.Data());
1788 TGridResult *res = gGrid->Command(command);
1790 ::Error("MergeOutput","No result for the find command\n");
1794 TFileMerger *fm = 0;
1797 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1798 outputChunk = outputFile;
1799 outputChunk.ReplaceAll(".root", "_*.root");
1800 // Check for existent temporary merge files
1801 // Check overwrite mode and remove previous partial results if needed
1802 // Preserve old merging functionality for stage 0.
// Exec() returns the shell exit status, so zero here means that "ls" found at
// least one local partial-merge file matching the pattern.
1804 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1806 // Skip as many input files as in a chunk
1807 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1809 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1813 outputChunk = outputFile;
1814 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns true when the file is NOT accessible.
1816 if (gSystem->AccessPathName(outputChunk)) continue;
1817 // Merged file with chunks up to <countChunk> found
1818 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1819 previousChunk = outputChunk;
1823 countZero = nmaxmerge;
1825 while ((map=(TMap*)nextmap())) {
1826 // Loop 'find' results and get next LFN
1827 if (countZero == nmaxmerge) {
1828 // First file in chunk - create file merger and add previous chunk if any.
1829 fm = new TFileMerger(kFALSE);
1830 fm->SetFastMethod(kTRUE);
1831 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1832 outputChunk = outputFile;
1833 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1835 // If last file found, put merged results in the output file
1836 if (map == res->Last()) outputChunk = outputFile;
1837 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1838 if (!objs || !objs->GetString().Length()) {
1839 // Nothing found - skip this output
1844 // Add file to be merged and decrement chunk counter.
1845 fm->AddFile(objs->GetString());
// A chunk is complete when the counter reaches zero or the last result is hit.
1847 if (countZero==0 || map == res->Last()) {
1848 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1849 // Nothing found - skip this output
1850 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1855 fm->OutputFile(outputChunk);
1856 // Merge the outputs, then go to next chunk
1858 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message is tagged "MergeOutputs" rather than "MergeOutput".
1863 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial result has been folded into the new chunk; remove it.
1864 gSystem->Unlink(previousChunk);
1866 if (map == res->Last()) {
1872 countZero = nmaxmerge;
1873 previousChunk = outputChunk;
1878 // Merging stage different than 0.
1879 // Move to the beginning of the requested chunk.
1880 outputChunk = outputFile;
// Chunking applies only when there are more results than fit in one group.
1881 if (nmaxmerge < res->GetSize()) {
1882 if (ichunk*nmaxmerge >= res->GetSize()) {
1883 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
// Skip the results that belong to earlier chunks.
1887 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
1888 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
1890 countZero = nmaxmerge;
1891 fm = new TFileMerger(kFALSE);
1892 fm->SetFastMethod(kTRUE);
1893 while ((map=(TMap*)nextmap())) {
1894 // Loop 'find' results and get next LFN
1895 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1896 if (!objs || !objs->GetString().Length()) {
1897 // Nothing found - skip this output
1902 // Add file to be merged and decrement chunk counter.
1903 fm->AddFile(objs->GetString());
1905 if (countZero==0) break;
1908 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1909 // Nothing found - skip this output
1910 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1914 fm->OutputFile(outputChunk);
1915 // Merge the outputs
1917 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1921 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
1927 //______________________________________________________________________________
1928 Bool_t AliAnalysisAlien::MergeOutputs()
1930 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE immediately in test mode and kFALSE in offline mode. The visible
// code then either hands the merging over to grid jobs (SubmitMerging) or merges
// locally each file listed in fOutputFiles by calling MergeOutput on the grid
// output directory.
// NOTE(review): several lines of this listing are not visible (braces, some
// conditions and return statements), so the exact nesting of the checks below
// must be confirmed against the complete source.
1931 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
1932 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
// The condition guarding this error is not visible here; the message indicates
// that it reports a missing grid connection.
1934 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
// Merging via grid jobs. NOTE(review): presumably this part is guarded by the
// merge-via-JDL option - the enclosing condition is not visible, confirm.
1938 if (!TestBit(AliAnalysisGrid::kMerge)) {
1939 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
// In production mode the merging jobs are not submitted from here.
1942 if (fProductionMode) {
1943 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
1946 Info("MergeOutputs", "Submitting merging JDL");
1947 if (!SubmitMerging()) return kFALSE;
1948 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
1949 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
1952 // Get the output path. A name without any slash is taken relative to
// <grid home directory>/<fGridWorkingDir>.
1953 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
1954 if (!DirectoryExists(fGridOutputDir)) {
1955 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
1958 if (!fOutputFiles.Length()) {
1959 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
1962 // Check if fast read option was requested
1963 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
1964 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
1965 if (fFastReadOption) {
1966 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
1967 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Tighten the XNet (xrootd client) timeouts and retry counts in the ROOT environment.
1968 gEnv->SetValue("XNet.ConnectTimeout",10);
1969 gEnv->SetValue("XNet.RequestTimeout",10);
1970 gEnv->SetValue("XNet.MaxRedirectCount",2);
1971 gEnv->SetValue("XNet.ReconnectTimeout",10);
1972 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
1974 // Make sure we change the temporary directory: TMPDIR is pointed to the
// current working directory.
1975 gSystem->Setenv("TMPDIR", gSystem->pwd());
// fOutputFiles is a comma-separated list of output file names.
1976 TObjArray *list = fOutputFiles.Tokenize(",");
1980 Bool_t merged = kTRUE;
1981 while((str=(TObjString*)next())) {
1982 outputFile = str->GetString();
// Drop everything starting at the @ character (presumably a storage element
// specification - confirm).
1983 Int_t index = outputFile.Index("@");
1984 if (index > 0) outputFile.Remove(index);
// Wildcard pattern matching the partially merged chunk files of this output.
1985 TString outputChunk = outputFile;
1986 outputChunk.ReplaceAll(".root", "_*.root");
1987 // Skip already merged outputs. AccessPathName returns false when the path
// is accessible, so this branch is taken when the merged file exists locally.
1988 if (!gSystem->AccessPathName(outputFile)) {
1989 if (fOverwriteMode) {
1990 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
1991 gSystem->Unlink(outputFile);
// A zero exit status of ls means that at least one chunk file matches the pattern.
1992 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1993 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
1994 outputChunk.Data());
1995 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
1998 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// Same clean-up of leftover chunk files. NOTE(review): the enclosing branch of
// this second occurrence is not visible - confirm when it applies.
2002 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2003 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2004 outputChunk.Data());
2005 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Outputs whose name is contained in fMergeExcludes are not merged.
2008 if (fMergeExcludes.Length() &&
2009 fMergeExcludes.Contains(outputFile.Data())) continue;
2010 // Perform a 'find' command in the output directory, looking for registered outputs
2011 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2013 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in the ROOT list of open files.
2016 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2017 if (fileOpened) fileOpened->Close();
2022 //______________________________________________________________________________
2023 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2025 // Use the output files connected to output containers from the analysis manager
2026 // rather than the files defined by SetOutputFiles.
// flag - value assigned to the kDefaultOutputs bit of this object.
// The information message is printed only when the option gets switched on
// while the bit was not yet set.
2027 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2028 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2029 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2032 //______________________________________________________________________________
2033 void AliAnalysisAlien::SetOutputFiles(const char *list)
2035 // Manually set the output files list.
2036 // Removes duplicates. Not allowed if default outputs are not disabled.
// list - file names separated by blanks; they are stored comma-separated in
// fOutputFiles.
// NOTE(review): some lines of this listing are not visible, so it cannot be
// told from here whether fOutputFiles is cleared before the names are added.
2037 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2038 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2041 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2043 TString slist = list;
2044 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2045 TObjArray *arr = slist.Tokenize(" ");
2049 while ((os=(TObjString*)next())) {
2050 sout = os->GetString();
// Strip the storage element part: everything starting at the @ character.
2051 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// Duplicate check is a substring search, so a name that is contained in an
// already stored (longer) name is skipped as well.
2052 if (fOutputFiles.Contains(sout)) continue;
2053 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2054 fOutputFiles += sout;
2059 //______________________________________________________________________________
2060 void AliAnalysisAlien::SetOutputArchive(const char *list)
2062 // Manually set the output archive list. Free text - you are on your own...
2063 // Not allowed if default outputs are not disabled.
// list - archive specification, copied unchanged into fOutputArchive.
// Fatal is raised when the kDefaultOutputs bit is still set.
2064 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2065 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2068 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2069 fOutputArchive = list;
2072 //______________________________________________________________________________
2073 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2075 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; the method only prints a warning pointing to
// SetNumberOfReplicas() and SetDefaultOutputs().
2076 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2079 //______________________________________________________________________________
2080 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2082 // Start remote grid analysis.
// Both arguments are unnamed and therefore unused here.
// Steps visible in this method:
//  - when the kDefaultOutputs bit is set, collect the output file names from
//    the analysis manager and compose fOutputArchive;
//  - print a banner describing the running mode (offline, test, submit, merge
//    or full analysis);
//  - check the input data (CheckInputData) and create the dataset
//    (CreateDataset with fDataPattern);
//  - write the analysis file, analysis macro, validation scripts, merging macro
//    and merge executable, then create the JDL (CreateJDL);
//  - offline mode: return kFALSE once the files are produced;
//  - test mode: remove existing local outputs, then run the executable and its
//    validation script with bash;
//  - production mode: write a .prod file and leave the submission to LPM;
//  - otherwise submit the JDL (directly via gGrid->Command, or via Submit()
//    when run numbers or a run range are defined) and start an aliensh shell.
// NOTE(review): many lines of this listing are not visible (braces, return
// statements, some conditions); confirm the exact control flow and the return
// values against the complete source.
2084 // Check if output files have to be taken from the analysis manager
2085 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2086 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2087 if (!mgr || !mgr->IsInitialized()) {
2088 Error("StartAnalysis", "You need an initialized analysis manager for this");
// Loop over the output containers and append each new file name to the
// comma-separated list fOutputFiles.
2092 TIter next(mgr->GetOutputs());
2093 AliAnalysisDataContainer *output;
2094 while ((output=(AliAnalysisDataContainer*)next())) {
// NOTE(review): filename goes to strcmp without a null check - confirm that
// GetFileName() never returns a null pointer.
2095 const char *filename = output->GetFileName();
// A container file named default stands for the file of the output event
// handler; such containers are skipped when no output handler exists.
2096 if (!(strcmp(filename, "default"))) {
2097 if (!mgr->GetOutputEventHandler()) continue;
2098 filename = mgr->GetOutputEventHandler()->GetOutputFileName();
// Substring search: a name contained in an already listed name is skipped too.
2100 if (fOutputFiles.Contains(filename)) continue;
2101 if (fOutputFiles.Length()) fOutputFiles += ",";
2102 fOutputFiles += filename;
2104 // Add extra files registered to the analysis manager
2105 if (mgr->GetExtraFiles().Length()) {
2106 if (fOutputFiles.Length()) fOutputFiles += ",";
// The extra files come blank-separated; convert the separators to commas.
2107 TString extra = mgr->GetExtraFiles();
2108 extra.ReplaceAll(" ", ",");
2109 // Protection in case extra files do not exist (will it work?): the names
// are turned into wildcard patterns.
2110 extra.ReplaceAll(".root", "*.root");
2111 fOutputFiles += extra;
2113 // Compose the output archive: one archive for the std* log files with a
// single replica and one for the output files with fNreplicas replicas.
2114 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2115 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2117 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2118 if (TestBit(AliAnalysisGrid::kOffline)) {
2119 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2120 \n there nor any job run. You can revise the JDL and analysis \
2121 \n macro then run the same in \"submit\" mode.");
2122 } else if (TestBit(AliAnalysisGrid::kTest)) {
2123 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2125 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2126 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2127 \n space and job submitted.");
2128 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2129 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2130 if (fMergeViaJDL) CheckInputData();
2133 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2138 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2141 if (IsCheckCopy()) CheckFileCopy(gGrid->GetHomeDirectory());
2142 if (!CheckInputData()) {
2143 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2146 if (!CreateDataset(fDataPattern)) {
2148 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2149 if (fRunNumbers.Length()) serror = "run numbers";
2150 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2151 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2152 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2155 WriteAnalysisFile();
2156 WriteAnalysisMacro();
2158 WriteValidationScript();
2160 WriteMergingMacro();
2161 WriteMergeExecutable();
2162 WriteValidationScript(kTRUE);
2164 if (!CreateJDL()) return kFALSE;
2165 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2166 if (TestBit(AliAnalysisGrid::kTest)) {
2167 // Locally testing the analysis
2168 Info("StartAnalysis", "\n_______________________________________________________________________ \
2169 \n Running analysis script in a daughter shell as on a worker node \
2170 \n_______________________________________________________________________");
2171 TObjArray *list = fOutputFiles.Tokenize(",");
2175 while((str=(TObjString*)next())) {
2176 outputFile = str->GetString();
2177 Int_t index = outputFile.Index("@");
2178 if (index > 0) outputFile.Remove(index);
2179 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2182 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2183 TString validationScript = fExecutable;
2184 validationScript.ReplaceAll(".sh", "_validation.sh");
2185 gSystem->Exec(Form("bash %s",validationScript.Data()));
2186 // gSystem->Exec("cat stdout");
2189 // Check if submitting is managed by LPM manager
2190 if (fProductionMode) {
2191 TString prodfile = fJDLName;
2192 prodfile.ReplaceAll(".jdl", ".prod");
2193 WriteProductionFile(prodfile);
2194 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2197 // Submit AliEn job(s)
2198 gGrid->Cd(fGridOutputDir);
2201 if (!fRunNumbers.Length() && !fRunRange[0]) {
2202 // Submit a given xml or a set of runs
2203 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2204 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2206 const char *cjobId = res->GetKey(0,"jobId");
2210 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2213 Info("StartAnalysis", "\n_______________________________________________________________________ \
2214 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2215 \n_______________________________________________________________________",
2216 fJDLName.Data(), cjobId);
2221 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2225 // Submit for a range of enumeration of runs.
2226 if (!Submit()) return kFALSE;
2229 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2230 \n You may exit at any time and terminate the job later using the option <terminate> \
2231 \n ##################################################################################", jobID.Data());
2232 gSystem->Exec("aliensh");
2236 //______________________________________________________________________________
2237 Bool_t AliAnalysisAlien::Submit()
2239 // Submit all master jobs.
// One master job is expected per entry of fInputFiles. SubmitNext() is called
// repeatedly until fNsubmitted reaches that number; kFALSE is returned as soon
// as a SubmitNext() call fails.
// NOTE(review): some lines of this listing are not visible (among them the
// final return and probably the reset of the timer) - confirm.
2240 Int_t nmasterjobs = fInputFiles->GetEntries();
2241 Long_t tshoot = gSystem->Now();
// Nothing submitted so far: submit the first bunch right away.
2242 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2243 while (fNsubmitted < nmasterjobs) {
2244 Long_t now = gSystem->Now();
// Next attempt only after more than 30000 time units have elapsed (presumably
// milliseconds, i.e. 30 seconds - confirm the unit of TSystem::Now).
2245 if ((now-tshoot)>30000) {
2247 if (!SubmitNext()) return kFALSE;
2253 //______________________________________________________________________________
2254 Bool_t AliAnalysisAlien::SubmitMerging()
2256 // Submit all merging jobs.
// For every entry of fInputFiles the output directory of the corresponding
// master job is composed and CheckMergedFiles is called for it with the merging
// JDL name; kFALSE is returned as soon as that call reports failure. When there
// was at least one entry, an aliensh shell is started at the end.
// NOTE(review): some lines of this listing are not visible (braces, the else
// branch marker, the final return) - confirm against the complete source.
// A grid output directory without any slash is taken relative to
// <grid home directory>/<fGridWorkingDir>.
2257 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2258 gGrid->Cd(fGridOutputDir);
// The merging JDL is named after the executable: <name>.sh becomes <name>_merge.jdl.
2259 TString mergeJDLName = fExecutable;
2260 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2261 Int_t ntosubmit = fInputFiles->GetEntries();
2262 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input collection without the .xml extension.
2263 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2264 runOutDir.ReplaceAll(".xml", "");
2265 if (fOutputToRunNo) {
2266 // The output directory is the run number
2267 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2268 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2270 // The output directory is the master number in 3 digits format
2271 printf("### Submitting merging job for master <%03d>\n", i);
2272 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2274 // Check now the number of merging stages.
2275 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file that is not listed in fMergeExcludes; it is the
// file passed to CheckMergedFiles below.
2279 while((str=(TObjString*)next())) {
2280 outputFile = str->GetString();
2281 Int_t index = outputFile.Index("@");
2282 if (index > 0) outputFile.Remove(index);
2283 if (!fMergeExcludes.Contains(outputFile)) break;
2286 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2287 if (!done) return kFALSE;
// No input collections: nothing was submitted, so no shell is started.
2289 if (!ntosubmit) return kTRUE;
2290 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2291 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2292 \n ##################################################################################");
2293 gSystem->Exec("aliensh");
2297 //______________________________________________________________________________
2298 Bool_t AliAnalysisAlien::SubmitNext()
2300 // Submit next bunch of master jobs if the queue is free. The first master job is
2301 // submitted right away, while the next will not be unless the previous was split.
2302 // The plugin will not submit new master jobs if there are more than 500 jobs in
// NOTE(review): the continuation of the sentence above and several code lines
// (braces, return statements, some declarations and conditions) are not visible
// in this listing - confirm the details against the complete source.
// Function-local static state kept between calls: the re-entrance guard
// iscalled, the job ids of the first and of the last submitted master job and
// the average number of subjobs per master.
2304 static Bool_t iscalled = kFALSE;
2305 static Int_t firstmaster = 0;
2306 static Int_t lastmaster = 0;
2307 static Int_t npermaster = 0;
// Do nothing while a previous call is still in progress.
2308 if (iscalled) return kTRUE;
2310 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2311 Int_t ntosubmit = 0;
// The very first master job is submitted unconditionally.
2314 if (!fNsubmitted) ntosubmit = 1;
2316 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2317 printf("=== master %d: %s\n", lastmaster, status.Data());
2318 // If last master not split, just return
2319 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2320 // No more than 500 waiting jobs
2321 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master, used to estimate how many
// masters fit into the remaining room below 500 waiting jobs; at least one
// master is submitted.
2322 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
2323 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
2324 if (!ntosubmit) ntosubmit = 1;
2325 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2326 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2328 Int_t nmasterjobs = fInputFiles->GetEntries();
2329 for (Int_t i=0; i<ntosubmit; i++) {
2330 // Submit for a range of enumeration of runs.
// Stop as soon as all master jobs were submitted.
2331 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Base name of the input collection without the .xml extension.
2333 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2334 runOutDir.ReplaceAll(".xml", "");
// The last argument of the submit command is either the name derived from the
// collection or the master index in 3 digits format. NOTE(review): the
// condition selecting between the two forms is not visible here.
2336 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2338 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2339 printf("********* %s\n",query.Data());
2340 res = gGrid->Command(query);
// An empty job id in the grid result means that the submission failed.
2342 TString cjobId1 = res->GetKey(0,"jobId");
2343 if (!cjobId1.Length()) {
// NOTE(review): the messages below are tagged StartAnalysis although they are
// issued from SubmitNext.
2347 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2350 Info("StartAnalysis", "\n_______________________________________________________________________ \
2351 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2352 \n_______________________________________________________________________",
2353 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id of the last submitted master job and of the first one.
2356 lastmaster = cjobId1.Atoi();
2357 if (!firstmaster) firstmaster = lastmaster;
2362 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2370 //______________________________________________________________________________
2371 void AliAnalysisAlien::WriteAnalysisFile()
2373 // Write current analysis manager into the file <analysisFile>
// The file name is derived from the executable name: <name>.sh becomes
// <name>.root. Unless the kSubmit bit is set, the analysis type bits (kUseMC,
// kUseESD, kUseAOD) are deduced from the event handlers of the analysis manager
// and the file is recreated. The last visible part copies the file to the
// AliEn working directory.
// NOTE(review): several lines of this listing are not visible (braces, the
// actual write call, the condition using the copy flag) - confirm against the
// complete source.
2374 TString analysisFile = fExecutable;
2375 analysisFile.ReplaceAll(".sh", ".root");
2376 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2377 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2378 if (!mgr || !mgr->IsInitialized()) {
2379 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2382 // Check analysis type
2384 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2385 handler = (TObject*)mgr->GetInputEventHandler();
2387 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2388 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory; it is restored once the file was handled.
2390 TDirectory *cdir = gDirectory;
2391 TFile *file = TFile::Open(analysisFile, "RECREATE");
2393 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode)
2394 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2395 // Unless merging makes no sense
2396 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2399 // Enable termination for local jobs
2400 mgr->SetSkipTerminate(kFALSE);
2402 if (cdir) cdir->cd();
2403 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No copy to the grid in offline and in test mode.
2405 Bool_t copy = kTRUE;
2406 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination on the grid: <grid home directory><fGridWorkingDir>.
2409 TString workdir = gGrid->GetHomeDirectory();
2410 workdir += fGridWorkingDir;
// NOTE(review): the message below is tagged CreateJDL although it is issued
// from WriteAnalysisFile.
2411 Info("CreateJDL", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a previous copy from the grid before uploading the new file.
2412 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2413 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2417 //______________________________________________________________________________
2418 void AliAnalysisAlien::WriteAnalysisMacro()
2420 // Write the analysis macro that will steer the analysis in grid mode.
2421 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2423 out.open(fAnalysisMacro.Data(), ios::out);
2425 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2428 Bool_t hasSTEERBase = kFALSE;
2429 Bool_t hasESD = kFALSE;
2430 Bool_t hasAOD = kFALSE;
2431 Bool_t hasANALYSIS = kFALSE;
2432 Bool_t hasANALYSISalice = kFALSE;
2433 Bool_t hasCORRFW = kFALSE;
2434 TString func = fAnalysisMacro;
2435 TString type = "ESD";
2436 TString comment = "// Analysis using ";
2437 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2438 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
2442 if (type!="AOD" && fFriendChainName!="") {
2443 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2446 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2447 else comment += " data";
2448 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2449 func.ReplaceAll(".C", "");
2450 out << "void " << func.Data() << "()" << endl;
2452 out << comment.Data() << endl;
2453 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2454 out << " TStopwatch timer;" << endl;
2455 out << " timer.Start();" << endl << endl;
2456 // Change temp directory to current one
2457 out << "// Set temporary merging directory to current one" << endl;
2458 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2459 out << "// load base root libraries" << endl;
2460 out << " gSystem->Load(\"libTree\");" << endl;
2461 out << " gSystem->Load(\"libGeom\");" << endl;
2462 out << " gSystem->Load(\"libVMC\");" << endl;
2463 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2464 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2465 if (fAdditionalRootLibs.Length()) {
1466 // In principle libTree, libGeom, libVMC etc. can go into this list, too
2467 out << "// Add aditional libraries" << endl;
2468 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2471 while((str=(TObjString*)next())) {
2472 if (str->GetString().Contains(".so"))
2473 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2475 if (list) delete list;
2477 out << "// include path" << endl;
2478 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2479 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2480 out << "// Load analysis framework libraries" << endl;
2481 TString setupPar = "AliAnalysisAlien::SetupPar";
2483 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2484 out << " gSystem->Load(\"libESD\");" << endl;
2485 out << " gSystem->Load(\"libAOD\");" << endl;
2486 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2487 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2488 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2490 TIter next(fPackages);
2493 while ((obj=next())) {
2494 pkgname = obj->GetName();
2495 if (pkgname == "STEERBase" ||
2496 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2497 if (pkgname == "ESD" ||
2498 pkgname == "ESD.par") hasESD = kTRUE;
2499 if (pkgname == "AOD" ||
2500 pkgname == "AOD.par") hasAOD = kTRUE;
2501 if (pkgname == "ANALYSIS" ||
2502 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2503 if (pkgname == "ANALYSISalice" ||
2504 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2505 if (pkgname == "CORRFW" ||
2506 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2508 if (hasANALYSISalice) setupPar = "SetupPar";
2509 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2510 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2511 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2512 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2513 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2514 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2515 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2516 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2517 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2518 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2519 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2520 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2521 out << "// Compile other par packages" << endl;
2523 while ((obj=next())) {
2524 pkgname = obj->GetName();
2525 if (pkgname == "STEERBase" ||
2526 pkgname == "STEERBase.par" ||
2528 pkgname == "ESD.par" ||
2530 pkgname == "AOD.par" ||
2531 pkgname == "ANALYSIS" ||
2532 pkgname == "ANALYSIS.par" ||
2533 pkgname == "ANALYSISalice" ||
2534 pkgname == "ANALYSISalice.par" ||
2535 pkgname == "CORRFW" ||
2536 pkgname == "CORRFW.par") continue;
2537 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2540 if (fAdditionalLibs.Length()) {
2541 out << "// Add aditional AliRoot libraries" << endl;
2542 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2545 while((str=(TObjString*)next())) {
2546 if (str->GetString().Contains(".so"))
2547 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2548 if (str->GetString().Contains(".par"))
2549 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2551 if (list) delete list;
2554 out << "// analysis source to be compiled at runtime (if any)" << endl;
2555 if (fAnalysisSource.Length()) {
2556 TObjArray *list = fAnalysisSource.Tokenize(" ");
2559 while((str=(TObjString*)next())) {
2560 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2562 if (list) delete list;
2565 if (fFastReadOption) {
2566 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2567 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2568 out << "// fast xrootd reading enabled" << endl;
2569 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2570 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2571 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2572 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2573 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2574 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2576 out << "// connect to AliEn and make the chain" << endl;
2577 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2578 if (IsUsingTags()) {
2579 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
2581 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
2583 out << "// read the analysis manager from file" << endl;
2584 TString analysisFile = fExecutable;
2585 analysisFile.ReplaceAll(".sh", ".root");
2586 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2587 out << " if (!file) return;" << endl;
2588 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2589 out << " AliAnalysisManager *mgr = 0;" << endl;
2590 out << " TKey *key;" << endl;
2591 out << " while ((key=(TKey*)nextkey())) {" << endl;
2592 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2593 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2594 out << " };" << endl;
2595 out << " if (!mgr) {" << endl;
2596 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2597 out << " return;" << endl;
2598 out << " }" << endl << endl;
2599 out << " mgr->PrintStatus();" << endl;
2600 if (AliAnalysisManager::GetAnalysisManager()) {
2601 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2602 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2604 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2607 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
2608 out << " timer.Stop();" << endl;
2609 out << " timer.Print();" << endl;
2610 out << "}" << endl << endl;
2611 if (IsUsingTags()) {
2612 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
2614 out << "// Create a chain using tags from the xml file." << endl;
2615 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
2616 out << " if (!coll) {" << endl;
2617 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2618 out << " return NULL;" << endl;
2619 out << " }" << endl;
2620 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
2621 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
2622 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
2623 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
2624 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
2625 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
2626 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
2627 out << " // Check if the cuts configuration file was provided" << endl;
2628 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
2629 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
2630 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2631 out << " }" << endl;
2632 if (fFriendChainName=="") {
2633 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2635 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
2636 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
2637 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
2639 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
2640 out << " chain->ls();" << endl;
2641 out << " return chain;" << endl;
2642 out << "}" << endl << endl;
2643 if (gSystem->AccessPathName("ConfigureCuts.C")) {
2644 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
2645 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
2646 msg += " AliLHCTagCuts *lhcCuts,\n";
2647 msg += " AliDetectorTagCuts *detCuts,\n";
2648 msg += " AliEventTagCuts *evCuts)";
2649 Info("WriteAnalysisMacro", "%s", msg.Data());
2652 if (!IsUsingTags() || fFriendChainName!="") {
2653 out <<"//________________________________________________________________________________" << endl;
2654 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
2656 out << "// Create a chain using url's from xml file" << endl;
2657 out << " TString treename = type;" << endl;
2658 out << " treename.ToLower();" << endl;
2659 out << " treename += \"Tree\";" << endl;
2660 out << " printf(\"***************************************\\n\");" << endl;
2661 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
2662 out << " printf(\"***************************************\\n\");" << endl;
2663 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
2664 out << " if (!coll) {" << endl;
2665 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2666 out << " return NULL;" << endl;
2667 out << " }" << endl;
2668 out << " TChain *chain = new TChain(treename);" << endl;
2669 if(fFriendChainName!="") {
2670 out << " TChain *chainFriend = new TChain(treename);" << endl;
2672 out << " coll->Reset();" << endl;
2673 out << " while (coll->Next()) {" << endl;
2674 out << " chain->Add(coll->GetTURL(\"\"));" << endl;
2675 if(fFriendChainName!="") {
2676 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
2677 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2678 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2679 out << " chainFriend->Add(fileFriend.Data());" << endl;
2681 out << " }" << endl;
2682 out << " if (!chain->GetNtrees()) {" << endl;
2683 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
2684 out << " return NULL;" << endl;
2685 out << " }" << endl;
2686 if(fFriendChainName!="") {
2687 out << " chain->AddFriend(chainFriend);" << endl;
2689 out << " return chain;" << endl;
2690 out << "}" << endl << endl;
2692 if (hasANALYSISalice) {
2693 out <<"//________________________________________________________________________________" << endl;
2694 out << "Bool_t SetupPar(const char *package) {" << endl;
2695 out << "// Compile the package and set it up." << endl;
2696 out << " TString pkgdir = package;" << endl;
2697 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2698 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2699 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2700 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2701 out << " // Check for BUILD.sh and execute" << endl;
2702 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2703 out << " printf(\"*******************************\\n\");" << endl;
2704 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2705 out << " printf(\"*******************************\\n\");" << endl;
2706 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2707 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2708 out << " gSystem->ChangeDirectory(cdir);" << endl;
2709 out << " return kFALSE;" << endl;
2710 out << " }" << endl;
2711 out << " } else {" << endl;
2712 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2713 out << " gSystem->ChangeDirectory(cdir);" << endl;
2714 out << " return kFALSE;" << endl;
2715 out << " }" << endl;
2716 out << " // Check for SETUP.C and execute" << endl;
2717 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2718 out << " printf(\"*******************************\\n\");" << endl;
2719 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2720 out << " printf(\"*******************************\\n\");" << endl;
2721 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2722 out << " } else {" << endl;
2723 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2724 out << " gSystem->ChangeDirectory(cdir);" << endl;
2725 out << " return kFALSE;" << endl;
2726 out << " }" << endl;
2727 out << " // Restore original workdir" << endl;
2728 out << " gSystem->ChangeDirectory(cdir);" << endl;
2729 out << " return kTRUE;" << endl;
2732 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
2734 Bool_t copy = kTRUE;
2735 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
2738 TString workdir = gGrid->GetHomeDirectory();
2739 workdir += fGridWorkingDir;
2740 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
2741 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
2742 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
2743 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
2744 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
2746 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
2747 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
2751 //______________________________________________________________________________
// WriteMergingMacro
// Generates the ROOT macro used by the grid merging jobs and uploads it to the
// AliEn working directory.
//  - Returns immediately unless fMergeViaJDL is set; reports an error when no
//    output file names are defined.
//  - The macro file name is fExecutable with ".sh" replaced by "_merge.C"; the
//    generated entry point has the signature
//    (const char *dir, Int_t stage=0, Int_t ichunk=0).
//  - The macro text is written under the !kSubmit test below.
//  - The generated macro loads libraries / PAR packages, connects to AliEn,
//    calls AliAnalysisAlien::MergeOutput for every entry of fOutputFiles that is
//    not excluded, creates the "outputs_valid" marker file and, once a merged
//    output is found locally, starts the analysis manager in "gridterminate" mode.
//  - When the ANALYSISalice package is among fPackages, a local SetupPar()
//    function is emitted into the macro and used instead of
//    AliAnalysisAlien::SetupPar.
//  - 'copy' is cleared in kOffline and kTest modes; the upload statements follow it.
2752 void AliAnalysisAlien::WriteMergingMacro()
2754 // Write a macro to merge the outputs per master job.
2755 if (!fMergeViaJDL) return;
2756 if (!fOutputFiles.Length()) {
2757 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2760 TString mergingMacro = fExecutable;
2761 mergingMacro.ReplaceAll(".sh","_merge.C");
// A bare output directory name (no '/') is expanded to <home>/<workdir>/<name>.
2762 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2763 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2765 out.open(mergingMacro.Data(), ios::out);
// NOTE(review): the file opened above is mergingMacro, but this message prints fAnalysisMacro.
2767 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags recording which framework packages are provided as PAR archives in fPackages.
2770 Bool_t hasSTEERBase = kFALSE;
2771 Bool_t hasESD = kFALSE;
2772 Bool_t hasAOD = kFALSE;
2773 Bool_t hasANALYSIS = kFALSE;
2774 Bool_t hasANALYSISalice = kFALSE;
2775 Bool_t hasCORRFW = kFALSE;
2776 TString func = mergingMacro;
// The macro entry point is named after the macro file without the ".C" extension.
2778 func.ReplaceAll(".C", "");
2779 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
2781 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
2782 out << " TStopwatch timer;" << endl;
2783 out << " timer.Start();" << endl << endl;
// Base ROOT libraries are emitted only when the job command does not contain "aliroot".
2784 if (!fExecutableCommand.Contains("aliroot")) {
2785 out << "// load base root libraries" << endl;
2786 out << " gSystem->Load(\"libTree\");" << endl;
2787 out << " gSystem->Load(\"libGeom\");" << endl;
2788 out << " gSystem->Load(\"libVMC\");" << endl;
2789 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2790 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2792 if (fAdditionalRootLibs.Length()) {
2793 // in principle libtree /lib geom libvmc etc. can go into this list, too
2794 out << "// Add aditional libraries" << endl;
2795 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
// Only tokens containing ".so" produce a gSystem->Load line.
2798 while((str=(TObjString*)next())) {
2799 if (str->GetString().Contains(".so"))
2800 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2802 if (list) delete list;
2804 out << "// include path" << endl;
2805 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2806 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2807 out << "// Load analysis framework libraries" << endl;
2809 if (!fExecutableCommand.Contains("aliroot")) {
2810 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2811 out << " gSystem->Load(\"libESD\");" << endl;
2812 out << " gSystem->Load(\"libAOD\");" << endl;
2814 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2815 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2816 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: detect which framework packages come as PAR archives.
2818 TIter next(fPackages);
2821 TString setupPar = "AliAnalysisAlien::SetupPar";
2822 while ((obj=next())) {
2823 pkgname = obj->GetName();
2824 if (pkgname == "STEERBase" ||
2825 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2826 if (pkgname == "ESD" ||
2827 pkgname == "ESD.par") hasESD = kTRUE;
2828 if (pkgname == "AOD" ||
2829 pkgname == "AOD.par") hasAOD = kTRUE;
2830 if (pkgname == "ANALYSIS" ||
2831 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2832 if (pkgname == "ANALYSISalice" ||
2833 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2834 if (pkgname == "CORRFW" ||
2835 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// With ANALYSISalice supplied as a PAR archive the macro uses the local SetupPar()
// that is emitted at the end of the generated file.
2837 if (hasANALYSISalice) setupPar = "SetupPar";
// For each framework package: load the shared library, or build it from its PAR archive.
2838 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2839 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2840 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2841 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2842 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2843 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2844 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2845 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2846 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2847 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2848 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2849 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2850 out << "// Compile other par packages" << endl;
// Second pass over the packages: everything that is not a framework package handled above.
2852 while ((obj=next())) {
2853 pkgname = obj->GetName();
2854 if (pkgname == "STEERBase" ||
2855 pkgname == "STEERBase.par" ||
2857 pkgname == "ESD.par" ||
2859 pkgname == "AOD.par" ||
2860 pkgname == "ANALYSIS" ||
2861 pkgname == "ANALYSIS.par" ||
2862 pkgname == "ANALYSISalice" ||
2863 pkgname == "ANALYSISalice.par" ||
2864 pkgname == "CORRFW" ||
2865 pkgname == "CORRFW.par") continue;
2866 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2869 if (fAdditionalLibs.Length()) {
2870 out << "// Add aditional AliRoot libraries" << endl;
2871 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2874 while((str=(TObjString*)next())) {
2875 if (str->GetString().Contains(".so"))
2876 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2878 if (list) delete list;
2881 out << "// Analysis source to be compiled at runtime (if any)" << endl;
2882 if (fAnalysisSource.Length()) {
2883 TObjArray *list = fAnalysisSource.Tokenize(" ");
// Each source file is compiled in the job via ".L <file>+g".
2886 while((str=(TObjString*)next())) {
2887 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2889 if (list) delete list;
2893 if (fFastReadOption) {
2894 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
2895 out << "// fast xrootd reading enabled" << endl;
2896 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2897 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2898 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2899 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2900 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2901 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2903 // Change temp directory to current one
2904 out << "// Set temporary merging directory to current one" << endl;
2905 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2906 out << "// Connect to AliEn" << endl;
2907 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2908 out << " Bool_t laststage = kFALSE;" << endl;
2909 out << " TString outputDir = dir;" << endl;
2910 out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl;
2911 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
// NOTE(review): GetAnalysisManager() is dereferenced here without a null check,
// although it is tested for null further down before GetDebugLevel() is read.
// The extra files are appended to mergeExcludes without any separator.
2912 out << " mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
2913 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
2914 out << " TIter *iter = new TIter(list);" << endl;
2915 out << " TObjString *str;" << endl;
2916 out << " TString outputFile;" << endl;
2917 out << " Bool_t merged = kTRUE;" << endl;
// Generated loop: skips wildcard entries, strips everything from '@' on, skips outputs
// already present locally and excluded files, then merges via MergeOutput.
2918 out << " while((str=(TObjString*)iter->Next())) {" << endl;
2919 out << " outputFile = str->GetString();" << endl;
2920 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
2921 out << " Int_t index = outputFile.Index(\"@\");" << endl;
2922 out << " if (index > 0) outputFile.Remove(index);" << endl;
2923 out << " // Skip already merged outputs" << endl;
2924 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
2925 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
2926 out << " continue;" << endl;
2927 out << " }" << endl;
2928 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
2929 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
2930 out << " if (!merged) {" << endl;
2931 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
2932 out << " return;" << endl;
2933 out << " }" << endl;
2934 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
2935 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
2936 out << " }" << endl;
2937 out << " // all outputs merged, validate" << endl;
2938 out << " ofstream out;" << endl;
2939 out << " out.open(\"outputs_valid\", ios::out);" << endl;
2940 out << " out.close();" << endl;
2941 out << " // read the analysis manager from file" << endl;
// The analysis manager is read back from <executable>.root in the generated macro.
2942 TString analysisFile = fExecutable;
2943 analysisFile.ReplaceAll(".sh", ".root");
2944 out << " if (!laststage) return;" << endl;
2945 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2946 out << " if (!file) return;" << endl;
2947 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2948 out << " AliAnalysisManager *mgr = 0;" << endl;
2949 out << " TKey *key;" << endl;
2950 out << " while ((key=(TKey*)nextkey())) {" << endl;
2951 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2952 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2953 out << " };" << endl;
2954 out << " if (!mgr) {" << endl;
// NOTE(review): no separator is emitted between "file" and the file name in this message.
2955 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
2956 out << " return;" << endl;
2957 out << " }" << endl << endl;
2958 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
2959 out << " mgr->PrintStatus();" << endl;
// Log verbosity of the generated macro depends on the debug level of the current manager.
2960 if (AliAnalysisManager::GetAnalysisManager()) {
2961 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2962 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2964 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2967 out << " mgr->StartAnalysis(\"gridterminate\");" << endl;
2968 out << "}" << endl << endl;
// Local SetupPar() emitted into the macro; it mirrors AliAnalysisAlien::SetupPar.
2969 if (hasANALYSISalice) {
2970 out <<"//________________________________________________________________________________" << endl;
2971 out << "Bool_t SetupPar(const char *package) {" << endl;
2972 out << "// Compile the package and set it up." << endl;
2973 out << " TString pkgdir = package;" << endl;
2974 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2975 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2976 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2977 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2978 out << " // Check for BUILD.sh and execute" << endl;
2979 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2980 out << " printf(\"*******************************\\n\");" << endl;
2981 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2982 out << " printf(\"*******************************\\n\");" << endl;
2983 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2984 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2985 out << " gSystem->ChangeDirectory(cdir);" << endl;
2986 out << " return kFALSE;" << endl;
2987 out << " }" << endl;
2988 out << " } else {" << endl;
2989 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2990 out << " gSystem->ChangeDirectory(cdir);" << endl;
2991 out << " return kFALSE;" << endl;
2992 out << " }" << endl;
2993 out << " // Check for SETUP.C and execute" << endl;
2994 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2995 out << " printf(\"*******************************\\n\");" << endl;
2996 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2997 out << " printf(\"*******************************\\n\");" << endl;
2998 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2999 out << " } else {" << endl;
3000 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3001 out << " gSystem->ChangeDirectory(cdir);" << endl;
3002 out << " return kFALSE;" << endl;
3003 out << " }" << endl;
3004 out << " // Restore original workdir" << endl;
3005 out << " gSystem->ChangeDirectory(cdir);" << endl;
3006 out << " return kTRUE;" << endl;
// Upload step: 'copy' is cleared in offline and test modes.
3010 Bool_t copy = kTRUE;
3011 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3014 TString workdir = gGrid->GetHomeDirectory();
3015 workdir += fGridWorkingDir;
// Any previous copy of the macro is removed before the new one is uploaded.
3016 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3017 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3018 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3022 //______________________________________________________________________________
3023 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3025 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3026 // Note that for loading the compiled library. The current directory should have precedence in
3028 TString pkgdir = package;
3029 pkgdir.ReplaceAll(".par","");
3030 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
3031 TString cdir = gSystem->WorkingDirectory();
3032 gSystem->ChangeDirectory(pkgdir);
3033 // Check for BUILD.sh and execute
3034 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3035 printf("**************************************************\n");
3036 printf("*** Building PAR archive %s\n", package);
3037 printf("**************************************************\n");
3038 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3039 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3040 gSystem->ChangeDirectory(cdir);
3044 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3045 gSystem->ChangeDirectory(cdir);
3048 // Check for SETUP.C and execute
3049 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3050 printf("**************************************************\n");
3051 printf("*** Setup PAR archive %s\n", package);
3052 printf("**************************************************\n");
3053 gROOT->Macro("PROOF-INF/SETUP.C");
3054 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3056 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3057 gSystem->ChangeDirectory(cdir);
3060 // Restore original workdir
3061 gSystem->ChangeDirectory(cdir);
3065 //______________________________________________________________________________
3066 void AliAnalysisAlien::WriteExecutable()
3068 // Generate the alien executable script.
3069 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3071 out.open(fExecutable.Data(), ios::out);
3073 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3076 out << "#!/bin/bash" << endl;
3077 out << "echo \"=========================================\"" << endl;
3078 out << "echo \"############## PATH : ##############\"" << endl;
3079 out << "echo $PATH" << endl;
3080 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3081 out << "echo $LD_LIBRARY_PATH" << endl;
3082 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3083 out << "echo $ROOTSYS" << endl;
3084 out << "echo \"############## which root : ##############\"" << endl;
3085 out << "which root" << endl;
3086 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3087 out << "echo $ALICE_ROOT" << endl;
3088 out << "echo \"############## which aliroot : ##############\"" << endl;
3089 out << "which aliroot" << endl;
3090 out << "echo \"############## system limits : ##############\"" << endl;
3091 out << "ulimit -a" << endl;
3092 out << "echo \"############## memory : ##############\"" << endl;
3093 out << "free -m" << endl;
3094 out << "echo \"=========================================\"" << endl << endl;
3095 // Make sure we can properly compile par files
3096 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3097 out << fExecutableCommand << " ";
3098 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3099 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3100 out << "echo \"############## memory after: ##############\"" << endl;
3101 out << "free -m" << endl;
3103 Bool_t copy = kTRUE;
3104 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3107 TString workdir = gGrid->GetHomeDirectory();
3108 TString bindir = Form("%s/bin", workdir.Data());
3109 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3110 workdir += fGridWorkingDir;
3111 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3112 if (FileExists(executable)) gGrid->Rm(executable);
3113 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3114 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3118 //______________________________________________________________________________
3119 void AliAnalysisAlien::WriteMergeExecutable()
3121 // Generate the alien executable script for the merging job.
3122 if (!fMergeViaJDL) return;
3123 TString mergeExec = fExecutable;
3124 mergeExec.ReplaceAll(".sh", "_merge.sh");
3125 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3127 out.open(mergeExec.Data(), ios::out);
3129 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
3132 out << "#!/bin/bash" << endl;
3133 out << "echo \"=========================================\"" << endl;
3134 out << "echo \"############## PATH : ##############\"" << endl;
3135 out << "echo $PATH" << endl;
3136 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3137 out << "echo $LD_LIBRARY_PATH" << endl;
3138 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3139 out << "echo $ROOTSYS" << endl;
3140 out << "echo \"############## which root : ##############\"" << endl;
3141 out << "which root" << endl;
3142 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3143 out << "echo $ALICE_ROOT" << endl;
3144 out << "echo \"############## which aliroot : ##############\"" << endl;
3145 out << "which aliroot" << endl;
3146 out << "echo \"############## system limits : ##############\"" << endl;
3147 out << "ulimit -a" << endl;
3148 out << "echo \"############## memory : ##############\"" << endl;
3149 out << "free -m" << endl;
3150 out << "echo \"=========================================\"" << endl << endl;
3151 // Make sure we can properly compile par files
3152 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3153 TString mergeMacro = fExecutable;
3154 mergeMacro.ReplaceAll(".sh", "_merge.C");
3155 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3156 out << fExecutableCommand << " " << "$ARG" << endl;
3157 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3158 out << "echo \"############## memory after: ##############\"" << endl;
3159 out << "free -m" << endl;
3161 Bool_t copy = kTRUE;
3162 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3165 TString workdir = gGrid->GetHomeDirectory();
3166 TString bindir = Form("%s/bin", workdir.Data());
3167 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3168 workdir += fGridWorkingDir;
3169 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3170 if (FileExists(executable)) gGrid->Rm(executable);
3171 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3172 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3176 //______________________________________________________________________________
3177 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3179 // Write the production file to be submitted by LPM manager. The format is:
3180 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3181 // Next lines: full_path_to_dataset XXX (XXX is a string)
3182 // To submit, one has to: submit jdl XXX for all lines
3184 out.open(filename, ios::out);
3186 Error("WriteProductionFile", "Bad file name: %s", filename);
3189 TString workdir = gGrid->GetHomeDirectory();
3190 workdir += fGridWorkingDir;
3191 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3192 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3193 out << locjdl << " " << njobspermaster << endl;
3194 Int_t nmasterjobs = fInputFiles->GetEntries();
3195 for (Int_t i=0; i<nmasterjobs; i++) {
3196 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3197 runOutDir.ReplaceAll(".xml", "");
3199 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3201 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3203 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3204 if (FileExists(filename)) gGrid->Rm(filename);
3205 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3208 //______________________________________________________________________________
// WriteValidationScript
// Generates the bash validation script run on the job outputs and uploads it to
// the AliEn working directory.
//  merge - selects the script name: fExecutable with ".sh" replaced by
//          "_mergevalidation.sh" (merge) or "_validation.sh" (otherwise). When
//          set, outputs listed in fMergeExcludes are not required by the script.
// The generated script sets error=1 when stderr is missing, when stderr
// contains one of the searched failure messages, when an expected output file
// is missing, or when the "outputs_valid" marker is missing. It ends with
// "exit $error". Unless IsKeepLogs() is true, a validated job emits "rm -f std*".
// Except in test mode, the script's messages are appended to the file "stdout".
3209 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3211 // Generate the alien validation script.
3212 // Generate the validation script
3214 TString validationScript = fExecutable;
3215 if (merge) validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
3216 else validationScript.ReplaceAll(".sh", "_validation.sh");
3218 Error("WriteValidationScript", "Alien connection required");
// Redirection suffix appended to the echo/ls lines of the generated script.
3221 TString outStream = "";
3222 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3223 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3225 out.open(validationScript, ios::out);
3226 out << "#!/bin/bash" << endl;
3227 out << "##################################################" << endl;
3228 out << "validateout=`dirname $0`" << endl;
3229 out << "validatetime=`date`" << endl;
3230 out << "validated=\"0\";" << endl;
3231 out << "error=0" << endl;
3232 out << "if [ -z $validateout ]" << endl;
3233 out << "then" << endl;
3234 out << " validateout=\".\"" << endl;
3235 out << "fi" << endl << endl;
3236 out << "cd $validateout;" << endl;
3237 out << "validateworkdir=`pwd`;" << endl << endl;
3238 out << "echo \"*******************************************************\"" << outStream << endl;
3239 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3241 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3242 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3243 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3244 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3245 out << "ls -la ./" << outStream << endl;
3246 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3247 out << "##################################################" << endl;
// Generated check: the job must have produced a stderr file.
3250 out << "if [ ! -f stderr ] ; then" << endl;
3251 out << " error=1" << endl;
3252 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3253 out << " echo \"Error = $error\" " << outStream << endl;
3254 out << "fi" << endl;
// Generated checks: search stderr for known failure messages.
// NOTE(review): the glibc pattern starts with '*' while grep runs with -E;
// confirm that it matches as intended.
3256 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3257 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3258 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3259 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3262 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3263 out << " error=1" << endl;
3264 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3265 out << " echo \"$parArch\" " << outStream << endl;
3266 out << " echo \"Error = $error\" " << outStream << endl;
3267 out << "fi" << endl;
3269 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3270 out << " error=1" << endl;
3271 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3272 out << " echo \"$segViol\" " << outStream << endl;
3273 out << " echo \"Error = $error\" " << outStream << endl;
3274 out << "fi" << endl;
3276 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3277 out << " error=1" << endl;
3278 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3279 out << " echo \"$segFault\" " << outStream << endl;
3280 out << " echo \"Error = $error\" " << outStream << endl;
3281 out << "fi" << endl;
3283 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3284 out << " error=1" << endl;
3285 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3286 out << " echo \"$glibcErr\" " << outStream << endl;
3287 out << " echo \"Error = $error\" " << outStream << endl;
3288 out << "fi" << endl;
3290 // Part dedicated to the specific analyses running into the train
3292 TObjArray *arr = fOutputFiles.Tokenize(",");
// NOTE(review): mgr is dereferenced on the next line without a null check.
3295 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3296 TString extra = mgr->GetExtraFiles();
// One existence test per expected output file. Everything from '@' on is stripped
// from the name; merge-excluded files (merge mode only), extra files and wildcard
// entries are not tested.
3297 while ((os=(TObjString*)next1())) {
3299 outputFile = os->GetString();
3300 Int_t index = outputFile.Index("@");
3301 if (index > 0) outputFile.Remove(index);
3302 if (merge && fMergeExcludes.Contains(outputFile)) continue;
3303 if (extra.Contains(outputFile)) continue;
3304 if (outputFile.Contains("*")) continue;
3305 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3306 out << " error=1" << endl;
3307 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3308 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3309 out << "fi" << endl;
// Generated check for the "outputs_valid" marker file.
// NOTE(review): ">> stdout" is hard-coded in the first echo below, whereas the
// other messages use outStream (empty in test mode).
3312 out << "if ! [ -f outputs_valid ] ; then" << endl;
3313 out << " error=1" << endl;
3314 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3315 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3316 out << "fi" << endl;
3318 out << "if [ $error = 0 ] ; then" << endl;
3319 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3320 if (!IsKeepLogs()) {
3321 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3323 out << " rm -f std*" << endl;
3325 out << "fi" << endl;
3327 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3328 out << "echo \"*******************************************************\"" << outStream << endl;
3329 out << "cd -" << endl;
3330 out << "exit $error" << endl;
// Upload step: 'copy' is cleared in offline and test modes.
3332 Bool_t copy = kTRUE;
3333 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3336 TString workdir = gGrid->GetHomeDirectory();
3337 workdir += fGridWorkingDir;
// NOTE(review): this message is attributed to "CreateJDL" rather than to this method.
3338 Info("CreateJDL", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3339 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3340 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));