1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TObjString.h"
31 #include "TObjArray.h"
33 #include "TGridResult.h"
34 #include "TGridCollection.h"
36 #include "TGridJobStatusList.h"
37 #include "TGridJobStatus.h"
38 #include "TFileMerger.h"
39 #include "AliAnalysisManager.h"
40 #include "AliVEventHandler.h"
41 #include "AliAnalysisDataContainer.h"
42 #include "AliAnalysisAlien.h"
44 ClassImp(AliAnalysisAlien)
46 //______________________________________________________________________________
// Default constructor.
// Among the member initializers shown here, fSplitMaxInputFileNumber and
// fMasterResubmitThreshold start at 0 and fAdditionalRootLibs is
// default-constructed.
47 AliAnalysisAlien::AliAnalysisAlien()
53 fSplitMaxInputFileNumber(0),
55 fMasterResubmitThreshold(0),
73 fAdditionalRootLibs(),
104 //______________________________________________________________________________
// Named constructor: forwards 'name' to the AliAnalysisGrid base class.
// Among the member initializers shown here, fSplitMaxInputFileNumber and
// fMasterResubmitThreshold start at 0, while fExecutableCommand and
// fAdditionalRootLibs are default-constructed.
105 AliAnalysisAlien::AliAnalysisAlien(const char *name)
106 :AliAnalysisGrid(name),
111 fSplitMaxInputFileNumber(0),
113 fMasterResubmitThreshold(0),
126 fExecutableCommand(),
131 fAdditionalRootLibs(),
162 //______________________________________________________________________________
// Copy constructor.
// Settings held by value are copied member by member from 'other'. The two
// JDL objects are NOT copied: fresh ones are created. The input-file and
// package lists are deep-copied into new arrays that own their elements.
163 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
164 :AliAnalysisGrid(other),
167 fPrice(other.fPrice),
169 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
170 fMaxInitFailed(other.fMaxInitFailed),
171 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
172 fNtestFiles(other.fNtestFiles),
173 fNrunsPerMaster(other.fNrunsPerMaster),
174 fMaxMergeFiles(other.fMaxMergeFiles),
175 fNsubmitted(other.fNsubmitted),
176 fProductionMode(other.fProductionMode),
177 fOutputToRunNo(other.fOutputToRunNo),
178 fMergeViaJDL(other.fMergeViaJDL),
179 fFastReadOption(other.fFastReadOption),
180 fOverwriteMode(other.fOverwriteMode),
181 fNreplicas(other.fNreplicas),
182 fRunNumbers(other.fRunNumbers),
183 fExecutable(other.fExecutable),
184 fExecutableCommand(other.fExecutableCommand),
185 fArguments(other.fArguments),
186 fExecutableArgs(other.fExecutableArgs),
187 fAnalysisMacro(other.fAnalysisMacro),
188 fAnalysisSource(other.fAnalysisSource),
189 fAdditionalRootLibs(other.fAdditionalRootLibs),
190 fAdditionalLibs(other.fAdditionalLibs),
191 fSplitMode(other.fSplitMode),
192 fAPIVersion(other.fAPIVersion),
193 fROOTVersion(other.fROOTVersion),
194 fAliROOTVersion(other.fAliROOTVersion),
195 fExternalPackages(other.fExternalPackages),
197 fGridWorkingDir(other.fGridWorkingDir),
198 fGridDataDir(other.fGridDataDir),
199 fDataPattern(other.fDataPattern),
200 fGridOutputDir(other.fGridOutputDir),
201 fOutputArchive(other.fOutputArchive),
202 fOutputFiles(other.fOutputFiles),
203 fInputFormat(other.fInputFormat),
204 fDatasetName(other.fDatasetName),
205 fJDLName(other.fJDLName),
206 fMergeExcludes(other.fMergeExcludes),
207 fIncludePath(other.fIncludePath),
208 fCloseSE(other.fCloseSE),
209 fFriendChainName(other.fFriendChainName),
210 fJobTag(other.fJobTag),
211 fOutputSingle(other.fOutputSingle),
212 fRunPrefix(other.fRunPrefix),
// New TAlienJDL objects are instantiated through the interpreter instead of
// being copied from 'other'.
// NOTE(review): presumably done via ProcessLine to avoid a compile-time
// dependency on the AliEn plugin classes - confirm.
217 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
218 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Both ends of the run range are copied.
219 fRunRange[0] = other.fRunRange[0];
220 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: each entry is re-created as a TObjString
// from the source entry's name, and the new array owns its elements.
221 if (other.fInputFiles) {
222 fInputFiles = new TObjArray();
223 TIter next(other.fInputFiles);
225 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
226 fInputFiles->SetOwner();
// Same deep copy for the list of packages.
228 if (other.fPackages) {
229 fPackages = new TObjArray();
230 TIter next(other.fPackages);
232 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
233 fPackages->SetOwner();
237 //______________________________________________________________________________
// Destructor: releases the two JDL objects and the input-file and package
// lists held by pointer.
238 AliAnalysisAlien::~AliAnalysisAlien()
241 if (fGridJDL) delete fGridJDL;
242 if (fMergingJDL) delete fMergingJDL;
243 if (fInputFiles) delete fInputFiles;
244 if (fPackages) delete fPackages;
247 //______________________________________________________________________________
// Assignment operator.
// Guards against self-assignment, assigns the AliAnalysisGrid base part, then
// copies the settings member by member. As in the copy constructor, the JDL
// objects are newly created rather than copied, and the input-file and
// package lists are deep-copied into arrays that own their elements.
248 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
251 if (this != &other) {
252 AliAnalysisGrid::operator=(other);
// NOTE(review): the previous fGridJDL / fMergingJDL pointers are overwritten
// here without being deleted first - this looks like a leak when the target
// object already owns JDL objects; confirm and release them before reassigning.
253 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
254 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
255 fPrice = other.fPrice;
257 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
258 fMaxInitFailed = other.fMaxInitFailed;
259 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
260 fNtestFiles = other.fNtestFiles;
261 fNrunsPerMaster = other.fNrunsPerMaster;
262 fMaxMergeFiles = other.fMaxMergeFiles;
263 fNsubmitted = other.fNsubmitted;
264 fProductionMode = other.fProductionMode;
265 fOutputToRunNo = other.fOutputToRunNo;
266 fMergeViaJDL = other.fMergeViaJDL;
267 fFastReadOption = other.fFastReadOption;
268 fOverwriteMode = other.fOverwriteMode;
269 fNreplicas = other.fNreplicas;
270 fRunNumbers = other.fRunNumbers;
271 fExecutable = other.fExecutable;
272 fExecutableCommand = other.fExecutableCommand;
273 fArguments = other.fArguments;
274 fExecutableArgs = other.fExecutableArgs;
275 fAnalysisMacro = other.fAnalysisMacro;
276 fAnalysisSource = other.fAnalysisSource;
277 fAdditionalRootLibs = other.fAdditionalRootLibs;
278 fAdditionalLibs = other.fAdditionalLibs;
279 fSplitMode = other.fSplitMode;
280 fAPIVersion = other.fAPIVersion;
281 fROOTVersion = other.fROOTVersion;
282 fAliROOTVersion = other.fAliROOTVersion;
283 fExternalPackages = other.fExternalPackages;
285 fGridWorkingDir = other.fGridWorkingDir;
286 fGridDataDir = other.fGridDataDir;
287 fDataPattern = other.fDataPattern;
288 fGridOutputDir = other.fGridOutputDir;
289 fOutputArchive = other.fOutputArchive;
290 fOutputFiles = other.fOutputFiles;
291 fInputFormat = other.fInputFormat;
292 fDatasetName = other.fDatasetName;
293 fJDLName = other.fJDLName;
294 fMergeExcludes = other.fMergeExcludes;
295 fIncludePath = other.fIncludePath;
296 fCloseSE = other.fCloseSE;
297 fFriendChainName = other.fFriendChainName;
298 fJobTag = other.fJobTag;
299 fOutputSingle = other.fOutputSingle;
300 fRunPrefix = other.fRunPrefix;
// Deep copy of the input file list (entries re-created from their names).
// NOTE(review): the list previously held by this object is replaced without
// a delete - confirm whether it should be freed first.
301 if (other.fInputFiles) {
302 fInputFiles = new TObjArray();
303 TIter next(other.fInputFiles);
305 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
306 fInputFiles->SetOwner();
// Deep copy of the package list; the same ownership remark applies.
308 if (other.fPackages) {
309 fPackages = new TObjArray();
310 TIter next(other.fPackages);
312 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
313 fPackages->SetOwner();
319 //______________________________________________________________________________
// Append one entry to fIncludePath.
//   path - include directory, with or without a leading "-I".
320 void AliAnalysisAlien::AddIncludePath(const char *path)
322 // Add include path in the remote analysis macro.
// The "-I" prefix is added only when the argument does not already contain
// "-I" (anywhere in the text); each entry is followed by a single space.
// NOTE(review): 'p' is presumably a TString built from 'path' - confirm.
324 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
325 else fIncludePath += Form("-I%s ", path);
328 //______________________________________________________________________________
// Append a run to fRunNumbers.
//   run - run number; it is stored as text, preceded by fRunPrefix.
// Entries are separated by a single space, inserted only when the list is
// not empty.
329 void AliAnalysisAlien::AddRunNumber(Int_t run)
331 // Add a run number to the list of runs to be processed.
332 if (fRunNumbers.Length()) fRunNumbers += " ";
333 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
336 //______________________________________________________________________________
// Text variant of AddRunNumber.
//   run - run identifier given as a string.
// A single space separator is inserted when fRunNumbers already has content.
337 void AliAnalysisAlien::AddRunNumber(const char* run)
339 // Add a run number to the list of runs to be processed.
340 if (fRunNumbers.Length()) fRunNumbers += " ";
344 //______________________________________________________________________________
// Register one input data file.
//   lfn - logical file name; stored as a new TObjString in fInputFiles.
345 void AliAnalysisAlien::AddDataFile(const char *lfn)
347 // Adds a data file to the input to be analysed. The file should be a valid LFN
348 // or point to an existing file in the alien workdir.
// The list is created on first use.
// NOTE(review): no SetOwner() call is visible on this creation path, unlike
// the copy constructor - confirm that the entries are freed with the list.
349 if (!fInputFiles) fInputFiles = new TObjArray();
350 fInputFiles->Add(new TObjString(lfn));
353 //______________________________________________________________________________
// Append one entry to the space-separated list fExternalPackages.
//   package - package specification, appended as given.
354 void AliAnalysisAlien::AddExternalPackage(const char *package)
356 // Adds external packages in addition to the default ones (root, aliroot and gapi)
// NOTE(review): fExternalPackages is used as a string elsewhere in this file
// (IsNull(), Tokenize()), so testing the object itself in a boolean context
// presumably goes through a pointer conversion that is always true. That would
// prepend a space even to the first entry. AddRunNumber tests Length()
// instead - confirm and align.
357 if (fExternalPackages) fExternalPackages += " ";
358 fExternalPackages += package;
361 //______________________________________________________________________________
// Make sure a grid connection is available.
// Returns kTRUE straight away when gGrid exists and reports being connected;
// otherwise attempts TGrid::Connect("alien://") and re-checks the connection.
// On success the grid user name is stored in fUser.
362 Bool_t AliAnalysisAlien::Connect()
364 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
365 if (gGrid && gGrid->IsConnected()) return kTRUE;
367 Info("Connect", "Trying to connect to AliEn ...");
368 TGrid::Connect("alien://");
// gGrid is tested for null as well before asking for the connection state.
// NOTE(review): presumably kFALSE is returned after the error below - confirm.
370 if (!gGrid || !gGrid->IsConnected()) {
371 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// The analysis user is taken from the grid session.
374 fUser = gGrid->GetUser();
375 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
379 //______________________________________________________________________________
// Change to the AliEn working directory (grid home directory followed by
// fGridWorkingDir), creating it when it does not exist. If creation fails,
// fGridWorkingDir is reset to an empty string and a warning names the home
// directory as the fallback.
380 void AliAnalysisAlien::CdWork()
382 // Check validity of alien workspace. Create directory if possible.
384 Error("CdWork", "Alien connection required");
// The path checked and created is the home directory with fGridWorkingDir
// appended.
387 TString homedir = gGrid->GetHomeDirectory();
388 TString workdir = homedir + fGridWorkingDir;
389 if (DirectoryExists(workdir)) {
393 // Work directory not existing - create it
// NOTE(review): Mkdir receives the full 'workdir' path while Cd receives
// fGridWorkingDir alone - confirm that Cd resolves it against the home dir.
// NOTE(review): the two messages below are tagged "CreateJDL" although they
// are issued from CdWork.
395 if (gGrid->Mkdir(workdir, "-p")) {
396 gGrid->Cd(fGridWorkingDir);
397 Info("CreateJDL", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
399 Warning("CreateJDL", "Working directory %s cannot be created.\n Using %s instead.",
400 workdir.Data(), homedir.Data());
401 fGridWorkingDir = "";
405 //______________________________________________________________________________
// Test whether files can be copied to an AliEn directory.
//   alienpath - grid directory used as copy destination.
// Steps: report the alien_CLOSE_SE environment setting, verify that
// 'alienpath' exists, create a local file named "plugin_test_copy", remove
// any copy of it already at the destination, copy it with TFile::Cp and
// finally clean up the local file and the grid copy.
// NOTE(review): presumably returns kFALSE after each Error() and kTRUE after
// the final success message - confirm.
406 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
408 // Check if file copying is possible.
410 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
413 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
414 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
415 // Check if alien_CLOSE_SE is defined
416 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
417 if (!closeSE.IsNull()) {
418 Info("CheckFileCopy", "Your current close storage is pointing to: \
419 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
421 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
423 // Check if grid directory exists.
424 if (!DirectoryExists(alienpath)) {
425 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// The local test file is created in the current working directory.
428 TFile f("plugin_test_copy", "RECREATE");
429 // User may not have write permissions to current directory
431 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
432 gSystem->WorkingDirectory());
// Remove a stale copy at the destination before testing the copy itself.
436 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
437 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
438 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
439 \n# 1. Make sure you have write permissions there. If this is the case: \
440 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
441 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
442 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
443 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
// The local test file is removed on the failure path as well as on success.
444 gSystem->Unlink(f.GetName());
447 gSystem->Unlink(f.GetName());
// NOTE(review): this path is built as "%s%s" - no "/" separator and no
// "alien://" prefix - unlike the "alien://%s/%s" copy destination above, so
// the test file probably stays on the grid; confirm and align the format.
448 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
449 Info("CheckFileCopy", "### ...SUCCESS ###");
453 //______________________________________________________________________________
// Validate the declared input before datasets and JDL are produced.
//  - With no input files, no run numbers and no run range start, only the
//    base data directory (fGridDataDir) is required.
//  - Each declared input file must exist; its type is obtained from
//    CheckDataType() and all files must agree on collection / xml / tags.
//  - When run numbers or a run range are given, fGridDataDir must exist and
//    the per-run xml names are registered through AddDataFile().
// NOTE(review): presumably returns kFALSE after each Error() - confirm.
454 Bool_t AliAnalysisAlien::CheckInputData()
456 // Check validity of input data. If necessary, create xml files.
457 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
458 if (!fGridDataDir.Length()) {
// NOTE(review): the message tag is spelled "CkeckInputData" here and once more
// further down.
459 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
462 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
465 // Process declared files
466 Bool_t isCollection = kFALSE;
467 Bool_t isXml = kFALSE;
468 Bool_t useTags = kFALSE;
469 Bool_t checked = kFALSE;
472 TString workdir = gGrid->GetHomeDirectory();
473 workdir += fGridWorkingDir;
476 TIter next(fInputFiles);
477 while ((objstr=(TObjString*)next())) {
480 file += objstr->GetString();
481 // Store full lfn path
482 if (FileExists(file)) objstr->SetString(file);
484 file = objstr->GetName();
485 if (!FileExists(objstr->GetName())) {
486 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
487 objstr->GetName(), workdir.Data());
// The type flags of the current file are compared with those already
// recorded; any mismatch is reported as a conflict.
491 Bool_t iscoll, isxml, usetags;
492 CheckDataType(file, iscoll, isxml, usetags);
495 isCollection = iscoll;
498 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
500 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
501 Error("CheckInputData", "Some conflict was found in the types of inputs");
507 // Process requested run numbers
508 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
509 // Check validity of alien data directory
510 if (!fGridDataDir.Length()) {
511 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
514 if (!DirectoryExists(fGridDataDir)) {
515 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
519 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
523 if (checked && !isXml) {
524 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
527 // Check validity of run number(s)
531 TString schunk, schunk2;
// Tag usage for runs is derived from the data pattern containing "tag".
535 useTags = fDataPattern.Contains("tag");
536 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
538 if (useTags != fDataPattern.Contains("tag")) {
539 Error("CheckInputData", "Cannot mix input files using/not using tags");
// An explicit list of run numbers takes precedence over the run range.
// NOTE(review): the Info() calls below are tagged "CheckDataType" although
// they are issued from CheckInputData.
542 if (fRunNumbers.Length()) {
543 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
544 arr = fRunNumbers.Tokenize(" ");
546 while ((os=(TObjString*)next())) {
// NOTE(review): the directory path is formatted with a trailing space
// ("%s/%s ") - confirm that this is intended.
547 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
548 if (!DirectoryExists(path)) {
549 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
552 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
553 TString msg = "\n##### file: ";
555 msg += " type: xml_collection;";
556 if (useTags) msg += " using_tags: Yes";
557 else msg += " using_tags: No";
558 Info("CheckDataType", msg.Data());
// One xml per run when fewer than two runs go into a master job.
559 if (fNrunsPerMaster<2) {
560 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Otherwise a chunk name is started with the first run of each group and
// completed with "_<run>.xml" at the end of the group or of the list.
563 if (((nruns-1)%fNrunsPerMaster) == 0) {
564 schunk = os->GetString();
566 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
567 schunk += Form("_%s.xml", os->GetString().Data());
// Run range case: every run from fRunRange[0] to fRunRange[1], inclusive.
573 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
574 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
575 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
576 if (!DirectoryExists(path)) {
577 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
580 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
581 TString msg = "\n##### file: ";
583 msg += " type: xml_collection;";
584 if (useTags) msg += " using_tags: Yes";
585 else msg += " using_tags: No";
586 Info("CheckDataType", msg.Data());
587 if (fNrunsPerMaster<2) {
588 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
591 if (((nruns-1)%fNrunsPerMaster) == 0) {
592 schunk = Form("%s%d", fRunPrefix.Data(),irun);
594 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
595 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
608 //______________________________________________________________________________
// Produce the xml collection(s) describing the input data.
// Returns kTRUE at once in offline mode. Otherwise a grid command is run,
// its standard output is written to a local xml file and, outside test mode,
// the file is copied to the AliEn working directory. Three cases are handled:
//  - no run numbers and no run range: one collection for the data directory;
//  - a list of run numbers (fRunNumbers);
//  - a run range (fRunRange[0] .. fRunRange[1], inclusive).
// In test mode the collection is named "wn.xml" and the run loops stop after
// the first run that is processed.
// NOTE(review): 'pattern' is not referenced in the statements shown here -
// confirm how it enters the command.
609 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
611 // Create dataset for the grid data directory + run number.
612 if (TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
614 Error("CreateDataset", "Cannot create dataset with no grid connection");
620 TString workdir = gGrid->GetHomeDirectory();
621 workdir += fGridWorkingDir;
623 // Compose the 'find' command arguments
// In test mode the number of returned files is limited to fNtestFiles.
625 TString options = "-x collection ";
626 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
627 TString conditions = "";
632 TString schunk, schunk2;
633 TGridCollection *cbase=0, *cadd=0;
634 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Explicitly declared input files make dataset creation unnecessary.
635 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
636 // Make a single data collection from data directory.
638 if (!DirectoryExists(path)) {
639 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
643 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
644 else file = Form("%s.xml", gSystem->BaseName(path));
// The command is (re)run when the local xml is missing, in test mode, or in
// overwrite mode (AccessPathName returns true when the path is NOT accessible).
645 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
651 command += conditions;
652 printf("command: %s\n", command.Data());
653 TGridResult *res = gGrid->Command(command);
655 // Write standard output to file
// NOTE(review): relies on the interpreter accepting "> file" after the
// statement to redirect its output - confirm for the ROOT version in use.
656 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
// The result is considered empty when grep finds no "/event" in the file;
// the check is skipped with a warning if grep itself cannot be run.
657 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
658 Bool_t nullFile = kFALSE;
660 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
662 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
664 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
669 Bool_t fileExists = FileExists(file);
670 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
671 // Copy xml file to alien space
672 if (fileExists) gGrid->Rm(file);
673 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
674 if (!FileExists(file)) {
675 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
678 // Update list of files to be processed.
680 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
684 Bool_t nullResult = kTRUE;
// Case 2: explicit list of run numbers, one token per run.
685 if (fRunNumbers.Length()) {
686 TObjArray *arr = fRunNumbers.Tokenize(" ");
689 while ((os=(TObjString*)next())) {
// Runs without a directory under fGridDataDir are skipped silently.
690 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
691 if (!DirectoryExists(path)) continue;
693 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
694 else file = Form("%s.xml", os->GetString().Data());
695 // If local collection file does not exist, create it via 'find' command.
696 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
701 command += conditions;
702 TGridResult *res = gGrid->Command(command);
704 // Write standard output to file
705 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
706 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
707 Bool_t nullFile = kFALSE;
709 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
711 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
713 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// An empty run is dropped from fRunNumbers by text replacement.
// NOTE(review): ReplaceAll also matches the run text inside longer entries -
// confirm that run identifiers cannot be substrings of one another.
714 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
720 if (TestBit(AliAnalysisGrid::kTest)) break;
721 // Check if there is one run per master job.
722 if (fNrunsPerMaster<2) {
723 if (FileExists(file)) {
724 if (fOverwriteMode) gGrid->Rm(file);
726 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
730 // Copy xml file to alien space
731 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
732 if (!FileExists(file)) {
733 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk opens the base
// collection, later runs are loaded as additional collections.
739 if (((nruns-1)%fNrunsPerMaster) == 0) {
740 schunk = os->GetString();
741 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
743 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
744 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
748 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
751 schunk += Form("_%s.xml", os->GetString().Data());
752 if (FileExists(schunk)) {
// NOTE(review): the existence check is made on 'schunk' but 'file' is
// removed; the run-range branch below removes 'schunk' in the same
// situation - this looks like the wrong variable, confirm.
753 if (fOverwriteMode) gGrid->Rm(file);
755 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
759 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
760 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
761 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
762 if (!FileExists(schunk)) {
763 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
771 Error("CreateDataset", "No valid dataset corresponding to the query!");
775 // Process a full run range.
776 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
777 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
778 if (!DirectoryExists(path)) continue;
780 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
781 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
782 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
783 if (fOverwriteMode) gGrid->Rm(file);
785 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
789 // If local collection file does not exist, create it via 'find' command.
790 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
795 command += conditions;
796 TGridResult *res = gGrid->Command(command);
798 // Write standard output to file
799 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
800 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
801 Bool_t nullFile = kFALSE;
803 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
805 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
807 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
813 if (TestBit(AliAnalysisGrid::kTest)) break;
814 // Check if there is one run per master job.
815 if (fNrunsPerMaster<2) {
816 if (FileExists(file)) {
817 if (fOverwriteMode) gGrid->Rm(file);
819 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
823 // Copy xml file to alien space
824 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
825 if (!FileExists(file)) {
826 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
831 // Check if the collection for the chunk exist locally.
// The chunk index selects the matching entry of fInputFiles.
832 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
833 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
834 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
837 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
838 if (((nruns-1)%fNrunsPerMaster) == 0) {
839 schunk = Form("%s%d", fRunPrefix.Data(), irun);
840 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
842 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
// A chunk is closed at a full group, at the end of the range, or when its
// name equals the last registered input file.
846 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
847 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
851 if (FileExists(schunk)) {
852 if (fOverwriteMode) gGrid->Rm(schunk);
854 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
858 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
859 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
860 if (FileExists(schunk)) {
861 if (fOverwriteMode) gGrid->Rm(schunk);
863 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
867 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
868 if (!FileExists(schunk)) {
869 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
875 Error("CreateDataset", "No valid dataset corresponding to the query!");
882 //______________________________________________________________________________
883 Bool_t AliAnalysisAlien::CreateJDL()
885 // Generate a JDL file according to current settings. The name of the file is
886 // specified by fJDLName.
887 Bool_t error = kFALSE;
890 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
891 Bool_t generate = kTRUE;
892 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
894 Error("CreateJDL", "Alien connection required");
897 // Check validity of alien workspace
899 TString workdir = gGrid->GetHomeDirectory();
900 workdir += fGridWorkingDir;
904 Error("CreateJDL()", "Define some input files for your analysis.");
907 // Compose list of input files
908 // Check if output files were defined
909 if (!fOutputFiles.Length()) {
910 Error("CreateJDL", "You must define at least one output file");
913 // Check if an output directory was defined and valid
914 if (!fGridOutputDir.Length()) {
915 Error("CreateJDL", "You must define AliEn output directory");
918 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
919 if (!DirectoryExists(fGridOutputDir)) {
920 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
921 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
923 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
929 // Exit if any error up to now
930 if (error) return kFALSE;
932 if (!fUser.IsNull()) {
933 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
934 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
936 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
937 TString mergeExec = fExecutable;
938 mergeExec.ReplaceAll(".sh", "_merge.sh");
939 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
940 mergeExec.ReplaceAll(".sh", ".C");
941 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
942 if (!fArguments.IsNull())
943 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
944 fMergingJDL->SetArguments("$1 $2 $3");
945 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
946 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
947 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
948 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
950 if (fMaxInitFailed > 0) {
951 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
952 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
954 if (fSplitMaxInputFileNumber > 0) {
955 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
956 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
958 if (fSplitMode.Length()) {
959 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
960 fGridJDL->SetDescription("Split", "We split per SE or file");
962 if (!fAliROOTVersion.IsNull()) {
963 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
964 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
966 if (!fROOTVersion.IsNull()) {
967 fGridJDL->AddToPackages("ROOT", fROOTVersion);
968 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
970 if (!fAPIVersion.IsNull()) {
971 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
972 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
974 if (!fExternalPackages.IsNull()) {
975 arr = fExternalPackages.Tokenize(" ");
977 while ((os=(TObjString*)next())) {
978 TString pkgname = os->GetString();
979 Int_t index = pkgname.Index("::");
980 TString pkgversion = pkgname(index+2, pkgname.Length());
981 pkgname.Remove(index);
982 fGridJDL->AddToPackages(pkgname, pkgversion);
983 fMergingJDL->AddToPackages(pkgname, pkgversion);
987 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
988 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
989 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
990 TString analysisFile = fExecutable;
991 analysisFile.ReplaceAll(".sh", ".root");
992 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
993 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
994 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
995 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
996 if (fAdditionalLibs.Length()) {
997 arr = fAdditionalLibs.Tokenize(" ");
999 while ((os=(TObjString*)next())) {
1000 if (os->GetString().Contains(".so")) continue;
1001 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1002 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1007 TIter next(fPackages);
1009 while ((obj=next())) {
1010 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1011 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1014 if (fOutputArchive.Length()) {
1015 arr = fOutputArchive.Tokenize(" ");
1017 Bool_t first = kTRUE;
1018 const char *comment = "Files to be archived";
1019 const char *comment1 = comment;
1020 while ((os=(TObjString*)next())) {
1021 if (!first) comment = NULL;
1022 if (!os->GetString().Contains("@") && fCloseSE.Length())
1023 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1025 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1029 TString outputArchive = fOutputArchive;
1030 if (!fMergeExcludes.IsNull()) {
1031 arr = fMergeExcludes.Tokenize(" ");
1033 while ((os=(TObjString*)next1())) {
1034 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1035 outputArchive.ReplaceAll(os->GetString(),"");
1039 arr = outputArchive.Tokenize(" ");
1043 while ((os=(TObjString*)next2())) {
1044 if (!first) comment = NULL;
1045 TString currentfile = os->GetString();
1046 currentfile.ReplaceAll(".root", "*.root");
1047 currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1048 if (!currentfile.Contains("@") && fCloseSE.Length())
1049 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1051 fMergingJDL->AddToOutputArchive(currentfile, comment);
1056 arr = fOutputFiles.Tokenize(",");
1058 Bool_t first = kTRUE;
1059 const char *comment = "Files to be archived";
1060 const char *comment1 = comment;
1061 while ((os=(TObjString*)next())) {
1062 // Ignore outputs in jdl that are also in outputarchive
1063 TString sout = os->GetString();
1064 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1065 if (fOutputArchive.Contains(sout)) continue;
1066 if (!first) comment = NULL;
1067 if (!os->GetString().Contains("@") && fCloseSE.Length())
1068 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1070 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1074 if (fOutputFiles.Length()) {
1075 TString outputFiles = fOutputFiles;
1076 if (!fMergeExcludes.IsNull()) {
1077 arr = fMergeExcludes.Tokenize(" ");
1079 while ((os=(TObjString*)next1())) {
1080 outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1081 outputFiles.ReplaceAll(os->GetString(),"");
1085 arr = outputFiles.Tokenize(" ");
1089 while ((os=(TObjString*)next2())) {
1090 // Ignore outputs in jdl that are also in outputarchive
1091 TString sout = os->GetString();
1092 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1093 if (fOutputArchive.Contains(sout)) continue;
1094 if (!first) comment = NULL;
1095 if (!os->GetString().Contains("@") && fCloseSE.Length())
1096 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1098 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1102 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1103 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1104 TString validationScript = fExecutable;
1105 validationScript.ReplaceAll(".sh", "_validation.sh");
1106 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1107 validationScript = fExecutable;
1108 validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
1109 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1110 if (fMasterResubmitThreshold) {
1111 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1112 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1114 // Write a jdl with 2 input parameters: collection name and output dir name.
1117 // Copy jdl to grid workspace
1119 // Check if an output directory was defined and valid
1120 if (!fGridOutputDir.Length()) {
1121 Error("CreateJDL", "You must define AliEn output directory");
1124 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1125 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1126 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1127 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1129 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1135 if (TestBit(AliAnalysisGrid::kSubmit)) {
1136 TString mergeJDLName = fExecutable;
1137 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1138 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1139 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1140 if (fProductionMode) {
1141 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1142 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1144 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1145 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1146 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1147 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1149 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1150 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1153 if (fAdditionalLibs.Length()) {
1154 arr = fAdditionalLibs.Tokenize(" ");
1157 while ((os=(TObjString*)next())) {
1158 if (os->GetString().Contains(".so")) continue;
1159 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1160 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1161 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1166 TIter next(fPackages);
1168 while ((obj=next())) {
1169 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1170 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1171 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1178 //______________________________________________________________________________
1179 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1181 // Generates the text of the analysis JDL (fGridJDL) and of the merging JDL
1182 // (fMergingJDL), reformats both and writes them to the local files <fJDLName>
1183 // and <fExecutable> with ".sh" replaced by "_merge.jdl".
// Returns kFALSE if fInputFiles is not set.
// NOTE(review): this comment used to describe a "findex" argument that is not
// part of the signature. <copy> presumably controls whether the two files are
// also copied to AliEn (see the end of the method) - confirm.
1184 if (!fInputFiles) return kFALSE;
1186 TString workdir = gGrid->GetHomeDirectory();
1187 workdir += fGridWorkingDir;
1189 if (!fRunNumbers.Length() && !fRunRange[0]) {
1190 // One jdl with no parameters in case input data is specified by name.
1191 TIter next(fInputFiles);
1192 while ((os=(TObjString*)next()))
1193 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data()), "Input xml collections");
1194 if (!fOutputSingle.IsNull())
1195 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1197 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1198 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1201 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1202 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1203 if (!fOutputSingle.IsNull()) {
1204 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1205 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1207 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
// NOTE(review): the format "$1" contains no conversion, so the fGridOutputDir
// argument passed to Form() below is not used.
1208 fMergingJDL->SetOutputDirectory(Form("$1", fGridOutputDir.Data()), "Output directory");
1213 // Generate the JDL as a string
1214 TString sjdl = fGridJDL->Generate();
1215 TString sjdl1 = fMergingJDL->Generate();
// Reformat both texts: start a new line before "LF:, (member and "VO_ entries
// and around braces, collapse empty lines, and rename the key OutputDirectory
// to OutputDir.
1217 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1218 sjdl.ReplaceAll("(member", "\n (member");
1219 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1220 sjdl.ReplaceAll("{", "{\n ");
1221 sjdl.ReplaceAll("};", "\n};");
1222 sjdl.ReplaceAll("{\n \n", "{\n");
1223 sjdl.ReplaceAll("\n\n", "\n");
1224 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1225 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1226 sjdl1.ReplaceAll("(member", "\n (member");
1227 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1228 sjdl1.ReplaceAll("{", "{\n ");
1229 sjdl1.ReplaceAll("};", "\n};");
1230 sjdl1.ReplaceAll("{\n \n", "{\n");
1231 sjdl1.ReplaceAll("\n\n", "\n");
1232 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Append the JDLVariables and Workdirectorysize entries and prepend the job tag
// (plus, for the merging JDL, a header describing its $1..$3 parameters).
1233 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1234 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1235 index = sjdl.Index("JDLVariables");
1236 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1237 sjdl += "Workdirectorysize = {\"5000MB\"};";
1238 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1239 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", fJobTag.Data()));
1240 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1241 index = sjdl1.Index("JDLVariables");
1242 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1243 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1244 // Write jdl to file
// NOTE(review): the error messages below name "CreateJDL" as their location
// although they are issued from WriteJDL.
1246 out.open(fJDLName.Data(), ios::out);
1248 Error("CreateJDL", "Bad file name: %s", fJDLName.Data());
1251 out << sjdl << endl;
1252 TString mergeJDLName = fExecutable;
1253 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1256 out1.open(mergeJDLName.Data(), ios::out);
1258 Error("CreateJDL", "Bad file name: %s", mergeJDLName.Data());
1261 out1 << sjdl1 << endl;
1264 // Copy jdl to grid workspace
1266 Info("CreateJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination is the grid output directory, or the work directory in production mode
1268 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1269 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1270 if (fProductionMode) {
1271 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1272 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove any previous copy before uploading the new one
1274 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1275 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1276 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1277 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1279 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1280 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1286 //______________________________________________________________________________
1287 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1289 // Returns true if the file <lfn> exists in AliEn. Returns false when there is
// no grid connection or when listing <lfn> gives no result.
1290 if (!gGrid) return kFALSE;
1291 TGridResult *res = gGrid->Ls(lfn);
1292 if (!res) return kFALSE;
// Only the first entry of the listing is inspected
1293 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry must carry a non-empty "name" value
1298 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1299 if (!objs || !objs->GetString().Length()) {
1307 //______________________________________________________________________________
1308 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1310 // Returns true if directory exists. <dirname> can also be a path: its parent
// directory is listed with the "-F" option and the "name" value of each entry
// is compared with the expected directory name.
// Returns false when there is no grid connection or the listing gives no result.
1311 if (!gGrid) return kFALSE;
1312 // Check if dirname is a path: strip it (default Strip(), then trailing '/')
// and split it into base name and parent path
1313 TString dirstripped = dirname;
1314 dirstripped = dirstripped.Strip();
1315 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1316 TString dir = gSystem->BaseName(dirstripped);
1318 TString path = gSystem->DirName(dirstripped);
1319 TGridResult *res = gGrid->Ls(path, "-F");
1320 if (!res) return kFALSE;
// Scan the entries of the parent directory for a matching name
1324 while ((map=dynamic_cast<TMap*>(next()))) {
1325 obj = map->GetValue("name");
1327 if (dir == obj->GetName()) {
1336 //______________________________________________________________________________
1337 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1339 // Check input data type of <lfn> and report it through the output arguments:
//   isCollection - result of IsCollection(lfn)
//   isXml        - true if the name contains ".xml"
//   useTags      - true if the first file found (or <lfn> itself for a plain
//                  file) contains ".tag" in its name
// A summary message is printed via Info().
// NOTE(review): msg.Data() is passed to Info() as the format string itself, so a
// '%' in <lfn> would be interpreted as a conversion - confirm whether
// Info("CheckDataType", "%s", msg.Data()) was intended.
1340 isCollection = kFALSE;
1344 Error("CheckDataType", "No connection to grid");
1347 isCollection = IsCollection(lfn);
1348 TString msg = "\n##### file: ";
1351 msg += " type: raw_collection;";
1352 // special treatment for collections
1354 // check for tag files in the collection
1355 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1357 msg += " using_tags: No (unknown)";
1358 Info("CheckDataType", msg.Data());
// The original LFN of the first file in the collection decides about tags
1361 const char* typeStr = res->GetKey(0, "origLFN");
1362 if (!typeStr || !strlen(typeStr)) {
1363 msg += " using_tags: No (unknown)";
1364 Info("CheckDataType", msg.Data());
1367 TString file = typeStr;
1368 useTags = file.Contains(".tag");
1369 if (useTags) msg += " using_tags: Yes";
1370 else msg += " using_tags: No";
1371 Info("CheckDataType", msg.Data());
1376 isXml = slfn.Contains(".xml");
1378 // Open xml collection and check if there are tag files inside
1379 msg += " type: xml_collection;";
// TAlienCollection is opened through the interpreter rather than called directly
1380 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1382 msg += " using_tags: No (unknown)";
1383 Info("CheckDataType", msg.Data());
1386 TMap *map = coll->Next();
1388 msg += " using_tags: No (unknown)";
1389 Info("CheckDataType", msg.Data());
// Look at the "name" of the first file of the collection
1392 map = (TMap*)map->GetValue("");
1394 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1395 useTags = file.Contains(".tag");
1397 if (useTags) msg += " using_tags: Yes";
1398 else msg += " using_tags: No";
1399 Info("CheckDataType", msg.Data());
// Neither a collection nor an xml file: classify by the file name alone
1402 useTags = slfn.Contains(".tag");
1403 if (slfn.Contains(".root")) msg += " type: root file;";
1404 else msg += " type: unknown file;";
1405 if (useTags) msg += " using_tags: Yes";
1406 else msg += " using_tags: No";
1407 Info("CheckDataType", msg.Data());
1410 //______________________________________________________________________________
1411 void AliAnalysisAlien::EnablePackage(const char *package)
1413 // Enables a par file supposed to exist in the current directory: the package
// is checked with gSystem->AccessPathName(), the kUsePars bit is set and the
// package name is added to fPackages. Fatal() is raised if the check fails.
1414 TString pkg(package);
// Drop any ".par" contained in the given name
1415 pkg.ReplaceAll(".par", "");
1417 if (gSystem->AccessPathName(pkg)) {
1418 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the switch to .par packages only the first time the bit gets set
1421 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1422 Info("EnablePackage", "AliEn plugin will use .par packages");
1423 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns the TObjString entries added to it
1425 fPackages = new TObjArray();
1426 fPackages->SetOwner();
1428 fPackages->Add(new TObjString(pkg));
1431 //______________________________________________________________________________
1432 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1434 // Get job status for all jobs with jobid>=jobidstart (jobs with a smaller
// queue id are skipped). The jobs are classified by their TGridJobStatus state;
// presumably the counters nrunning, nwaiting, nerror and ndone are updated
// accordingly - confirm. Aborted, failed and unknown jobs share one case.
// Returns a pointer to a static buffer which receives the "status" key of the
// job whose id equals <lastid>; the buffer is shared between calls.
1435 static char mstatus[20];
1441 TGridJobStatusList *list = gGrid->Ps("");
1442 if (!list) return mstatus;
1443 Int_t nentries = list->GetSize();
1444 TGridJobStatus *status;
1446 for (Int_t ijob=0; ijob<nentries; ijob++) {
1447 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter, casting to TAlienJobStatus there
1448 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1449 if (pid<jobidstart) continue;
1450 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format and is written
// into the 20-byte mstatus buffer without a length limit - confirm that status
// strings are always shorter than that and never contain '%'.
1451 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
1453 switch (status->GetStatus()) {
1454 case TGridJobStatus::kWAITING:
1456 case TGridJobStatus::kRUNNING:
1458 case TGridJobStatus::kABORTED:
1459 case TGridJobStatus::kFAIL:
1460 case TGridJobStatus::kUNKNOWN:
1462 case TGridJobStatus::kDONE:
1471 //______________________________________________________________________________
1472 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1474 // Returns true if file is a collection. Functionality duplicated from
1475 // TAlien::Type() because we don't want to directly depend on TAlien.
1477 Error("IsCollection", "No connection to grid");
1480 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1481 if (!res) return kFALSE;
1482 const char* typeStr = res->GetKey(0, "type");
1483 if (!typeStr || !strlen(typeStr)) return kFALSE;
1484 if (!strcmp(typeStr, "collection")) return kTRUE;
1489 //______________________________________________________________________________
1490 Bool_t AliAnalysisAlien::IsSingleOutput() const
1492 // Check if single-ouput option is on.
1493 return (!fOutputSingle.IsNull());
1496 //______________________________________________________________________________
1497 void AliAnalysisAlien::Print(Option_t *) const
1499 // Print current plugin settings to standard output. The option argument is
// not used. Several entries are printed only when the corresponding setting is
// non-empty or positive.
1500 printf("### AliEn analysis plugin current settings ###\n");
1501 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1502 if (fOverwriteMode) {
1503 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1504 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1506 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1507 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1508 printf("= Production mode:______________________________ %d\n", fProductionMode);
1509 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1510 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1511 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1513 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1514 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1515 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1516 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1517 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1518 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1519 if (fRunNumbers.Length())
1520 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1522 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
// Without run numbers or a run range the input files are listed by name
1523 if (!fRunRange[0] && !fRunNumbers.Length()) {
1524 TIter next(fInputFiles);
1527 while ((obj=next())) list += obj->GetName();
1528 printf("= Input files to be processed: _________________ %s\n", list.Data());
1530 if (TestBit(AliAnalysisGrid::kTest))
1531 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1532 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1533 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1534 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1535 printf("=====================================================================\n");
1536 printf("= Job price: ___________________________________ %d\n", fPrice);
1537 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1538 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1539 if (fMaxInitFailed>0)
1540 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1541 if (fMasterResubmitThreshold>0)
1542 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1543 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1544 if (fNrunsPerMaster>0)
1545 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1546 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1547 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1548 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1549 if (fArguments.Length())
1550 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1551 if (fExecutableArgs.Length())
1552 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1553 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1554 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1555 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1556 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1558 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1559 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): unlike the neighbouring checks, this condition tests the pointer
// returned by Data() rather than the string length - confirm whether
// fIncludePath.Length() was intended.
1560 if (fIncludePath.Data())
1561 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1562 if (fCloseSE.Length())
1563 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1564 if (fFriendChainName.Length())
1565 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
// The names of all enabled packages are concatenated into one string
1567 TIter next(fPackages);
1570 while ((obj=next())) list += obj->GetName();
1571 printf("= Par files to be used: ________________________ %s\n", list.Data());
1575 //______________________________________________________________________________
1576 void AliAnalysisAlien::SetDefaults()
1578 // Set default values for everything. What cannot be filled will be left empty.
// Both JDL objects are (re)created through the interpreter as TAlienJDL.
// NOTE(review): fGridJDL is deleted before being recreated, but fMergingJDL is
// overwritten without a matching delete being visible here - confirm that it
// cannot hold a previous object at this point.
1579 if (fGridJDL) delete fGridJDL;
1580 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1581 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1584 fSplitMaxInputFileNumber = 100;
1586 fMasterResubmitThreshold = 0;
1591 fNrunsPerMaster = 1;
1592 fMaxMergeFiles = 100;
1594 fExecutable = "analysis.sh";
1595 fExecutableCommand = "root -b -q";
1597 fExecutableArgs = "";
1598 fAnalysisMacro = "myAnalysis.C";
1599 fAnalysisSource = "";
1600 fAdditionalLibs = "";
1604 fAliROOTVersion = "";
1605 fUser = ""; // Your alien user name
1606 fGridWorkingDir = "";
1607 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1608 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1609 fFriendChainName = "";
1610 fGridOutputDir = "output";
1611 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1612 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1613 fInputFormat = "xml-single";
1614 fJDLName = "analysis.jdl";
1615 fJobTag = "Automatically generated analysis JDL";
1616 fMergeExcludes = "";
1619 SetCheckCopy(kTRUE);
1620 SetDefaultOutputs(kTRUE);
1624 //______________________________________________________________________________
1625 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1627 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1628 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1629 // true if the jobs were successfully submitted.
//   filename  - name of the output file whose merging is checked
//   aliendir  - AliEn directory searched with the "find" command
//   nperchunk - number of files merged into one chunk
//   jdl       - JDL given to the "submit" command together with <aliendir>
// NOTE(review): the error messages below name "GetNregisteredFiles" as their
// location although they are issued from this method.
1630 Int_t countOrig = 0;
1631 Int_t countStage = 0;
1634 Bool_t doneFinal = kFALSE;
1636 TString saliendir(aliendir);
1637 TString sfilename, stmp;
// Normalise the directory: no double slashes, no trailing slash
1638 saliendir.ReplaceAll("//","/");
1639 saliendir = saliendir.Strip(TString::kTrailing, '/');
1641 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
1644 sfilename = filename;
1645 sfilename.ReplaceAll(".root", "*.root");
1646 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1647 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1648 TGridResult *res = gGrid->Command(command);
1650 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1655 while ((map=(TMap*)nextmap())) {
// NOTE(review): GetValue("turl") is dereferenced without a null check - confirm
// that every entry of the find result carries a "turl" key.
1656 TString turl = map->GetValue("turl")->GetName();
1657 if (!turl.Length()) {
// Reduce the turl to the path relative to <aliendir>
1662 turl.ReplaceAll("alien://", "");
1663 turl.ReplaceAll(saliendir, "");
1664 sfilename = gSystem->BaseName(turl);
1665 turl = turl.Strip(TString::kLeading, '/');
1666 // Now check to what the file corresponds to:
1667 // original output - aliendir/%03d/filename
1668 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1669 // final merged file - aliendir/filename
1670 if (sfilename == turl) {
1671 if (sfilename == filename) {
// Decode stage (2 digits) and chunk number (4 digits) from "Stage%02d_%04d"
1675 Int_t index = sfilename.Index("Stage");
1676 if (index<0) continue;
1677 stmp = sfilename(index+5,2);
1678 Int_t istage = atoi(stmp);
1679 stmp = sfilename(index+8,4);
1680 Int_t ijob = atoi(stmp);
1681 if (istage<stage) continue; // Ignore lower stages
1684 chunksDone.ResetAllBits();
1688 chunksDone.SetBitNumber(ijob);
1695 printf("=> Removing files from previous stages...\n");
1696 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1701 // Compute number of jobs that were submitted for the current stage
// (each stage divides the number of files by nperchunk, rounding up)
1702 Int_t ntotstage = countOrig;
1703 for (i=1; i<=stage; i++) {
1704 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1705 else ntotstage = (ntotstage/nperchunk);
1707 // Now compare with the number of set bits in the chunksDone array
1708 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1710 printf("*** Found %d original files\n", countOrig);
1711 if (stage==0) printf("*** No merging completed so far.\n");
1712 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1713 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
1714 if (!submit) return doneFinal;
1715 // Submit merging jobs for all missing chunks for the current stage.
1716 TString query = Form("submit %s %s", jdl, aliendir);
1719 for (i=0; i<nmissing; i++) {
1720 ichunk = chunksDone.FirstNullBit(ichunk+1);
1721 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1722 if (!jobId) return kFALSE;
1726 // Submit next stage of merging
1727 if (stage==0) countStage = countOrig;
1728 Int_t nchunks = (countStage/nperchunk);
1729 if (countStage%nperchunk) nchunks += 1;
1730 for (i=0; i<nchunks; i++) {
1731 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1732 if (!jobId) return kFALSE;
1737 //______________________________________________________________________________
1738 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1740 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// The query is passed verbatim to gGrid->Command(); 0 is also returned when
// there is no grid connection. Progress is reported with printf.
1741 if (!gGrid) return 0;
1742 printf("=> %s ------> ",query);
1743 TGridResult *res = gGrid->Command(query);
// The job id is taken from the "jobId" key of the first result entry
1745 TString jobId = res->GetKey(0,"jobId");
1747 if (jobId.IsNull()) {
1748 printf("submission failed. Reason:\n");
1751 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1754 printf(" Job id: %s\n", jobId.Data());
1758 //______________________________________________________________________________
1759 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1761 // Merge given output files from basedir. The file merger will merge nmaxmerge
1762 // files in a group. Merging can be done in stages:
1763 // stage=0 : will merge all existing files in a single stage
1764 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1765 // If their number is bigger than nmaxmerge, only the files from
1766 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged as <output>-Stage<stage>_<ichunk>
1767 // stage=n : does a find command for files named <output>-Stage<stage-1>_*. If their number is bigger than
1768 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
// Anything following '@' in <output> is ignored when building the file names.
1770 TString outputFile = output;
1772 TString outputChunk;
1773 TString previousChunk = "";
1774 Int_t countChunk = 0;
1775 Int_t countZero = nmaxmerge;
1776 Bool_t merged = kTRUE;
1777 Int_t index = outputFile.Index("@");
1778 if (index > 0) outputFile.Remove(index);
1779 TString inputFile = outputFile;
// From stage 2 on, the inputs are the merged files of the previous stage
1780 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1781 command = Form("find %s/ *%s", basedir, inputFile.Data());
1782 printf("command: %s\n", command.Data());
1783 TGridResult *res = gGrid->Command(command);
1785 ::Error("MergeOutput","No result for the find command\n");
1789 TFileMerger *fm = 0;
1792 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1793 outputChunk = outputFile;
1794 outputChunk.ReplaceAll(".root", "_*.root");
1795 // Check for existent temporary merge files
1796 // Check overwrite mode and remove previous partial results if needed
1797 // Preserve old merging functionality for stage 0.
1799 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1801 // Skip as many input files as in a chunk
1802 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1804 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1808 outputChunk = outputFile;
1809 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1811 if (gSystem->AccessPathName(outputChunk)) continue;
1812 // Merged file with chunks up to <countChunk> found
1813 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1814 previousChunk = outputChunk;
1818 countZero = nmaxmerge;
1820 while ((map=(TMap*)nextmap())) {
1821 // Loop 'find' results and get next LFN
1822 if (countZero == nmaxmerge) {
1823 // First file in chunk - create file merger and add previous chunk if any.
1824 fm = new TFileMerger(kFALSE);
1825 fm->SetFastMethod(kTRUE);
1826 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1827 outputChunk = outputFile;
1828 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1830 // If last file found, put merged results in the output file
1831 if (map == res->Last()) outputChunk = outputFile;
1832 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1833 if (!objs || !objs->GetString().Length()) {
1834 // Nothing found - skip this output
1839 // Add file to be merged and decrement chunk counter.
1840 fm->AddFile(objs->GetString());
1842 if (countZero==0 || map == res->Last()) {
1843 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1844 // Nothing found - skip this output
1845 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1850 fm->OutputFile(outputChunk);
1851 // Merge the outputs, then go to next chunk
1853 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message names "MergeOutputs" as its location, unlike the
// other messages of this method.
1858 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
1859 gSystem->Unlink(previousChunk);
1861 if (map == res->Last()) {
1867 countZero = nmaxmerge;
1868 previousChunk = outputChunk;
1873 // Merging stage different than 0.
1874 // Move to the beginning of the requested chunk.
1875 outputChunk = outputFile;
1876 if (nmaxmerge < res->GetSize()) {
1877 if (ichunk*nmaxmerge >= res->GetSize()) {
1878 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
// Skip the entries that belong to the preceding chunks
1882 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
1883 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
1885 countZero = nmaxmerge;
1886 fm = new TFileMerger(kFALSE);
1887 fm->SetFastMethod(kTRUE);
1888 while ((map=(TMap*)nextmap())) {
1889 // Loop 'find' results and get next LFN
1890 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1891 if (!objs || !objs->GetString().Length()) {
1892 // Nothing found - skip this output
1897 // Add file to be merged and decrement chunk counter.
1898 fm->AddFile(objs->GetString());
1900 if (countZero==0) break;
1903 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1904 // Nothing found - skip this output
1905 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1909 fm->OutputFile(outputChunk);
1910 // Merge the outputs
1912 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1916 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
1922 //______________________________________________________________________________
1923 Bool_t AliAnalysisAlien::MergeOutputs()
1925 // Merge analysis outputs existing in the AliEn space.
// Visible behaviour: returns kTRUE at once in test mode and kFALSE in offline
// mode; may submit merging jobs through SubmitMerging(); otherwise walks the
// comma-separated list fOutputFiles and merges each entry locally by calling
// MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles).
// NOTE(review): several guard conditions, local declarations and return
// statements of this function are not visible in this listing; the comments
// below describe only what the visible statements do.
// Test mode: nothing is merged, success is reported.
1926 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
// Offline mode: nothing is merged, failure is reported.
1927 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
// Message issued when there is no grid connection (the guarding test is not
// visible here).
1929 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
// kMerge bit not set: only tell the user how to trigger the merging jobs.
1933 if (!TestBit(AliAnalysisGrid::kMerge)) {
1934 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
// Production mode: per the message, merging is left to the LPM manager.
1937 if (fProductionMode) {
1938 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
// Submit the merging JDL; a failed submission aborts with kFALSE.
1941 Info("MergeOutputs", "Submitting merging JDL");
1942 if (!SubmitMerging()) return kFALSE;
1943 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
1944 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
1947 // Get the output path
// A value of fGridOutputDir without any "/" is expanded to
// /<grid home directory>/<fGridWorkingDir>/<fGridOutputDir>.
1948 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
// The grid output directory has to exist.
1949 if (!DirectoryExists(fGridOutputDir)) {
1950 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
// At least one output file name has to be defined.
1953 if (!fOutputFiles.Length()) {
1954 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
1957 // Check if fast read option was requested
1958 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
1959 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
// Fast read: warn the user and set the XNet.* values in gEnv listed below.
1960 if (fFastReadOption) {
1961 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
1962 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
1963 gEnv->SetValue("XNet.ConnectTimeout",10);
1964 gEnv->SetValue("XNet.RequestTimeout",10);
1965 gEnv->SetValue("XNet.MaxRedirectCount",2);
1966 gEnv->SetValue("XNet.ReconnectTimeout",10);
1967 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
1969 // Make sure we change the temporary directory
// TMPDIR is pointed at the current working directory.
1970 gSystem->Setenv("TMPDIR", gSystem->pwd());
// Split the comma-separated output list into individual file names.
1971 TObjArray *list = fOutputFiles.Tokenize(",");
1975 Bool_t merged = kTRUE;
1976 while((str=(TObjString*)next())) {
// Drop everything from the first '@' on (only when it is not the first
// character).
1977 outputFile = str->GetString();
1978 Int_t index = outputFile.Index("@");
1979 if (index > 0) outputFile.Remove(index);
// Shell wildcard pattern <name>_*.root, used below to find and delete
// partially merged files.
1980 TString outputChunk = outputFile;
1981 outputChunk.ReplaceAll(".root", "_*.root");
1982 // Skip already merged outputs
// Per the messages below, this test succeeds when outputFile already exists
// locally.
1983 if (!gSystem->AccessPathName(outputFile)) {
1984 if (fOverwriteMode) {
1985 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
1986 gSystem->Unlink(outputFile);
// If "ls" of the chunk pattern returns 0, remove the matching files.
// NOTE(review): the location string below is "MergeOutput", not "MergeOutputs".
1987 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1988 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
1989 outputChunk.Data());
1990 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
1993 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// Same removal of partially merged files as above (the enclosing branch is not
// visible in this listing).
1997 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1998 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
1999 outputChunk.Data());
2000 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Outputs whose name occurs in fMergeExcludes are not merged.
2003 if (fMergeExcludes.Length() &&
2004 fMergeExcludes.Contains(outputFile.Data())) continue;
2005 // Perform a 'find' command in the output directory, looking for registered outputs
2006 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2008 Error("MergeOutputs", "Terminate() will NOT be executed");
// If a file of that name is in gROOT's list of files, close it.
2011 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2012 if (fileOpened) fileOpened->Close();
2017 //______________________________________________________________________________
2018 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2020 // Use the output files connected to output containers from the analysis manager
2021 // rather than the files defined by SetOutputFiles
// flag: new value of the kDefaultOutputs bit. The informational message is
// printed only when flag is true and the bit was not set before.
2022 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2023 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2024 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2027 //______________________________________________________________________________
2028 void AliAnalysisAlien::SetOutputFiles(const char *list)
2030 // Manually set the output files list.
2031 // Removes duplicates. Not allowed if default outputs are not disabled.
// list: output file names separated by blanks; they are appended to
// fOutputFiles as a comma-separated list.
// NOTE(review): some statements of this function (e.g. the local declarations
// used by the loop) are not visible in this listing.
// Refuse manual configuration while the kDefaultOutputs bit is set.
2032 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2033 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2036 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2038 TString slist = list;
// An explicit "@<SE>" suffix only triggers a warning; it is stripped below.
2039 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2040 TObjArray *arr = slist.Tokenize(" ");
2044 while ((os=(TObjString*)next())) {
2045 sout = os->GetString();
// Drop everything from the first '@' on (only when it is not the first
// character).
2046 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): duplicate detection is a substring match on the whole list, so
// a name that is contained in an already registered, longer name is skipped
// as well.
2047 if (fOutputFiles.Contains(sout)) continue;
2048 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2049 fOutputFiles += sout;
2054 //______________________________________________________________________________
2055 void AliAnalysisAlien::SetOutputArchive(const char *list)
2057 // Manually set the output archive list. Free text - you are on your own...
2058 // Not allowed if default outputs are not disabled.
// list: archive specification, stored unmodified in fOutputArchive.
// Refuse manual configuration while the kDefaultOutputs bit is set.
2059 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2060 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2063 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2064 fOutputArchive = list;
2067 //______________________________________________________________________________
2068 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2070 // Setting a preferred output SE is not allowed anymore.
// The argument is unnamed and ignored; the visible body only issues the
// warning below.
2071 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2074 //______________________________________________________________________________
2075 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2077 // Start remote grid analysis.
// Both arguments are unnamed and unused.
// Visible sequence: optionally collect the output file names from the analysis
// manager, announce the running mode, check the input data and create the
// dataset, write the analysis file, macro and validation/merging scripts,
// create the JDL, then either run the executable locally (test mode), write a
// production file (production mode) or submit the job(s) and start "aliensh".
// NOTE(review): several guard conditions, local declarations and return
// statements are not visible in this listing; the comments describe only the
// visible statements.
2079 // Check if output files have to be taken from the analysis manager
2080 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2081 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2082 if (!mgr || !mgr->IsInitialized()) {
2083 Error("StartAnalysis", "You need an initialized analysis manager for this");
// Append the file name of every output container to fOutputFiles.
2087 TIter next(mgr->GetOutputs());
2088 AliAnalysisDataContainer *output;
2089 while ((output=(AliAnalysisDataContainer*)next())) {
2090 const char *filename = output->GetFileName();
// The name "default" is replaced by the output event handler's file name; the
// container is skipped when there is no output event handler.
2091 if (!(strcmp(filename, "default"))) {
2092 if (!mgr->GetOutputEventHandler()) continue;
2093 filename = mgr->GetOutputEventHandler()->GetOutputFileName();
// Skip names already contained in the list (substring match), otherwise append
// them comma-separated.
2095 if (fOutputFiles.Contains(filename)) continue;
2096 if (fOutputFiles.Length()) fOutputFiles += ",";
2097 fOutputFiles += filename;
2099 // Add extra files registered to the analysis manager
2100 if (mgr->GetExtraFiles().Length()) {
2101 if (fOutputFiles.Length()) fOutputFiles += ",";
2102 TString extra = mgr->GetExtraFiles();
// Blanks in the extra-file list are turned into commas.
2103 extra.ReplaceAll(" ", ",");
2104 // Protection in case extra files do not exist (will it work?)
2105 extra.ReplaceAll(".root", "*.root");
2106 fOutputFiles += extra;
2108 // Compose the output archive.
// Two archive entries: the std* files with disk=1, and all collected output
// files with disk=fNreplicas.
2109 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2110 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
// Announce the running mode selected through the plugin bits.
2112 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2113 if (TestBit(AliAnalysisGrid::kOffline)) {
2114 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2115 \n there nor any job run. You can revise the JDL and analysis \
2116 \n macro then run the same in \"submit\" mode.");
2117 } else if (TestBit(AliAnalysisGrid::kTest)) {
2118 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2120 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2121 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2122 \n space and job submitted.");
2123 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2124 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
// In merge mode with merging via JDL the input data is checked here; the
// return value of CheckInputData() is ignored at this point.
2125 if (fMergeViaJDL) CheckInputData();
2128 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
// Message issued when there is no grid connection (the guarding test is not
// visible here).
2133 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2136 if (IsCheckCopy()) CheckFileCopy(gGrid->GetHomeDirectory());
2137 if (!CheckInputData()) {
2138 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
// Dataset creation failed: name the configuration item the user should fix.
// The three assignments are applied in order, so a non-zero fRunRange[0]
// overrides the "run numbers" text.
2141 if (!CreateDataset(fDataPattern)) {
2143 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2144 if (fRunNumbers.Length()) serror = "run numbers";
2145 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2146 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2147 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Produce the files needed by the job.
2150 WriteAnalysisFile();
2151 WriteAnalysisMacro();
2153 WriteValidationScript();
// Merging macro, merge executable and the validation script called with kTRUE
// (the condition guarding these calls is not visible here).
2155 WriteMergingMacro();
2156 WriteMergeExecutable();
2157 WriteValidationScript(kTRUE);
2159 if (!CreateJDL()) return kFALSE;
// Offline mode stops here and returns kFALSE.
2160 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2161 if (TestBit(AliAnalysisGrid::kTest)) {
2162 // Locally testing the analysis
2163 Info("StartAnalysis", "\n_______________________________________________________________________ \
2164 \n Running analysis script in a daughter shell as on a worker node \
2165 \n_______________________________________________________________________");
// Remove local output files that already exist before running the test.
2166 TObjArray *list = fOutputFiles.Tokenize(",");
2170 while((str=(TObjString*)next())) {
2171 outputFile = str->GetString();
2172 Int_t index = outputFile.Index("@");
2173 if (index > 0) outputFile.Remove(index);
2174 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the executable with bash, redirecting its stderr to a file named
// "stderr", then run the validation script (<executable>_validation.sh).
2177 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2178 TString validationScript = fExecutable;
2179 validationScript.ReplaceAll(".sh", "_validation.sh");
2180 gSystem->Exec(Form("bash %s",validationScript.Data()));
2181 // gSystem->Exec("cat stdout");
2184 // Check if submitting is managed by LPM manager
// Production mode: write <jdl name>.prod through WriteProductionFile().
2185 if (fProductionMode) {
2186 TString prodfile = fJDLName;
2187 prodfile.ReplaceAll(".jdl", ".prod");
2188 WriteProductionFile(prodfile);
2189 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2192 // Submit AliEn job(s)
2193 gGrid->Cd(fGridOutputDir);
// Neither run numbers nor a run range defined: submit the JDL once.
2196 if (!fRunNumbers.Length() && !fRunRange[0]) {
2197 // Submit a given xml or a set of runs
2198 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2199 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from key "jobId" of the first entry of the grid result.
2201 const char *cjobId = res->GetKey(0,"jobId");
2205 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2208 Info("StartAnalysis", "\n_______________________________________________________________________ \
2209 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2210 \n_______________________________________________________________________",
2211 fJDLName.Data(), cjobId);
2216 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2220 // Submit for a range of enumeration of runs.
2221 if (!Submit()) return kFALSE;
// Finally an AliEn shell is started for the user.
2224 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2225 \n You may exit at any time and terminate the job later using the option <terminate> \
2226 \n ##################################################################################", jobID.Data());
2227 gSystem->Exec("aliensh");
2231 //______________________________________________________________________________
2232 Bool_t AliAnalysisAlien::Submit()
2234 // Submit all master jobs.
// Loops until fNsubmitted reaches the number of entries of fInputFiles and
// returns kFALSE as soon as a SubmitNext() call fails.
// NOTE(review): part of the loop body and the final return are not visible in
// this listing.
2235 Int_t nmasterjobs = fInputFiles->GetEntries();
// Reference time for the periodic submission below.
2236 Long_t tshoot = gSystem->Now();
// Nothing submitted yet: submit the first bunch immediately.
2237 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2238 while (fNsubmitted < nmasterjobs) {
2239 Long_t now = gSystem->Now();
// Call SubmitNext() again once more than 30000 time units have elapsed since
// tshoot (presumably milliseconds, i.e. 30 s - confirm the unit of
// gSystem->Now()).
2240 if ((now-tshoot)>30000) {
2242 if (!SubmitNext()) return kFALSE;
2248 //______________________________________________________________________________
2249 Bool_t AliAnalysisAlien::SubmitMerging()
2251 // Submit all merging jobs.
// For every entry of fInputFiles the per-master output directory is derived and
// CheckMergedFiles() is called for it with the merging JDL name; kFALSE is
// returned as soon as one of these calls returns false. Afterwards an AliEn
// shell is started unless fInputFiles has no entries.
// NOTE(review): some local declarations and the final return are not visible in
// this listing.
// A value of fGridOutputDir without any "/" is expanded to
// /<grid home directory>/<fGridWorkingDir>/<fGridOutputDir>.
2252 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2253 gGrid->Cd(fGridOutputDir);
// The merging JDL name is the executable name with ".sh" replaced by
// "_merge.jdl".
2254 TString mergeJDLName = fExecutable;
2255 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2256 Int_t ntosubmit = fInputFiles->GetEntries();
2257 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the i-th input file without the ".xml" extension.
2258 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2259 runOutDir.ReplaceAll(".xml", "");
2260 if (fOutputToRunNo) {
2261 // The output directory is the run number
2262 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2263 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2265 // The output directory is the master number in 3 digits format
2266 printf("### Submitting merging job for master <%03d>\n", i);
2267 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2269 // Check now the number of merging stages.
// Pick the first output file (with any "@..." suffix removed) whose name does
// not occur in fMergeExcludes.
2270 TObjArray *list = fOutputFiles.Tokenize(",");
2274 while((str=(TObjString*)next())) {
2275 outputFile = str->GetString();
2276 Int_t index = outputFile.Index("@");
2277 if (index > 0) outputFile.Remove(index);
2278 if (!fMergeExcludes.Contains(outputFile)) break;
// That file is handed to CheckMergedFiles() together with the output directory,
// fMaxMergeFiles, kTRUE and the merging JDL name.
2281 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2282 if (!done) return kFALSE;
// No input files: nothing was processed, so no shell is started.
2284 if (!ntosubmit) return kTRUE;
// NOTE(review): the location string below is "StartAnalysis", not
// "SubmitMerging".
2285 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2286 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2287 \n ##################################################################################");
2288 gSystem->Exec("aliensh");
2292 //______________________________________________________________________________
2293 Bool_t AliAnalysisAlien::SubmitNext()
2295 // Submit next bunch of master jobs if the queue is free.
// NOTE(review): several statements (branch structure, local declarations,
// updates of iscalled and fNsubmitted, return statements) are not visible in
// this listing; the comments describe only the visible statements.
// State kept between calls in function-local statics.
2296 static Bool_t iscalled = kFALSE;
2297 static Int_t firstmaster = 0;
2298 static Int_t lastmaster = 0;
2299 static Int_t npermaster = 0;
// Return at once while the flag iscalled is set.
2300 if (iscalled) return kTRUE;
2302 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2303 Int_t ntosubmit = 0;
// Nothing submitted so far: submit a single master job.
2306 if (!fNsubmitted) ntosubmit = 1;
// Query the job status; the four counters are passed to GetJobStatus(),
// presumably as output arguments - confirm against its declaration.
2308 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2309 printf("=== master %d: %s\n", lastmaster, status.Data());
2310 // If last master not split, just return
2311 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2312 // No more than 100 waiting jobs
2313 if (nwaiting>100) {iscalled = kFALSE; return kTRUE;}
// Average number of jobs per submitted master; it determines how many masters
// fit into the remaining (100 - nwaiting) slots, with a minimum of one.
2314 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
2315 if (npermaster) ntosubmit = (100-nwaiting)/npermaster;
2316 if (!ntosubmit) ntosubmit = 1;
2317 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2318 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2320 Int_t nmasterjobs = fInputFiles->GetEntries();
2321 for (Int_t i=0; i<ntosubmit; i++) {
2322 // Submit for a range of enumeration of runs.
// Stop once every entry of fInputFiles has been submitted.
2323 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Base name of the next input file without the ".xml" extension.
2325 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2326 runOutDir.ReplaceAll(".xml", "");
// Two forms of the submit command: the third argument is either the name
// derived above or the master index in three-digit format (the selecting
// condition is not visible here).
2328 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2330 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2331 printf("********* %s\n",query.Data());
2332 res = gGrid->Command(query);
// The job id is read from key "jobId" of the first entry of the grid result; an
// empty id is handled as a failed submission.
// NOTE(review): the location strings below are "StartAnalysis", not
// "SubmitNext".
2334 TString cjobId1 = res->GetKey(0,"jobId");
2335 if (!cjobId1.Length()) {
2339 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2342 Info("StartAnalysis", "\n_______________________________________________________________________ \
2343 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2344 \n_______________________________________________________________________",
2345 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id of the last submitted master and, while firstmaster is still
// 0, also store it as the first one.
2348 lastmaster = cjobId1.Atoi();
2349 if (!firstmaster) firstmaster = lastmaster;
2354 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2362 //______________________________________________________________________________
2363 void AliAnalysisAlien::WriteAnalysisFile()
2365 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root".
// NOTE(review): some statements (local declarations, the write/close calls,
// guards and returns) are not visible in this listing; the comments describe
// only the visible statements.
2366 TString analysisFile = fExecutable;
2367 analysisFile.ReplaceAll(".sh", ".root");
// This section is entered only when the kSubmit bit is not set.
2368 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2369 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2370 if (!mgr || !mgr->IsInitialized()) {
2371 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2374 // Check analysis type
// Set the kUseMC / kUseESD / kUseAOD bits from the handlers registered in the
// analysis manager.
2376 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2377 handler = (TObject*)mgr->GetInputEventHandler();
2379 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2380 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory (restored below) and (re)create the file.
2382 TDirectory *cdir = gDirectory;
2383 TFile *file = TFile::Open(analysisFile, "RECREATE");
2385 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2386 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2387 // Unless merging makes no sense
2388 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2391 // Enable termination for local jobs
2392 mgr->SetSkipTerminate(kFALSE);
2394 if (cdir) cdir->cd();
2395 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// The copy flag is cleared in offline and test mode.
2397 Bool_t copy = kTRUE;
2398 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Upload the file to <grid home directory><fGridWorkingDir>, removing an
// existing grid file of the same name first (the guard on "copy" is not
// visible here).
// NOTE(review): the location string below is "CreateJDL", not
// "WriteAnalysisFile".
2401 TString workdir = gGrid->GetHomeDirectory();
2402 workdir += fGridWorkingDir;
2403 Info("CreateJDL", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
2404 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2405 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2409 //______________________________________________________________________________
2410 void AliAnalysisAlien::WriteAnalysisMacro()
2412 // Write the analysis macro that will steer the analysis in grid mode.
2413 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2415 out.open(fAnalysisMacro.Data(), ios::out);
2417 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2420 Bool_t hasSTEERBase = kFALSE;
2421 Bool_t hasESD = kFALSE;
2422 Bool_t hasAOD = kFALSE;
2423 Bool_t hasANALYSIS = kFALSE;
2424 Bool_t hasANALYSISalice = kFALSE;
2425 Bool_t hasCORRFW = kFALSE;
2426 TString func = fAnalysisMacro;
2427 TString type = "ESD";
2428 TString comment = "// Analysis using ";
2429 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2430 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
2434 if (type!="AOD" && fFriendChainName!="") {
2435 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2438 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2439 else comment += " data";
2440 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2441 func.ReplaceAll(".C", "");
2442 out << "void " << func.Data() << "()" << endl;
2444 out << comment.Data() << endl;
2445 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2446 out << " TStopwatch timer;" << endl;
2447 out << " timer.Start();" << endl << endl;
2448 out << "// load base root libraries" << endl;
2449 out << " gSystem->Load(\"libTree\");" << endl;
2450 out << " gSystem->Load(\"libGeom\");" << endl;
2451 out << " gSystem->Load(\"libVMC\");" << endl;
2452 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2453 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2454 if (fAdditionalRootLibs.Length()) {
2455 // in principle libtree /lib geom libvmc etc. can go into this list, too
2456 out << "// Add aditional libraries" << endl;
2457 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2460 while((str=(TObjString*)next())) {
2461 if (str->GetString().Contains(".so"))
2462 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2464 if (list) delete list;
2466 out << "// include path" << endl;
2467 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2468 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2469 out << "// Load analysis framework libraries" << endl;
2470 TString setupPar = "AliAnalysisAlien::SetupPar";
2472 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2473 out << " gSystem->Load(\"libESD\");" << endl;
2474 out << " gSystem->Load(\"libAOD\");" << endl;
2475 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2476 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2477 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2479 TIter next(fPackages);
2482 while ((obj=next())) {
2483 pkgname = obj->GetName();
2484 if (pkgname == "STEERBase" ||
2485 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2486 if (pkgname == "ESD" ||
2487 pkgname == "ESD.par") hasESD = kTRUE;
2488 if (pkgname == "AOD" ||
2489 pkgname == "AOD.par") hasAOD = kTRUE;
2490 if (pkgname == "ANALYSIS" ||
2491 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2492 if (pkgname == "ANALYSISalice" ||
2493 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2494 if (pkgname == "CORRFW" ||
2495 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2497 if (hasANALYSISalice) setupPar = "SetupPar";
2498 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2499 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2500 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2501 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2502 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2503 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2504 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2505 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2506 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2507 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2508 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2509 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2510 out << "// Compile other par packages" << endl;
2512 while ((obj=next())) {
2513 pkgname = obj->GetName();
2514 if (pkgname == "STEERBase" ||
2515 pkgname == "STEERBase.par" ||
2517 pkgname == "ESD.par" ||
2519 pkgname == "AOD.par" ||
2520 pkgname == "ANALYSIS" ||
2521 pkgname == "ANALYSIS.par" ||
2522 pkgname == "ANALYSISalice" ||
2523 pkgname == "ANALYSISalice.par" ||
2524 pkgname == "CORRFW" ||
2525 pkgname == "CORRFW.par") continue;
2526 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2529 if (fAdditionalLibs.Length()) {
2530 out << "// Add aditional AliRoot libraries" << endl;
2531 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2534 while((str=(TObjString*)next())) {
2535 if (str->GetString().Contains(".so"))
2536 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2537 if (str->GetString().Contains(".par"))
2538 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2540 if (list) delete list;
2543 out << "// analysis source to be compiled at runtime (if any)" << endl;
2544 if (fAnalysisSource.Length()) {
2545 TObjArray *list = fAnalysisSource.Tokenize(" ");
2548 while((str=(TObjString*)next())) {
2549 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2551 if (list) delete list;
2554 if (fFastReadOption) {
2555 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2556 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2557 out << "// fast xrootd reading enabled" << endl;
2558 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2559 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2560 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2561 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2562 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2563 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2565 // Change temp directory to current one
2566 out << "// Set temporary merging directory to current one" << endl;
2567 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2568 out << "// connect to AliEn and make the chain" << endl;
2569 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2570 if (IsUsingTags()) {
2571 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
2573 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
2575 out << "// read the analysis manager from file" << endl;
2576 TString analysisFile = fExecutable;
2577 analysisFile.ReplaceAll(".sh", ".root");
2578 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2579 out << " if (!file) return;" << endl;
2580 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2581 out << " AliAnalysisManager *mgr = 0;" << endl;
2582 out << " TKey *key;" << endl;
2583 out << " while ((key=(TKey*)nextkey())) {" << endl;
2584 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2585 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2586 out << " };" << endl;
2587 out << " if (!mgr) {" << endl;
2588 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2589 out << " return;" << endl;
2590 out << " }" << endl << endl;
2591 out << " mgr->PrintStatus();" << endl;
2592 if (AliAnalysisManager::GetAnalysisManager()) {
2593 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2594 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2596 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2599 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
2600 out << " timer.Stop();" << endl;
2601 out << " timer.Print();" << endl;
2602 out << "}" << endl << endl;
2603 if (IsUsingTags()) {
2604 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
2606 out << "// Create a chain using tags from the xml file." << endl;
2607 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
2608 out << " if (!coll) {" << endl;
2609 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2610 out << " return NULL;" << endl;
2611 out << " }" << endl;
2612 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
2613 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
2614 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
2615 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
2616 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
2617 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
2618 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
2619 out << " // Check if the cuts configuration file was provided" << endl;
2620 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
2621 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
2622 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2623 out << " }" << endl;
2624 if (fFriendChainName=="") {
2625 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2627 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
2628 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
2629 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
2631 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
2632 out << " chain->ls();" << endl;
2633 out << " return chain;" << endl;
2634 out << "}" << endl << endl;
2635 if (gSystem->AccessPathName("ConfigureCuts.C")) {
2636 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
2637 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
2638 msg += " AliLHCTagCuts *lhcCuts,\n";
2639 msg += " AliDetectorTagCuts *detCuts,\n";
2640 msg += " AliEventTagCuts *evCuts)";
2641 Info("WriteAnalysisMacro", msg.Data());
2644 if (!IsUsingTags() || fFriendChainName!="") {
2645 out <<"//________________________________________________________________________________" << endl;
2646 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
2648 out << "// Create a chain using url's from xml file" << endl;
2649 out << " TString treename = type;" << endl;
2650 out << " treename.ToLower();" << endl;
2651 out << " treename += \"Tree\";" << endl;
2652 out << " printf(\"***************************************\\n\");" << endl;
2653 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
2654 out << " printf(\"***************************************\\n\");" << endl;
2655 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
2656 out << " if (!coll) {" << endl;
2657 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2658 out << " return NULL;" << endl;
2659 out << " }" << endl;
2660 out << " TChain *chain = new TChain(treename);" << endl;
2661 if(fFriendChainName!="") {
2662 out << " TChain *chainFriend = new TChain(treename);" << endl;
2664 out << " coll->Reset();" << endl;
2665 out << " while (coll->Next()) {" << endl;
2666 out << " chain->Add(coll->GetTURL(\"\"));" << endl;
2667 if(fFriendChainName!="") {
2668 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
2669 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2670 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2671 out << " chainFriend->Add(fileFriend.Data());" << endl;
2673 out << " }" << endl;
2674 out << " if (!chain->GetNtrees()) {" << endl;
2675 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
2676 out << " return NULL;" << endl;
2677 out << " }" << endl;
2678 if(fFriendChainName!="") {
2679 out << " chain->AddFriend(chainFriend);" << endl;
2681 out << " return chain;" << endl;
2682 out << "}" << endl << endl;
2684 if (hasANALYSISalice) {
2685 out <<"//________________________________________________________________________________" << endl;
2686 out << "Bool_t SetupPar(const char *package) {" << endl;
2687 out << "// Compile the package and set it up." << endl;
2688 out << " TString pkgdir = package;" << endl;
2689 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2690 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2691 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2692 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2693 out << " // Check for BUILD.sh and execute" << endl;
2694 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2695 out << " printf(\"*******************************\\n\");" << endl;
2696 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2697 out << " printf(\"*******************************\\n\");" << endl;
2698 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2699 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2700 out << " gSystem->ChangeDirectory(cdir);" << endl;
2701 out << " return kFALSE;" << endl;
2702 out << " }" << endl;
2703 out << " } else {" << endl;
2704 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2705 out << " gSystem->ChangeDirectory(cdir);" << endl;
2706 out << " return kFALSE;" << endl;
2707 out << " }" << endl;
2708 out << " // Check for SETUP.C and execute" << endl;
2709 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2710 out << " printf(\"*******************************\\n\");" << endl;
2711 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2712 out << " printf(\"*******************************\\n\");" << endl;
2713 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2714 out << " } else {" << endl;
2715 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2716 out << " gSystem->ChangeDirectory(cdir);" << endl;
2717 out << " return kFALSE;" << endl;
2718 out << " }" << endl;
2719 out << " // Restore original workdir" << endl;
2720 out << " gSystem->ChangeDirectory(cdir);" << endl;
2721 out << " return kTRUE;" << endl;
2724 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
2726 Bool_t copy = kTRUE;
2727 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
2730 TString workdir = gGrid->GetHomeDirectory();
2731 workdir += fGridWorkingDir;
2732 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
2733 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
2734 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
2735 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
2736 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
2738 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
2739 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
2743 //______________________________________________________________________________
// Writes the ROOT macro "<executable>_merge.C" that is executed by the grid
// merging jobs and copies it to the AliEn working directory.
//  - Returns immediately unless fMergeViaJDL is set; reports an error when
//    fOutputFiles is empty.
//  - The macro text is produced in the section guarded by
//    !TestBit(AliAnalysisGrid::kSubmit).
//  - The generated macro: loads ROOT/AliRoot libraries or sets up PAR packages,
//    connects to AliEn, merges every entry of fOutputFiles through
//    AliAnalysisAlien::MergeOutput(), creates the marker file "outputs_valid",
//    and - once the merged outputs exist locally (last stage) - reads the
//    AliAnalysisManager back from "<executable>.root" and calls
//    StartAnalysis("gridterminate").
//  - The final section removes any previous copy of the macro from the grid
//    working directory and uploads the new one with TFile::Cp.
2744 void AliAnalysisAlien::WriteMergingMacro()
2746 // Write a macro to merge the outputs per master job.
2747 if (!fMergeViaJDL) return;
2748 if (!fOutputFiles.Length()) {
2749 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
// Macro file name is derived from the executable name: foo.sh -> foo_merge.C
2752 TString mergingMacro = fExecutable;
2753 mergingMacro.ReplaceAll(".sh","_merge.C");
// An output directory without any '/' is expanded to /<alien home>/<work dir>/<output dir>.
2754 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2755 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2757 out.open(mergingMacro.Data(), ios::out);
// NOTE(review): the message prints fAnalysisMacro, but the file opened above is
// mergingMacro - looks like a copy/paste slip; confirm before changing.
2759 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags set below when the corresponding core package is listed in fPackages
// (i.e. has to be built from a PAR archive instead of being loaded as a library).
2762 Bool_t hasSTEERBase = kFALSE;
2763 Bool_t hasESD = kFALSE;
2764 Bool_t hasAOD = kFALSE;
2765 Bool_t hasANALYSIS = kFALSE;
2766 Bool_t hasANALYSISalice = kFALSE;
2767 Bool_t hasCORRFW = kFALSE;
// The entry function of the generated macro is named after the macro file without ".C".
2768 TString func = mergingMacro;
2770 func.ReplaceAll(".C", "");
2771 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
2773 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
2774 out << " TStopwatch timer;" << endl;
2775 out << " timer.Start();" << endl << endl;
// When the job command does not contain "aliroot", emit explicit loads of the base ROOT libraries.
2776 if (!fExecutableCommand.Contains("aliroot")) {
2777 out << "// load base root libraries" << endl;
2778 out << " gSystem->Load(\"libTree\");" << endl;
2779 out << " gSystem->Load(\"libGeom\");" << endl;
2780 out << " gSystem->Load(\"libVMC\");" << endl;
2781 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2782 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Emit a gSystem->Load() for every space-separated token of fAdditionalRootLibs that contains ".so".
2784 if (fAdditionalRootLibs.Length()) {
2785 // in principle libtree /lib geom libvmc etc. can go into this list, too
2786 out << "// Add aditional libraries" << endl;
2787 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2790 while((str=(TObjString*)next())) {
2791 if (str->GetString().Contains(".so"))
2792 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
// The token array returned by Tokenize() is released here.
2794 if (list) delete list;
2796 out << "// include path" << endl;
2797 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2798 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2799 out << "// Load analysis framework libraries" << endl;
// Library-only variant: STEERBase/ESD/AOD are loaded explicitly only when the
// command does not contain "aliroot"; ANALYSIS, ANALYSISalice and CORRFW loads follow.
2801 if (!fExecutableCommand.Contains("aliroot")) {
2802 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2803 out << " gSystem->Load(\"libESD\");" << endl;
2804 out << " gSystem->Load(\"libAOD\");" << endl;
2806 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2807 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2808 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// PAR variant: first scan fPackages and record which core packages are present
// (names are accepted with or without the ".par" suffix).
2810 TIter next(fPackages);
// By default the generated macro calls the compiled AliAnalysisAlien::SetupPar.
2813 TString setupPar = "AliAnalysisAlien::SetupPar";
2814 while ((obj=next())) {
2815 pkgname = obj->GetName();
2816 if (pkgname == "STEERBase" ||
2817 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2818 if (pkgname == "ESD" ||
2819 pkgname == "ESD.par") hasESD = kTRUE;
2820 if (pkgname == "AOD" ||
2821 pkgname == "AOD.par") hasAOD = kTRUE;
2822 if (pkgname == "ANALYSIS" ||
2823 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2824 if (pkgname == "ANALYSISalice" ||
2825 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2826 if (pkgname == "CORRFW" ||
2827 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// If ANALYSISalice itself comes as a PAR archive, the macro uses the local
// SetupPar() function that is appended to the generated macro further below.
2829 if (hasANALYSISalice) setupPar = "SetupPar";
// For each core package: plain library load when no PAR was supplied,
// otherwise set it up from its PAR archive (the macro returns on failure).
2830 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2831 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2832 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2833 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2834 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2835 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2836 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2837 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2838 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2839 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2840 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2841 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2842 out << "// Compile other par packages" << endl;
// Second pass over the packages: everything that is not one of the core
// packages handled above gets its own SetupPar call.
2844 while ((obj=next())) {
2845 pkgname = obj->GetName();
2846 if (pkgname == "STEERBase" ||
2847 pkgname == "STEERBase.par" ||
2849 pkgname == "ESD.par" ||
2851 pkgname == "AOD.par" ||
2852 pkgname == "ANALYSIS" ||
2853 pkgname == "ANALYSIS.par" ||
2854 pkgname == "ANALYSISalice" ||
2855 pkgname == "ANALYSISalice.par" ||
2856 pkgname == "CORRFW" ||
2857 pkgname == "CORRFW.par") continue;
2858 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// Emit a gSystem->Load() for every space-separated token of fAdditionalLibs that contains ".so".
2861 if (fAdditionalLibs.Length()) {
2862 out << "// Add aditional AliRoot libraries" << endl;
2863 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2866 while((str=(TObjString*)next())) {
2867 if (str->GetString().Contains(".so"))
2868 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2870 if (list) delete list;
2873 out << "// Analysis source to be compiled at runtime (if any)" << endl;
// Every space-separated token of fAnalysisSource is passed to ".L <file>+g" in the generated macro.
2874 if (fAnalysisSource.Length()) {
2875 TObjArray *list = fAnalysisSource.Tokenize(" ");
2878 while((str=(TObjString*)next())) {
2879 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2881 if (list) delete list;
// FastRead: warn locally and make the generated macro lower the XNet.* timeout /
// retry settings through gEnv.
2885 if (fFastReadOption) {
2886 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
2887 out << "// fast xrootd reading enabled" << endl;
2888 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2889 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2890 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2891 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2892 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2893 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2895 // Change temp directory to current one
2896 out << "// Set temporary merging directory to current one" << endl;
2897 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2898 out << "// Connect to AliEn" << endl;
2899 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2900 out << " Bool_t laststage = kFALSE;" << endl;
2901 out << " TString outputDir = dir;" << endl;
// The current values of fOutputFiles / fMergeExcludes / fMaxMergeFiles are
// baked into the generated macro as literals.
2902 out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl;
2903 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
// NOTE(review): GetAnalysisManager() is dereferenced here without a null check,
// while it IS null-checked further down before reading the debug level -
// confirm a manager is guaranteed to exist at this point.
2904 out << " mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
2905 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
2906 out << " TIter *iter = new TIter(list);" << endl;
2907 out << " TObjString *str;" << endl;
2908 out << " TString outputFile;" << endl;
2909 out << " Bool_t merged = kTRUE;" << endl;
// Generated loop: for each comma-separated output file, skip wildcard entries,
// strip an "@..." suffix, skip files that already exist locally or are listed in
// mergeExcludes (substring match), and merge the rest.
2910 out << " while((str=(TObjString*)iter->Next())) {" << endl;
2911 out << " outputFile = str->GetString();" << endl;
2912 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
2913 out << " Int_t index = outputFile.Index(\"@\");" << endl;
2914 out << " if (index > 0) outputFile.Remove(index);" << endl;
2915 out << " // Skip already merged outputs" << endl;
2916 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
2917 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
2918 out << " continue;" << endl;
2919 out << " }" << endl;
2920 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
2921 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
2922 out << " if (!merged) {" << endl;
2923 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
2924 out << " return;" << endl;
2925 out << " }" << endl;
2926 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
2927 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
2928 out << " }" << endl;
// The generated macro creates an empty file "outputs_valid" as a marker that
// all merges went through.
2929 out << " // all outputs merged, validate" << endl;
2930 out << " ofstream out;" << endl;
2931 out << " out.open(\"outputs_valid\", ios::out);" << endl;
2932 out << " out.close();" << endl;
2933 out << " // read the analysis manager from file" << endl;
// File holding the serialized analysis manager: foo.sh -> foo.root
2934 TString analysisFile = fExecutable;
2935 analysisFile.ReplaceAll(".sh", ".root");
// Everything emitted from here on runs only on the last merging stage.
2936 out << " if (!laststage) return;" << endl;
2937 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2938 out << " if (!file) return;" << endl;
2939 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2940 out << " AliAnalysisManager *mgr = 0;" << endl;
2941 out << " TKey *key;" << endl;
2942 out << " while ((key=(TKey*)nextkey())) {" << endl;
2943 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2944 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2945 out << " };" << endl;
2946 out << " if (!mgr) {" << endl;
// NOTE(review): there is no separator between "file" and the file name, so the
// emitted message reads "...found in file<name>"; confirm whether a space is wanted.
2947 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
2948 out << " return;" << endl;
2949 out << " }" << endl << endl;
2950 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
2951 out << " mgr->PrintStatus();" << endl;
// Logging setup of the generated macro depends on the debug level of the
// manager that exists while writing: XNet debugging is emitted for levels > 3.
// NOTE(review): the AliLog line below presumably belongs to the else-branch - confirm.
2952 if (AliAnalysisManager::GetAnalysisManager()) {
2953 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2954 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2956 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2959 out << " mgr->StartAnalysis(\"gridterminate\");" << endl;
2960 out << "}" << endl << endl;
// When ANALYSISalice is a PAR package, append a self-contained SetupPar()
// function to the generated macro (untar, run PROOF-INF/BUILD.sh, then
// PROOF-INF/SETUP.C, restoring the working directory on every exit path).
2961 if (hasANALYSISalice) {
2962 out <<"//________________________________________________________________________________" << endl;
2963 out << "Bool_t SetupPar(const char *package) {" << endl;
2964 out << "// Compile the package and set it up." << endl;
2965 out << " TString pkgdir = package;" << endl;
2966 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2967 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2968 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2969 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2970 out << " // Check for BUILD.sh and execute" << endl;
2971 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2972 out << " printf(\"*******************************\\n\");" << endl;
2973 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2974 out << " printf(\"*******************************\\n\");" << endl;
2975 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2976 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2977 out << " gSystem->ChangeDirectory(cdir);" << endl;
2978 out << " return kFALSE;" << endl;
2979 out << " }" << endl;
2980 out << " } else {" << endl;
2981 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2982 out << " gSystem->ChangeDirectory(cdir);" << endl;
2983 out << " return kFALSE;" << endl;
2984 out << " }" << endl;
2985 out << " // Check for SETUP.C and execute" << endl;
2986 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2987 out << " printf(\"*******************************\\n\");" << endl;
2988 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2989 out << " printf(\"*******************************\\n\");" << endl;
2990 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2991 out << " } else {" << endl;
2992 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2993 out << " gSystem->ChangeDirectory(cdir);" << endl;
2994 out << " return kFALSE;" << endl;
2995 out << " }" << endl;
2996 out << " // Restore original workdir" << endl;
2997 out << " gSystem->ChangeDirectory(cdir);" << endl;
2998 out << " return kTRUE;" << endl;
// Upload step: `copy` is cleared in offline and test mode.
// NOTE(review): presumably the grid operations below are skipped when copy is
// false - confirm against the guard around them.
3002 Bool_t copy = kTRUE;
3003 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is <alien home><fGridWorkingDir>; a stale copy is removed first.
3006 TString workdir = gGrid->GetHomeDirectory();
3007 workdir += fGridWorkingDir;
3008 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3009 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3010 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3014 //______________________________________________________________________________
// Unpacks and sets up the PAR archive named by <package> (the ".par" suffix is
// optional) from the current directory:
//   1. runs `tar xvzf <pkg>.par`,
//   2. changes into the <pkg> directory,
//   3. executes PROOF-INF/BUILD.sh,
//   4. executes the macro PROOF-INF/SETUP.C and prints the matching loaded library,
//   5. changes back to the original working directory.
// A missing BUILD.sh / SETUP.C or a non-zero BUILD.sh exit status is reported
// with ::Error and the working directory is restored before leaving.
// NOTE(review): the Bool_t result is presumably kFALSE on those error paths and
// kTRUE on success, matching the SetupPar() text emitted into the generated
// macros - confirm against the return statements.
// NOTE(review): the exit status of the tar command and the result of the first
// ChangeDirectory() are not checked; if either fails, the PROOF-INF checks
// below run relative to whatever directory is current.
3015 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3017 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3018 // Note that for loading the compiled library. The current directory should have precedence in
3020 TString pkgdir = package;
3021 pkgdir.ReplaceAll(".par","");
3022 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
// Remember where we started so every exit path can return there.
3023 TString cdir = gSystem->WorkingDirectory();
3024 gSystem->ChangeDirectory(pkgdir);
3025 // Check for BUILD.sh and execute
// TSystem::AccessPathName() returns kFALSE when the path IS accessible, hence the negation.
3026 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3027 printf("**************************************************\n");
3028 printf("*** Building PAR archive %s\n", package);
3029 printf("**************************************************\n");
// A non-zero exit status of the build script is treated as failure.
3030 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3031 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3032 gSystem->ChangeDirectory(cdir);
3036 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3037 gSystem->ChangeDirectory(cdir);
3040 // Check for SETUP.C and execute
3041 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3042 printf("**************************************************\n");
3043 printf("*** Setup PAR archive %s\n", package);
3044 printf("**************************************************\n");
3045 gROOT->Macro("PROOF-INF/SETUP.C");
// Print the loaded libraries whose name matches the package name.
3046 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3048 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3049 gSystem->ChangeDirectory(cdir);
3052 // Restore original workdir
3053 gSystem->ChangeDirectory(cdir);
3057 //______________________________________________________________________________
// Writes the bash script fExecutable that grid jobs run and copies it to
// <alien home>/bin/.
// The script text is produced in the section guarded by
// !TestBit(AliAnalysisGrid::kSubmit). It prints environment diagnostics
// (PATH, LD_LIBRARY_PATH, ROOTSYS, ALICE_ROOT, location of root/aliroot,
// ulimit, free memory), then runs
//   <fExecutableCommand> <fAnalysisMacro> <fExecutableArgs>
// and finally echoes the exit code and the memory status again.
3058 void AliAnalysisAlien::WriteExecutable()
3060 // Generate the alien executable script.
3061 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3063 out.open(fExecutable.Data(), ios::out);
3065 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
// --- diagnostic header of the generated script ---
3068 out << "#!/bin/bash" << endl;
3069 out << "echo \"=========================================\"" << endl;
3070 out << "echo \"############## PATH : ##############\"" << endl;
3071 out << "echo $PATH" << endl;
3072 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3073 out << "echo $LD_LIBRARY_PATH" << endl;
3074 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3075 out << "echo $ROOTSYS" << endl;
3076 out << "echo \"############## which root : ##############\"" << endl;
3077 out << "which root" << endl;
3078 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3079 out << "echo $ALICE_ROOT" << endl;
3080 out << "echo \"############## which aliroot : ##############\"" << endl;
3081 out << "which aliroot" << endl;
3082 out << "echo \"############## system limits : ##############\"" << endl;
3083 out << "ulimit -a" << endl;
3084 out << "echo \"############## memory : ##############\"" << endl;
3085 out << "free -m" << endl;
3086 out << "echo \"=========================================\"" << endl << endl;
3087 // Make sure we can properly compile par files
// With PAR packages in use, the current directory is put first on LD_LIBRARY_PATH.
3088 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// --- the actual job command line ---
3089 out << fExecutableCommand << " ";
3090 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
// "$?" in the next emitted line is expanded by the shell at run time, i.e. it
// reports the status of the command emitted just above.
3091 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3092 out << "echo \"############## memory after: ##############\"" << endl;
3093 out << "free -m" << endl;
// Upload step: `copy` is cleared in offline and test mode.
// NOTE(review): presumably the grid operations below are skipped when copy is
// false - confirm against the guard around them.
3095 Bool_t copy = kTRUE;
3096 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Make sure <alien home>/bin exists, then replace any previous copy of the script there.
3099 TString workdir = gGrid->GetHomeDirectory();
3100 TString bindir = Form("%s/bin", workdir.Data());
3101 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is extended here but not used by the remaining statements of this block.
3102 workdir += fGridWorkingDir;
3103 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3104 if (FileExists(executable)) gGrid->Rm(executable);
// NOTE(review): the message is tagged "CreateJDL" although it is issued from WriteExecutable.
3105 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3106 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3110 //______________________________________________________________________________
// Writes the bash script "<executable>_merge.sh" used by the merging jobs and
// copies it to <alien home>/bin/. Does nothing unless fMergeViaJDL is set.
// The script text is produced in the section guarded by
// !TestBit(AliAnalysisGrid::kSubmit). It prints the same environment
// diagnostics as the analysis executable, then runs
//   <fExecutableCommand> '<executable>_merge.C("$1",$2,$3)'
// where $1..$3 are the script's positional arguments (the generated merging
// macro takes: dir, stage, ichunk), and finally echoes the exit code and the
// memory status.
3111 void AliAnalysisAlien::WriteMergeExecutable()
3113 // Generate the alien executable script for the merging job.
3114 if (!fMergeViaJDL) return;
// Script name is derived from the executable name: foo.sh -> foo_merge.sh
3115 TString mergeExec = fExecutable;
3116 mergeExec.ReplaceAll(".sh", "_merge.sh");
3117 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3119 out.open(mergeExec.Data(), ios::out);
// NOTE(review): the error is tagged "WriteMergingExecutable", which differs from this function's name.
3121 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
// --- diagnostic header of the generated script ---
3124 out << "#!/bin/bash" << endl;
3125 out << "echo \"=========================================\"" << endl;
3126 out << "echo \"############## PATH : ##############\"" << endl;
3127 out << "echo $PATH" << endl;
3128 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3129 out << "echo $LD_LIBRARY_PATH" << endl;
3130 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3131 out << "echo $ROOTSYS" << endl;
3132 out << "echo \"############## which root : ##############\"" << endl;
3133 out << "which root" << endl;
3134 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3135 out << "echo $ALICE_ROOT" << endl;
3136 out << "echo \"############## which aliroot : ##############\"" << endl;
3137 out << "which aliroot" << endl;
3138 out << "echo \"############## system limits : ##############\"" << endl;
3139 out << "ulimit -a" << endl;
3140 out << "echo \"############## memory : ##############\"" << endl;
3141 out << "free -m" << endl;
3142 out << "echo \"=========================================\"" << endl << endl;
3143 // Make sure we can properly compile par files
3144 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// Merging macro name: foo.sh -> foo_merge.C
3145 TString mergeMacro = fExecutable;
3146 mergeMacro.ReplaceAll(".sh", "_merge.C");
// The macro call, including its arguments, is assembled in $ARG and handed to
// the command as a single word.
3147 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3148 out << fExecutableCommand << " " << "$ARG" << endl;
3149 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3150 out << "echo \"############## memory after: ##############\"" << endl;
3151 out << "free -m" << endl;
// Upload step: `copy` is cleared in offline and test mode.
// NOTE(review): presumably the grid operations below are skipped when copy is
// false - confirm against the guard around them.
3153 Bool_t copy = kTRUE;
3154 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Make sure <alien home>/bin exists, then replace any previous copy of the script there.
3157 TString workdir = gGrid->GetHomeDirectory();
3158 TString bindir = Form("%s/bin", workdir.Data());
3159 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is extended here but not used by the remaining statements of this block.
3160 workdir += fGridWorkingDir;
3161 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3162 if (FileExists(executable)) gGrid->Rm(executable);
// NOTE(review): the message is tagged "CreateJDL" although it is issued from WriteMergeExecutable.
3163 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3164 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3168 //______________________________________________________________________________
// Writes the local text file <filename> describing a production and copies it
// to <alien home><fGridWorkingDir>/<filename>, replacing an existing grid copy.
// File layout (as written below):
//   line 1      : <workdir>/<fJDLName> <estimated subjobs per master>
//   other lines : <input collection name> <tag>, one per entry of fInputFiles,
//                 where <tag> is either the collection's base name without
//                 ".xml" or the zero-padded (3 digit) entry index.
// filename: name used both for the local file and for the grid copy.
3169 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3171 // Write the production file to be submitted by LPM manager. The format is:
3172 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3173 // Next lines: full_path_to_dataset XXX (XXX is a string)
3174 // To submit, one has to: submit jdl XXX for all lines
3176 out.open(filename, ios::out);
3178 Error("WriteProductionFile", "Bad file name: %s", filename);
3181 TString workdir = gGrid->GetHomeDirectory();
3182 workdir += fGridWorkingDir;
// Integer estimate: 1000 * runs-per-master / max-input-files-per-subjob.
// NOTE(review): this divides by fSplitMaxInputFileNumber, which the constructors'
// initializer lists set to 0 - confirm it is always given a positive value
// before this method is called, otherwise this is an integer division by zero.
3183 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3184 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3185 out << locjdl << " " << njobspermaster << endl;
// NOTE(review): fInputFiles is dereferenced without a null check - confirm callers guarantee it is set.
3186 Int_t nmasterjobs = fInputFiles->GetEntries();
3187 for (Int_t i=0; i<nmasterjobs; i++) {
// Tag candidate: base name of the collection file with the ".xml" extension removed.
3188 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3189 runOutDir.ReplaceAll(".xml", "");
// NOTE(review): the next two statements are alternative line formats (run-name
// tag vs. "%03d" index tag); confirm which condition selects between them.
3191 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3193 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3195 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3196 if (FileExists(filename)) gGrid->Rm(filename);
3197 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3200 //______________________________________________________________________________
// Writes the bash validation script for the analysis jobs
// ("<executable>_validation.sh") or, when <merge> is true, for the merging jobs
// ("<executable>_mergevalidation.sh"), and copies it to the AliEn working
// directory.
// The script text is produced in the section guarded by
// !TestBit(AliAnalysisGrid::kSubmit). The generated script sets error=1 when:
//   - the file "stderr" is missing,
//   - "stderr" contains one of the searched failure messages (PAR build
//     failure, segmentation violation/fault, glibc error),
//   - one of the expected output files is missing,
//   - the marker file "outputs_valid" is missing,
// and exits with $error. On success it may delete the std* log files,
// depending on IsKeepLogs().
// merge: selects the script name and enables the fMergeExcludes filter below.
3201 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3203 // Generate the alien validation script.
3204 // Generate the validation script
3206 TString validationScript = fExecutable;
3207 if (merge) validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
3208 else validationScript.ReplaceAll(".sh", "_validation.sh");
3210 Error("WriteValidationScript", "Alien connection required");
// Outside test mode the script's messages are appended to the file "stdout";
// in test mode outStream stays empty and they go to the terminal.
3213 TString outStream = "";
3214 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3215 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3217 out.open(validationScript, ios::out);
// --- script preamble: run from the directory containing the script ---
3218 out << "#!/bin/bash" << endl;
3219 out << "##################################################" << endl;
3220 out << "validateout=`dirname $0`" << endl;
3221 out << "validatetime=`date`" << endl;
3222 out << "validated=\"0\";" << endl;
3223 out << "error=0" << endl;
3224 out << "if [ -z $validateout ]" << endl;
3225 out << "then" << endl;
3226 out << " validateout=\".\"" << endl;
3227 out << "fi" << endl << endl;
3228 out << "cd $validateout;" << endl;
3229 out << "validateworkdir=`pwd`;" << endl << endl;
3230 out << "echo \"*******************************************************\"" << outStream << endl;
3231 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3233 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3234 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3235 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3236 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3237 out << "ls -la ./" << outStream << endl;
3238 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3239 out << "##################################################" << endl;
// --- check 1: the job must have produced a stderr file ---
3242 out << "if [ ! -f stderr ] ; then" << endl;
3243 out << " error=1" << endl;
3244 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3245 out << " echo \"Error = $error\" " << outStream << endl;
3246 out << "fi" << endl;
// --- check 2: search stderr for known failure messages (case-insensitive) ---
3248 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3249 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3250 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
// NOTE(review): with -E the pattern below is an extended regular expression and
// it starts with '*', whose meaning at the start of an ERE is undefined by
// POSIX; the literal text "*** glibc detected ***" may therefore not be matched
// as intended. `grep -F` (fixed string) looks like what is wanted - confirm.
3251 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3254 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3255 out << " error=1" << endl;
3256 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3257 out << " echo \"$parArch\" " << outStream << endl;
3258 out << " echo \"Error = $error\" " << outStream << endl;
3259 out << "fi" << endl;
3261 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3262 out << " error=1" << endl;
3263 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3264 out << " echo \"$segViol\" " << outStream << endl;
3265 out << " echo \"Error = $error\" " << outStream << endl;
3266 out << "fi" << endl;
3268 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3269 out << " error=1" << endl;
3270 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3271 out << " echo \"$segFault\" " << outStream << endl;
3272 out << " echo \"Error = $error\" " << outStream << endl;
3273 out << "fi" << endl;
3275 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3276 out << " error=1" << endl;
3277 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3278 out << " echo \"$glibcErr\" " << outStream << endl;
3279 out << " echo \"Error = $error\" " << outStream << endl;
3280 out << "fi" << endl;
3282 // Part dedicated to the specific analyses running into the train
// --- check 3: every expected output file must exist ---
// fOutputFiles is a comma-separated list; an "@..." suffix is stripped from each
// entry. Entries are skipped when they are merge-excluded (merge mode only),
// listed in the manager's extra files, or contain a wildcard. The exclusion
// tests are substring matches.
3284 TObjArray *arr = fOutputFiles.Tokenize(",");
// NOTE(review): mgr is dereferenced on the next line without a null check -
// confirm an analysis manager always exists when this method runs.
3287 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3288 TString extra = mgr->GetExtraFiles();
3289 while ((os=(TObjString*)next1())) {
3291 outputFile = os->GetString();
3292 Int_t index = outputFile.Index("@");
3293 if (index > 0) outputFile.Remove(index);
3294 if (merge && fMergeExcludes.Contains(outputFile)) continue;
3295 if (extra.Contains(outputFile)) continue;
3296 if (outputFile.Contains("*")) continue;
3297 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3298 out << " error=1" << endl;
3299 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3300 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3301 out << "fi" << endl;
// --- check 4: the marker file "outputs_valid" must exist ---
// Unlike the other messages, this one is always appended to "stdout",
// regardless of outStream.
3304 out << "if ! [ -f outputs_valid ] ; then" << endl;
3305 out << " error=1" << endl;
3306 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3307 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3308 out << "fi" << endl;
// --- verdict: on success optionally delete the std* logs, then exit with $error ---
3310 out << "if [ $error = 0 ] ; then" << endl;
3311 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3312 if (!IsKeepLogs()) {
3313 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3315 out << " rm -f std*" << endl;
3317 out << "fi" << endl;
3319 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3320 out << "echo \"*******************************************************\"" << outStream << endl;
3321 out << "cd -" << endl;
3322 out << "exit $error" << endl;
// Upload step: `copy` is cleared in offline and test mode.
// NOTE(review): presumably the grid operations below are skipped when copy is
// false - confirm against the guard around them.
3324 Bool_t copy = kTRUE;
3325 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3328 TString workdir = gGrid->GetHomeDirectory();
3329 workdir += fGridWorkingDir;
// NOTE(review): the message is tagged "CreateJDL" although it is issued from WriteValidationScript.
3330 Info("CreateJDL", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3331 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3332 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));