1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TObjString.h"
31 #include "TObjArray.h"
33 #include "TGridResult.h"
34 #include "TGridCollection.h"
36 #include "TGridJobStatusList.h"
37 #include "TGridJobStatus.h"
38 #include "TFileMerger.h"
39 #include "AliAnalysisManager.h"
40 #include "AliVEventHandler.h"
41 #include "AliAnalysisDataContainer.h"
42 #include "AliAnalysisAlien.h"
44 ClassImp(AliAnalysisAlien)
46 //______________________________________________________________________________
// Default constructor.
// The initializers visible here set the per-subjob input file limit and the
// master resubmit threshold to 0 and default-construct fAdditionalRootLibs.
47 AliAnalysisAlien::AliAnalysisAlien()
53 fSplitMaxInputFileNumber(0),
55 fMasterResubmitThreshold(0),
73 fAdditionalRootLibs(),
104 //______________________________________________________________________________
// Named constructor.
// Forwards `name` to the AliAnalysisGrid base class. The initializers visible
// here zero the split/resubmit limits and default-construct the string members.
105 AliAnalysisAlien::AliAnalysisAlien(const char *name)
106 :AliAnalysisGrid(name),
111 fSplitMaxInputFileNumber(0),
113 fMasterResubmitThreshold(0),
126 fExecutableCommand(),
131 fAdditionalRootLibs(),
162 //______________________________________________________________________________
// Copy constructor.
// Scalar and string settings are copied member by member from `other`.
// The two JDL objects are NOT copied: fresh TAlienJDL instances are created,
// so any values already stored in other's JDLs are not carried over.
// The input-file and package lists are deep-copied as new TObjString objects.
163 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
164 :AliAnalysisGrid(other),
167 fPrice(other.fPrice),
169 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
170 fMaxInitFailed(other.fMaxInitFailed),
171 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
172 fNtestFiles(other.fNtestFiles),
173 fNrunsPerMaster(other.fNrunsPerMaster),
174 fMaxMergeFiles(other.fMaxMergeFiles),
175 fNsubmitted(other.fNsubmitted),
176 fProductionMode(other.fProductionMode),
177 fOutputToRunNo(other.fOutputToRunNo),
178 fMergeViaJDL(other.fMergeViaJDL),
179 fFastReadOption(other.fFastReadOption),
180 fOverwriteMode(other.fOverwriteMode),
181 fNreplicas(other.fNreplicas),
182 fRunNumbers(other.fRunNumbers),
183 fExecutable(other.fExecutable),
184 fExecutableCommand(other.fExecutableCommand),
185 fArguments(other.fArguments),
186 fExecutableArgs(other.fExecutableArgs),
187 fAnalysisMacro(other.fAnalysisMacro),
188 fAnalysisSource(other.fAnalysisSource),
189 fAdditionalRootLibs(other.fAdditionalRootLibs),
190 fAdditionalLibs(other.fAdditionalLibs),
191 fSplitMode(other.fSplitMode),
192 fAPIVersion(other.fAPIVersion),
193 fROOTVersion(other.fROOTVersion),
194 fAliROOTVersion(other.fAliROOTVersion),
195 fExternalPackages(other.fExternalPackages),
197 fGridWorkingDir(other.fGridWorkingDir),
198 fGridDataDir(other.fGridDataDir),
199 fDataPattern(other.fDataPattern),
200 fGridOutputDir(other.fGridOutputDir),
201 fOutputArchive(other.fOutputArchive),
202 fOutputFiles(other.fOutputFiles),
203 fInputFormat(other.fInputFormat),
204 fDatasetName(other.fDatasetName),
205 fJDLName(other.fJDLName),
206 fMergeExcludes(other.fMergeExcludes),
207 fIncludePath(other.fIncludePath),
208 fCloseSE(other.fCloseSE),
209 fFriendChainName(other.fFriendChainName),
210 fJobTag(other.fJobTag),
211 fOutputSingle(other.fOutputSingle),
212 fRunPrefix(other.fRunPrefix),
// The JDL objects are instantiated through the interpreter rather than with a
// direct `new` (presumably to avoid a link-time dependency on the AliEn
// plugin library - TODO confirm).
217 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
218 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
219 fRunRange[0] = other.fRunRange[0];
220 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: each entry is duplicated by name and the
// new array is made owner of its entries.
221 if (other.fInputFiles) {
222 fInputFiles = new TObjArray();
223 TIter next(other.fInputFiles);
225 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
226 fInputFiles->SetOwner();
// Same deep copy for the list of packages.
228 if (other.fPackages) {
229 fPackages = new TObjArray();
230 TIter next(other.fPackages);
232 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
233 fPackages->SetOwner();
237 //______________________________________________________________________________
// Destructor.
// Releases the two JDL objects and the input-file and package lists held by
// this object. The null checks are redundant in C++ (deleting a null pointer
// is a no-op) but harmless.
238 AliAnalysisAlien::~AliAnalysisAlien()
241 if (fGridJDL) delete fGridJDL;
242 if (fMergingJDL) delete fMergingJDL;
243 if (fInputFiles) delete fInputFiles;
244 if (fPackages) delete fPackages;
247 //______________________________________________________________________________
// Assignment operator.
// Copies all settings from `other` (self-assignment is a no-op). As in the
// copy constructor, the JDL objects are not copied but re-created empty, and
// the input-file / package lists are deep-copied.
// The objects previously held by this instance are released before being
// replaced, so repeated assignment does not leak, and a list that `other`
// does not have is cleared here instead of keeping stale entries.
248 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
251 if (this != &other) {
252 AliAnalysisGrid::operator=(other);
// Free the JDL objects currently held before creating the new ones.
delete fGridJDL;
delete fMergingJDL;
253 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
254 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
255 fPrice = other.fPrice;
257 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
258 fMaxInitFailed = other.fMaxInitFailed;
259 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
260 fNtestFiles = other.fNtestFiles;
261 fNrunsPerMaster = other.fNrunsPerMaster;
262 fMaxMergeFiles = other.fMaxMergeFiles;
263 fNsubmitted = other.fNsubmitted;
264 fProductionMode = other.fProductionMode;
265 fOutputToRunNo = other.fOutputToRunNo;
266 fMergeViaJDL = other.fMergeViaJDL;
267 fFastReadOption = other.fFastReadOption;
268 fOverwriteMode = other.fOverwriteMode;
269 fNreplicas = other.fNreplicas;
270 fRunNumbers = other.fRunNumbers;
271 fExecutable = other.fExecutable;
272 fExecutableCommand = other.fExecutableCommand;
273 fArguments = other.fArguments;
274 fExecutableArgs = other.fExecutableArgs;
275 fAnalysisMacro = other.fAnalysisMacro;
276 fAnalysisSource = other.fAnalysisSource;
277 fAdditionalRootLibs = other.fAdditionalRootLibs;
278 fAdditionalLibs = other.fAdditionalLibs;
279 fSplitMode = other.fSplitMode;
280 fAPIVersion = other.fAPIVersion;
281 fROOTVersion = other.fROOTVersion;
282 fAliROOTVersion = other.fAliROOTVersion;
283 fExternalPackages = other.fExternalPackages;
285 fGridWorkingDir = other.fGridWorkingDir;
286 fGridDataDir = other.fGridDataDir;
287 fDataPattern = other.fDataPattern;
288 fGridOutputDir = other.fGridOutputDir;
289 fOutputArchive = other.fOutputArchive;
290 fOutputFiles = other.fOutputFiles;
291 fInputFormat = other.fInputFormat;
292 fDatasetName = other.fDatasetName;
293 fJDLName = other.fJDLName;
294 fMergeExcludes = other.fMergeExcludes;
295 fIncludePath = other.fIncludePath;
296 fCloseSE = other.fCloseSE;
297 fFriendChainName = other.fFriendChainName;
298 fJobTag = other.fJobTag;
299 fOutputSingle = other.fOutputSingle;
300 fRunPrefix = other.fRunPrefix;
// Keep the run range in sync as well, as the copy constructor does.
fRunRange[0] = other.fRunRange[0];
fRunRange[1] = other.fRunRange[1];
// Drop the lists currently held; they are rebuilt below only if `other`
// has them.
delete fInputFiles;
fInputFiles = 0;
delete fPackages;
fPackages = 0;
// Deep copy of the input file list (entries duplicated by name, array owns them).
301 if (other.fInputFiles) {
302 fInputFiles = new TObjArray();
303 TIter next(other.fInputFiles);
305 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
306 fInputFiles->SetOwner();
308 if (other.fPackages) {
309 fPackages = new TObjArray();
310 TIter next(other.fPackages);
312 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
313 fPackages->SetOwner();
319 //______________________________________________________________________________
// Appends `path` to fIncludePath, followed by a space. If the tested string
// already contains "-I" the path is appended as given, otherwise it is
// prefixed with "-I".
// NOTE(review): `p` is declared on a line not visible here; presumably it is
// a TString built from `path` - confirm.
320 void AliAnalysisAlien::AddIncludePath(const char *path)
322 // Add include path in the remote analysis macro.
324 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
325 else fIncludePath += Form("-I%s ", path);
328 //______________________________________________________________________________
// Appends one run to the space-separated list fRunNumbers. The entry is
// written as the current run prefix (fRunPrefix) followed by the decimal run
// number; a single space separates it from any previous entries.
329 void AliAnalysisAlien::AddRunNumber(Int_t run)
331 // Add a run number to the list of runs to be processed.
332 if (fRunNumbers.Length()) fRunNumbers += " ";
333 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
336 //______________________________________________________________________________
// String overload of AddRunNumber. The visible code inserts the space
// separator when fRunNumbers is not empty; the statement appending `run`
// itself is not visible in this view.
337 void AliAnalysisAlien::AddRunNumber(const char* run)
339 // Add a run number to the list of runs to be processed.
340 if (fRunNumbers.Length()) fRunNumbers += " ";
344 //______________________________________________________________________________
// Registers `lfn` as an input data file. The list fInputFiles is created on
// first use and the name is stored as a new TObjString.
345 void AliAnalysisAlien::AddDataFile(const char *lfn)
347 // Adds a data file to the input to be analysed. The file should be a valid LFN
348 // or point to an existing file in the alien workdir.
349 if (!fInputFiles) fInputFiles = new TObjArray();
350 fInputFiles->Add(new TObjString(lfn));
353 //______________________________________________________________________________
// Appends `package` to the space-separated list fExternalPackages.
// The separator is only inserted when the list already has content. The test
// uses Length() because a TString evaluated in a boolean context goes through
// its const char* conversion, which is never null, so the previous test was
// always true and prepended a stray leading space.
354 void AliAnalysisAlien::AddExternalPackage(const char *package)
356 // Adds external packages w.r.t. the default ones (root, aliroot and gapi)
357 if (fExternalPackages.Length()) fExternalPackages += " ";
358 fExternalPackages += package;
361 //______________________________________________________________________________
// Makes sure a grid connection is available.
// Returns kTRUE immediately when gGrid exists and reports being connected;
// otherwise calls TGrid::Connect("alien://") and re-checks the connection,
// reporting an error if it is still missing. On success the analysis user
// (fUser) is set to the user name reported by the grid.
362 Bool_t AliAnalysisAlien::Connect()
364 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
365 if (gGrid && gGrid->IsConnected()) return kTRUE;
367 Info("Connect", "Trying to connect to AliEn ...");
368 TGrid::Connect("alien://");
370 if (!gGrid || !gGrid->IsConnected()) {
371 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// Adopt the grid user name as the analysis user.
374 fUser = gGrid->GetUser();
375 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
379 //______________________________________________________________________________
// Makes sure the AliEn working directory (grid home directory + fGridWorkingDir)
// is usable. If the directory does not exist it is created with "mkdir -p";
// when creation fails a warning is issued and fGridWorkingDir is reset to an
// empty string, so the home directory is used instead.
// Messages are reported under the location "CdWork" (they were previously
// mislabelled as coming from "CreateJDL").
380 void AliAnalysisAlien::CdWork()
382 // Check validity of alien workspace. Create directory if possible.
384 Error("CdWork", "Alien connection required");
387 TString homedir = gGrid->GetHomeDirectory();
388 TString workdir = homedir + fGridWorkingDir;
389 if (DirectoryExists(workdir)) {
393 // Work directory not existing - create it
395 if (gGrid->Mkdir(workdir, "-p")) {
396 gGrid->Cd(fGridWorkingDir);
397 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
399 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
400 workdir.Data(), homedir.Data());
401 fGridWorkingDir = "";
405 //______________________________________________________________________________
// Tests whether files can be copied to the AliEn directory `alienpath`.
// A small local file "plugin_test_copy" is created, any previous copy of it
// in `alienpath` is removed, and the file is uploaded with TFile::Cp. The
// local file is unlinked in both the failure and the success path, and on
// success the remote copy is removed again.
// The value of the alien_CLOSE_SE environment variable is reported because
// an unset or wrong close storage element may make the copy fail.
406 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
408 // Check if file copying is possible.
410 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
413 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
414 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
415 // Check if alien_CLOSE_SE is defined
416 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
417 if (!closeSE.IsNull()) {
418 Info("CheckFileCopy", "Your current close storage is pointing to: \
419 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
421 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
423 // Check if grid directory exists.
424 if (!DirectoryExists(alienpath)) {
425 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
428 TFile f("plugin_test_copy", "RECREATE");
429 // User may not have write permissions to current directory
431 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
432 gSystem->WorkingDirectory());
// Remove a leftover test file from a previous run before uploading.
436 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
437 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
438 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
439 \n# 1. Make sure you have write permissions there. If this is the case: \
440 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
441 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
442 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
443 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
444 gSystem->Unlink(f.GetName());
447 gSystem->Unlink(f.GetName());
// Clean up the uploaded test file. The directory and file name must be joined
// with '/', as in the paths used above; without it the path points to a
// non-existing entry and the test file is left behind on the grid.
448 gGrid->Rm(Form("%s/%s",alienpath,f.GetName()));
449 Info("CheckFileCopy", "### ...SUCCESS ###");
453 //______________________________________________________________________________
// Validates the input data declaration and registers the xml collections
// that will have to exist for the requested runs.
// Three sources are handled:
//  - nothing declared (no files, no run numbers, no run range): a grid data
//    directory must be set; a single xml for that directory will be used;
//  - explicitly declared files (fInputFiles): each must exist, and all must
//    agree on their type (collection / xml / using tags);
//  - run numbers or a run range: the grid data directory must be set and
//    exist; one "<run>.xml" entry is added per run, or one entry per chunk of
//    fNrunsPerMaster runs when several runs are grouped per master job.
// Messages are reported under the location "CheckInputData" (the tag was
// previously misspelled in two places, and the run-selection messages were
// tagged with another method's name).
454 Bool_t AliAnalysisAlien::CheckInputData()
456 // Check validity of input data. If necessary, create xml files.
457 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
458 if (!fGridDataDir.Length()) {
459 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
462 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
463 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
464 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
467 // Process declared files
468 Bool_t isCollection = kFALSE;
469 Bool_t isXml = kFALSE;
470 Bool_t useTags = kFALSE;
471 Bool_t checked = kFALSE;
474 TString workdir = gGrid->GetHomeDirectory();
475 workdir += fGridWorkingDir;
478 TIter next(fInputFiles);
479 while ((objstr=(TObjString*)next())) {
482 file += objstr->GetString();
483 // Store full lfn path
484 if (FileExists(file)) objstr->SetString(file);
486 file = objstr->GetName();
487 if (!FileExists(objstr->GetName())) {
488 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
489 objstr->GetName(), workdir.Data());
493 Bool_t iscoll, isxml, usetags;
494 CheckDataType(file, iscoll, isxml, usetags);
497 isCollection = iscoll;
500 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
// All declared files must be of the same kind.
502 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
503 Error("CheckInputData", "Some conflict was found in the types of inputs");
509 // Process requested run numbers
510 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
511 // Check validity of alien data directory
512 if (!fGridDataDir.Length()) {
513 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
516 if (!DirectoryExists(fGridDataDir)) {
517 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
521 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
525 if (checked && !isXml) {
526 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
529 // Check validity of run number(s)
533 TString schunk, schunk2;
537 useTags = fDataPattern.Contains("tag");
538 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
540 if (useTags != fDataPattern.Contains("tag")) {
541 Error("CheckInputData", "Cannot mix input files using/not using tags");
544 if (fRunNumbers.Length()) {
545 Info("CheckInputData", "Using supplied run numbers (run ranges are ignored)");
546 arr = fRunNumbers.Tokenize(" ");
548 while ((os=(TObjString*)next())) {
549 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
550 if (!DirectoryExists(path)) {
551 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
554 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
555 TString msg = "\n##### file: ";
557 msg += " type: xml_collection;";
558 if (useTags) msg += " using_tags: Yes";
559 else msg += " using_tags: No";
// NOTE(review): this report keeps the "CheckDataType" tag, presumably so it
// reads like the per-file report of that method - confirm.
560 Info("CheckDataType", "%s", msg.Data());
561 if (fNrunsPerMaster<2) {
562 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Several runs per master job: a chunk is named "<first run>_<last run>.xml".
565 if (((nruns-1)%fNrunsPerMaster) == 0) {
566 schunk = os->GetString();
568 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
569 schunk += Form("_%s.xml", os->GetString().Data());
575 Info("CheckInputData", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
576 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
577 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
578 if (!DirectoryExists(path)) {
579 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
582 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
583 TString msg = "\n##### file: ";
585 msg += " type: xml_collection;";
586 if (useTags) msg += " using_tags: Yes";
587 else msg += " using_tags: No";
588 Info("CheckDataType", "%s", msg.Data());
589 if (fNrunsPerMaster<2) {
590 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
593 if (((nruns-1)%fNrunsPerMaster) == 0) {
594 schunk = Form("%s%d", fRunPrefix.Data(),irun);
596 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
597 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
610 //______________________________________________________________________________
// Creates the xml collection(s) describing the input data.
// Nothing is done in offline mode. Otherwise, for the base data directory,
// for every entry of fRunNumbers, or for every run in fRunRange, a grid
// 'find' command is issued and its standard output is written to a local xml
// file ("wn.xml" in test mode). When 'grep' is available the result is
// checked for "/event" entries to detect empty datasets. Outside test mode
// the xml is copied to the AliEn working directory; with fNrunsPerMaster >= 2
// the per-run collections are merged into one collection per chunk of runs
// before being exported and copied.
// In overwrite mode (fOverwriteMode) an existing dataset on the grid is
// removed first; otherwise its creation is skipped.
611 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
613 // Create dataset for the grid data directory + run number.
614 if (TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
616 Error("CreateDataset", "Cannot create dataset with no grid connection");
622 TString workdir = gGrid->GetHomeDirectory();
623 workdir += fGridWorkingDir;
625 // Compose the 'find' command arguments
627 TString options = "-x collection ";
628 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
629 TString conditions = "";
634 TString schunk, schunk2;
635 TGridCollection *cbase=0, *cadd=0;
636 if (!fRunNumbers.Length() && !fRunRange[0]) {
637 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
638 // Make a single data collection from data directory.
640 if (!DirectoryExists(path)) {
641 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
645 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
646 else file = Form("%s.xml", gSystem->BaseName(path));
647 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
653 command += conditions;
654 printf("command: %s\n", command.Data());
655 TGridResult *res = gGrid->Command(command);
657 // Write standard output to file
658 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
659 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
660 Bool_t nullFile = kFALSE;
662 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
664 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
666 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
671 Bool_t fileExists = FileExists(file);
672 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
673 // Copy xml file to alien space
674 if (fileExists) gGrid->Rm(file);
675 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
676 if (!FileExists(file)) {
677 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
680 // Update list of files to be processed.
682 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
686 Bool_t nullResult = kTRUE;
687 if (fRunNumbers.Length()) {
688 TObjArray *arr = fRunNumbers.Tokenize(" ");
691 while ((os=(TObjString*)next())) {
692 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
693 if (!DirectoryExists(path)) continue;
695 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
696 else file = Form("%s.xml", os->GetString().Data());
697 // If local collection file does not exist, create it via 'find' command.
698 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
703 command += conditions;
704 TGridResult *res = gGrid->Command(command);
706 // Write standard output to file
707 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
708 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
709 Bool_t nullFile = kFALSE;
711 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
713 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
715 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// A run with an empty dataset is dropped from the list of runs.
716 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
722 if (TestBit(AliAnalysisGrid::kTest)) break;
723 // Check if there is one run per master job.
724 if (fNrunsPerMaster<2) {
725 if (FileExists(file)) {
726 if (fOverwriteMode) gGrid->Rm(file);
728 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
732 // Copy xml file to alien space
733 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
734 if (!FileExists(file)) {
735 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk opens the base
// collection, the following ones are loaded as collections to be added to it.
741 if (((nruns-1)%fNrunsPerMaster) == 0) {
742 schunk = os->GetString();
743 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
745 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
746 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
750 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
753 schunk += Form("_%s.xml", os->GetString().Data());
754 if (FileExists(schunk)) {
// In overwrite mode remove the existing merged collection (schunk), i.e. the
// file whose existence was just tested - not the last per-run xml. This
// matches the handling in the run-range branch below.
755 if (fOverwriteMode) gGrid->Rm(schunk);
757 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
761 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
762 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
763 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
764 if (!FileExists(schunk)) {
765 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
773 Error("CreateDataset", "No valid dataset corresponding to the query!");
777 // Process a full run range.
778 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
779 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
780 if (!DirectoryExists(path)) continue;
782 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
783 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
784 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
785 if (fOverwriteMode) gGrid->Rm(file);
787 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
791 // If local collection file does not exist, create it via 'find' command.
792 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
797 command += conditions;
798 TGridResult *res = gGrid->Command(command);
800 // Write standard output to file
801 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
802 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
803 Bool_t nullFile = kFALSE;
805 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
807 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
809 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
815 if (TestBit(AliAnalysisGrid::kTest)) break;
816 // Check if there is one run per master job.
817 if (fNrunsPerMaster<2) {
818 if (FileExists(file)) {
819 if (fOverwriteMode) gGrid->Rm(file);
821 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
825 // Copy xml file to alien space
826 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
827 if (!FileExists(file)) {
828 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
833 // Check if the collection for the chunk exist locally.
834 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
835 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
836 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
839 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
840 if (((nruns-1)%fNrunsPerMaster) == 0) {
841 schunk = Form("%s%d", fRunPrefix.Data(), irun);
842 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
844 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
848 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
849 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
853 if (FileExists(schunk)) {
854 if (fOverwriteMode) gGrid->Rm(schunk);
856 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
860 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
861 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
862 if (FileExists(schunk)) {
863 if (fOverwriteMode) gGrid->Rm(schunk);
865 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
869 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
870 if (!FileExists(schunk)) {
871 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
877 Error("CreateDataset", "No valid dataset corresponding to the query!");
884 //______________________________________________________________________________
885 Bool_t AliAnalysisAlien::CreateJDL()
887 // Generate a JDL file according to current settings. The name of the file is
888 // specified by fJDLName.
889 Bool_t error = kFALSE;
892 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
893 Bool_t generate = kTRUE;
894 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
896 Error("CreateJDL", "Alien connection required");
899 // Check validity of alien workspace
901 TString workdir = gGrid->GetHomeDirectory();
902 workdir += fGridWorkingDir;
906 Error("CreateJDL()", "Define some input files for your analysis.");
909 // Compose list of input files
910 // Check if output files were defined
911 if (!fOutputFiles.Length()) {
912 Error("CreateJDL", "You must define at least one output file");
915 // Check if an output directory was defined and valid
916 if (!fGridOutputDir.Length()) {
917 Error("CreateJDL", "You must define AliEn output directory");
920 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
921 if (!DirectoryExists(fGridOutputDir)) {
922 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
923 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
925 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
931 // Exit if any error up to now
932 if (error) return kFALSE;
934 if (!fUser.IsNull()) {
935 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
936 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
938 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
939 TString mergeExec = fExecutable;
940 mergeExec.ReplaceAll(".sh", "_merge.sh");
941 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
942 mergeExec.ReplaceAll(".sh", ".C");
943 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
944 if (!fArguments.IsNull())
945 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
946 fMergingJDL->SetArguments("$1 $2 $3");
947 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
948 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
949 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
950 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
952 if (fMaxInitFailed > 0) {
953 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
954 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
956 if (fSplitMaxInputFileNumber > 0) {
957 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
958 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
960 if (fSplitMode.Length()) {
961 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
962 fGridJDL->SetDescription("Split", "We split per SE or file");
964 if (!fAliROOTVersion.IsNull()) {
965 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
966 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
968 if (!fROOTVersion.IsNull()) {
969 fGridJDL->AddToPackages("ROOT", fROOTVersion);
970 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
972 if (!fAPIVersion.IsNull()) {
973 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
974 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
976 if (!fExternalPackages.IsNull()) {
977 arr = fExternalPackages.Tokenize(" ");
979 while ((os=(TObjString*)next())) {
980 TString pkgname = os->GetString();
981 Int_t index = pkgname.Index("::");
982 TString pkgversion = pkgname(index+2, pkgname.Length());
983 pkgname.Remove(index);
984 fGridJDL->AddToPackages(pkgname, pkgversion);
985 fMergingJDL->AddToPackages(pkgname, pkgversion);
989 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
990 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
991 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
992 TString analysisFile = fExecutable;
993 analysisFile.ReplaceAll(".sh", ".root");
994 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
995 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
996 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
997 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
998 if (fAdditionalLibs.Length()) {
999 arr = fAdditionalLibs.Tokenize(" ");
1001 while ((os=(TObjString*)next())) {
1002 if (os->GetString().Contains(".so")) continue;
1003 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1004 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1009 TIter next(fPackages);
1011 while ((obj=next())) {
1012 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1013 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1016 if (fOutputArchive.Length()) {
1017 arr = fOutputArchive.Tokenize(" ");
1019 Bool_t first = kTRUE;
1020 const char *comment = "Files to be archived";
1021 const char *comment1 = comment;
1022 while ((os=(TObjString*)next())) {
1023 if (!first) comment = NULL;
1024 if (!os->GetString().Contains("@") && fCloseSE.Length())
1025 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1027 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1031 TString outputArchive = fOutputArchive;
1032 if (!fMergeExcludes.IsNull()) {
1033 arr = fMergeExcludes.Tokenize(" ");
1035 while ((os=(TObjString*)next1())) {
1036 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1037 outputArchive.ReplaceAll(os->GetString(),"");
1041 arr = outputArchive.Tokenize(" ");
1045 while ((os=(TObjString*)next2())) {
1046 if (!first) comment = NULL;
1047 TString currentfile = os->GetString();
1048 currentfile.ReplaceAll(".root", "*.root");
1049 currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1050 if (!currentfile.Contains("@") && fCloseSE.Length())
1051 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1053 fMergingJDL->AddToOutputArchive(currentfile, comment);
1058 arr = fOutputFiles.Tokenize(",");
1060 Bool_t first = kTRUE;
1061 const char *comment = "Files to be archived";
1062 const char *comment1 = comment;
1063 while ((os=(TObjString*)next())) {
1064 // Ignore ouputs in jdl that are also in outputarchive
1065 TString sout = os->GetString();
1066 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1067 if (fOutputArchive.Contains(sout)) continue;
1068 if (!first) comment = NULL;
1069 if (!os->GetString().Contains("@") && fCloseSE.Length())
1070 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1072 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1076 if (fOutputFiles.Length()) {
1077 TString outputFiles = fOutputFiles;
1078 if (!fMergeExcludes.IsNull()) {
1079 arr = fMergeExcludes.Tokenize(" ");
1081 while ((os=(TObjString*)next1())) {
1082 outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1083 outputFiles.ReplaceAll(os->GetString(),"");
1087 arr = outputFiles.Tokenize(" ");
1091 while ((os=(TObjString*)next2())) {
1092 // Ignore ouputs in jdl that are also in outputarchive
1093 TString sout = os->GetString();
1094 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1095 if (fOutputArchive.Contains(sout)) continue;
1096 if (!first) comment = NULL;
1097 if (!os->GetString().Contains("@") && fCloseSE.Length())
1098 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1100 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1104 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1105 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1106 TString validationScript = fExecutable;
1107 validationScript.ReplaceAll(".sh", "_validation.sh");
1108 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1109 validationScript = fExecutable;
1110 validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
1111 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1112 if (fMasterResubmitThreshold) {
1113 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1114 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1116 // Write a jdl with 2 input parameters: collection name and output dir name.
1119 // Copy jdl to grid workspace
1121 // Check if an output directory was defined and valid
1122 if (!fGridOutputDir.Length()) {
1123 Error("CreateJDL", "You must define AliEn output directory");
1126 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1127 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1128 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1129 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1131 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1137 if (TestBit(AliAnalysisGrid::kSubmit)) {
1138 TString mergeJDLName = fExecutable;
1139 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1140 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1141 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1142 if (fProductionMode) {
1143 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1144 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1146 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1147 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1148 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1149 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1151 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1152 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1155 if (fAdditionalLibs.Length()) {
1156 arr = fAdditionalLibs.Tokenize(" ");
1159 while ((os=(TObjString*)next())) {
1160 if (os->GetString().Contains(".so")) continue;
1161 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1162 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1163 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1168 TIter next(fPackages);
1170 while ((obj=next())) {
1171 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1172 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1173 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1180 //______________________________________________________________________________
1181 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1183 // Completes the analysis JDL (fGridJDL) and the merging JDL (fMergingJDL),
1184 // generates their text, writes each one to a local file and uploads the two
1185 // files to alien://. Returns kFALSE immediately if fInputFiles is not set.
     // NOTE(review): the previous header described a 'findex' argument that this
     // signature does not have. The statement that reads <copy> is not visible in
     // this excerpt; presumably it gates the upload section at the end - confirm.
1186 if (!fInputFiles) return kFALSE;
     // Grid working directory: home directory reported by gGrid followed by fGridWorkingDir.
1188 TString workdir = gGrid->GetHomeDirectory();
1189 workdir += fGridWorkingDir;
1191 if (!fRunNumbers.Length() && !fRunRange[0]) {
1192 // One jdl with no parameters in case input data is specified by name.
1193 TIter next(fInputFiles);
1194 while ((os=(TObjString*)next()))
1195 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data()), "Input xml collections");
     // Output location of the analysis job depends on whether a single output directory was requested.
1196 if (!fOutputSingle.IsNull())
1197 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1199 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1200 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1203 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1204 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1205 if (!fOutputSingle.IsNull()) {
1206 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1207 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1209 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
     // The merging job takes its output directory from its first argument ($1).
1210 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1215 // Generate the JDL as a string
1216 TString sjdl = fGridJDL->Generate();
1217 TString sjdl1 = fMergingJDL->Generate();
     // Reformat the generated analysis JDL: add line breaks and indentation around
     // list entries and braces, collapse empty lines, and rename the
     // OutputDirectory key to OutputDir.
1219 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1220 sjdl.ReplaceAll("(member", "\n (member");
1221 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1222 sjdl.ReplaceAll("{", "{\n ");
1223 sjdl.ReplaceAll("};", "\n};");
1224 sjdl.ReplaceAll("{\n \n", "{\n");
1225 sjdl.ReplaceAll("\n\n", "\n");
1226 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
     // Same reformatting for the merging JDL.
1227 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1228 sjdl1.ReplaceAll("(member", "\n (member");
1229 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1230 sjdl1.ReplaceAll("{", "{\n ");
1231 sjdl1.ReplaceAll("};", "\n};");
1232 sjdl1.ReplaceAll("{\n \n", "{\n");
1233 sjdl1.ReplaceAll("\n\n", "\n");
1234 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
     // Analysis JDL: append the JDLVariables block, prepend the Jobtag built from
     // fJobTag, label the variables block and append the work directory size.
1235 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1236 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1237 index = sjdl.Index("JDLVariables");
1238 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1239 sjdl += "Workdirectorysize = {\"5000MB\"};";
     // Merging JDL: same additions, plus a header documenting the $1/$2/$3 arguments.
1240 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1241 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", fJobTag.Data()));
1242 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1243 index = sjdl1.Index("JDLVariables");
1244 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1245 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1246 // Write jdl to file
     // NOTE(review): the Error() calls below report "CreateJDL" as their origin
     // although they are issued from WriteJDL - confirm whether that is intended.
1248 out.open(fJDLName.Data(), ios::out);
1250 Error("CreateJDL", "Bad file name: %s", fJDLName.Data());
1253 out << sjdl << endl;
     // The merging JDL file name is derived from the executable name (.sh -> _merge.jdl).
1254 TString mergeJDLName = fExecutable;
1255 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1258 out1.open(mergeJDLName.Data(), ios::out);
1260 Error("CreateJDL", "Bad file name: %s", mergeJDLName.Data());
1263 out1 << sjdl1 << endl;
1266 // Copy jdl to grid workspace
1268 Info("CreateJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
     // Upload target: the grid output directory, or the working directory in production mode.
1270 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1271 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1272 if (fProductionMode) {
1273 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1274 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
     // Remove any existing copies on the grid before uploading the new files.
1276 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1277 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1278 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1279 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1281 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1282 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1288 //______________________________________________________________________________
1289 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1291 // Checks whether <lfn> exists by listing it with gGrid->Ls().
     // Returns kFALSE when there is no grid connection or the listing yields no result.
1292 if (!gGrid) return kFALSE;
1293 TGridResult *res = gGrid->Ls(lfn);
1294 if (!res) return kFALSE;
     // Only the first entry of the listing is inspected; it is expected to be a TMap.
1295 TMap *map = dynamic_cast<TMap*>(res->At(0));
     // The entry must carry a non-empty "name" value. The handling of the failing
     // case is on lines not visible here - presumably it reports "not found".
1300 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1301 if (!objs || !objs->GetString().Length()) {
1309 //______________________________________________________________________________
1310 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1312 // Checks whether a grid directory exists. <dirname> may also be a path: its
     // parent directory is listed and the entries are compared to the last component.
     // Returns kFALSE when there is no grid connection or the listing yields no result.
1313 if (!gGrid) return kFALSE;
1314 // Normalize the name: Strip() with default arguments, then drop trailing '/'
1315 TString dirstripped = dirname;
1316 dirstripped = dirstripped.Strip();
1317 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
     // Last path component (the directory searched for) ...
1318 TString dir = gSystem->BaseName(dirstripped);
     // ... and the parent path, which is listed with the "-F" option.
1320 TString path = gSystem->DirName(dirstripped);
1321 TGridResult *res = gGrid->Ls(path, "-F");
1322 if (!res) return kFALSE;
     // Walk the listing and compare the "name" value of each entry with <dir>.
1326 while ((map=dynamic_cast<TMap*>(next()))) {
1327 obj = map->GetValue("name");
1329 if (dir == obj->GetName()) {
1338 //______________________________________________________________________________
1339 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1341 // Check input data type of the grid file <lfn> and report it via Info().
     // Output arguments:
     //   isCollection - result of IsCollection(lfn)
     //   isXml        - set from whether the file name contains ".xml"
     //   useTags      - set from whether the inspected file name contains ".tag"
     // Three cases are handled in turn: AliEn collection, xml collection, plain file.
1342 isCollection = kFALSE;
1346 Error("CheckDataType", "No connection to grid");
1349 isCollection = IsCollection(lfn);
     // <msg> accumulates the one-line summary printed at the end of each case.
1350 TString msg = "\n##### file: ";
1353 msg += " type: raw_collection;";
1354 // special treatment for collections
1356 // check for tag files in the collection
1357 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1359 msg += " using_tags: No (unknown)";
1360 Info("CheckDataType", "%s", msg.Data());
     // Only the first file of the collection (key "origLFN") is inspected.
1363 const char* typeStr = res->GetKey(0, "origLFN");
1364 if (!typeStr || !strlen(typeStr)) {
1365 msg += " using_tags: No (unknown)";
1366 Info("CheckDataType", "%s", msg.Data());
1369 TString file = typeStr;
1370 useTags = file.Contains(".tag");
1371 if (useTags) msg += " using_tags: Yes";
1372 else msg += " using_tags: No";
1373 Info("CheckDataType", "%s", msg.Data());
     // slfn is declared on a line not visible here; presumably a TString copy of lfn - confirm.
1378 isXml = slfn.Contains(".xml");
1380 // Open xml collection and check if there are tag files inside
1381 msg += " type: xml_collection;";
     // The collection is opened through the interpreter (TAlienCollection::Open).
1382 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1384 msg += " using_tags: No (unknown)";
1385 Info("CheckDataType", "%s", msg.Data());
     // Only the first entry of the xml collection is inspected.
1388 TMap *map = coll->Next();
1390 msg += " using_tags: No (unknown)";
1391 Info("CheckDataType", "%s", msg.Data());
     // The nested map stored under the empty key provides the "name" of the file.
1394 map = (TMap*)map->GetValue("");
1396 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1397 useTags = file.Contains(".tag");
1399 if (useTags) msg += " using_tags: Yes";
1400 else msg += " using_tags: No";
1401 Info("CheckDataType", "%s", msg.Data());
     // Plain file: classify from the file name alone.
1404 useTags = slfn.Contains(".tag");
1405 if (slfn.Contains(".root")) msg += " type: root file;";
1406 else msg += " type: unknown file;";
1407 if (useTags) msg += " using_tags: Yes";
1408 else msg += " using_tags: No";
1409 Info("CheckDataType", "%s", msg.Data());
1412 //______________________________________________________________________________
1413 void AliAnalysisAlien::EnablePackage(const char *package)
1415 // Enables a par file supposed to exist in the current directory: switches the
     // plugin to .par usage (kUsePars bit) and records the package in fPackages.
     // Any ".par" found in <package> is removed before further processing.
1416 TString pkg(package);
1417 pkg.ReplaceAll(".par", "");
     // AccessPathName() returning true is treated as "not found" and is fatal.
1419 if (gSystem->AccessPathName(pkg)) {
1420 Fatal("EnablePackage", "Package %s not found", pkg.Data());
     // Announce the switch to par mode only if the bit was not set before.
1423 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1424 Info("EnablePackage", "AliEn plugin will use .par packages");
1425 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
     // The package list owns its entries (SetOwner). The condition guarding this
     // allocation is not visible here - presumably "fPackages not yet created".
1427 fPackages = new TObjArray();
1428 fPackages->SetOwner();
1430 fPackages->Add(new TObjString(pkg));
1433 //______________________________________________________________________________
1434 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1436 // Get job status for all jobs with jobid>=jobidstart (jobs with a smaller id are skipped).
     // Returns a pointer to a function-static buffer holding the status text of the
     // job whose id equals <lastid>; the buffer is shared between calls.
     // The updates of nrunning/nwaiting/nerror/ndone are on lines not visible in
     // this excerpt; presumably each switch case below increments one of them.
1437 static char mstatus[20];
1443 TGridJobStatusList *list = gGrid->Ps("");
1444 if (!list) return mstatus;
1445 Int_t nentries = list->GetSize();
1446 TGridJobStatus *status;
1448 for (Int_t ijob=0; ijob<nentries; ijob++) {
1449 status = (TGridJobStatus *)list->At(ijob);
     // The queue id is read through the interpreter from the TAlienJobStatus object.
1450 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1451 if (pid<jobidstart) continue;
1452 if (pid == lastid) {
     // NOTE(review): the status text is passed to sprintf as the format string and
     // written into the 20-byte mstatus buffer without a length bound - confirm
     // that status strings are always short and contain no '%'.
1453 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
1455 switch (status->GetStatus()) {
1456 case TGridJobStatus::kWAITING:
1458 case TGridJobStatus::kRUNNING:
     // Aborted, failed and unknown jobs share one case body.
1460 case TGridJobStatus::kABORTED:
1461 case TGridJobStatus::kFAIL:
1462 case TGridJobStatus::kUNKNOWN:
1464 case TGridJobStatus::kDONE:
1473 //______________________________________________________________________________
1474 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1476 // Returns true if file is a collection. Functionality duplicated from
1477 // TAlien::Type() because we don't want to directly depend on TAlien.
     // Implementation: runs the grid command "type -z <lfn>" and compares the
     // "type" key of the first result entry with "collection".
1479 Error("IsCollection", "No connection to grid");
1482 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1483 if (!res) return kFALSE;
1484 const char* typeStr = res->GetKey(0, "type");
     // A missing or empty type is treated as "not a collection".
1485 if (!typeStr || !strlen(typeStr)) return kFALSE;
1486 if (!strcmp(typeStr, "collection")) return kTRUE;
1491 //______________________________________________________________________________
1492 Bool_t AliAnalysisAlien::IsSingleOutput() const
1494 // Check if the single-output option is on, i.e. fOutputSingle is not empty.
1495 return (!fOutputSingle.IsNull());
1498 //______________________________________________________________________________
1499 void AliAnalysisAlien::Print(Option_t *) const
1501 // Print current plugin settings to standard output. The Option_t argument is unused.
1502 printf("### AliEn analysis plugin current settings ###\n");
1503 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1504 if (fOverwriteMode) {
1505 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1506 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
     // General mode flags and requested software versions.
1508 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1509 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1510 printf("= Production mode:______________________________ %d\n", fProductionMode);
1511 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1512 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1513 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
     // User, directories and input data selection.
1515 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1516 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1517 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1518 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1519 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1520 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1521 if (fRunNumbers.Length())
1522 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1524 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
     // Without run numbers or a run range the explicit input file list is printed;
     // the names are concatenated without a separator.
1525 if (!fRunRange[0] && !fRunNumbers.Length()) {
1526 TIter next(fInputFiles);
1529 while ((obj=next())) list += obj->GetName();
1530 printf("= Input files to be processed: _________________ %s\n", list.Data());
1532 if (TestBit(AliAnalysisGrid::kTest))
1533 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1534 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1535 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1536 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1537 printf("=====================================================================\n");
     // Job related settings; optional values are printed only when set.
1538 printf("= Job price: ___________________________________ %d\n", fPrice);
1539 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1540 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1541 if (fMaxInitFailed>0)
1542 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1543 if (fMasterResubmitThreshold>0)
1544 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1545 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1546 if (fNrunsPerMaster>0)
1547 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1548 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1549 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1550 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1551 if (fArguments.Length())
1552 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1553 if (fExecutableArgs.Length())
1554 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1555 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1556 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1557 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1558 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1560 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1561 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
     // NOTE(review): this tests the pointer returned by Data() whereas the
     // neighbouring checks test Length(); if Data() never returns null the line
     // is printed even for an empty include path - confirm and align if so.
1562 if (fIncludePath.Data())
1563 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1564 if (fCloseSE.Length())
1565 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1566 if (fFriendChainName.Length())
1567 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
     // List of enabled par packages; names are concatenated without a separator.
1569 TIter next(fPackages);
1572 while ((obj=next())) list += obj->GetName();
1573 printf("= Par files to be used: ________________________ %s\n", list.Data());
1577 //______________________________________________________________________________
1578 void AliAnalysisAlien::SetDefaults()
1580 // Set default values for everything. What cannot be filled will be left empty.
     // Both JDL objects are (re)created through the interpreter as TAlienJDL.
     // NOTE(review): the previous fGridJDL is deleted before being replaced, but no
     // matching delete of fMergingJDL is visible - possible leak if this method is
     // called on an object that already owns a merging JDL; confirm.
1581 if (fGridJDL) delete fGridJDL;
1582 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1583 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
     // Job splitting / merging defaults.
1586 fSplitMaxInputFileNumber = 100;
1588 fMasterResubmitThreshold = 0;
1593 fNrunsPerMaster = 1;
1594 fMaxMergeFiles = 100;
     // Names of the generated script and macro, and the command used to run them.
1596 fExecutable = "analysis.sh";
1597 fExecutableCommand = "root -b -q";
1599 fExecutableArgs = "";
1600 fAnalysisMacro = "myAnalysis.C";
1601 fAnalysisSource = "";
1602 fAdditionalLibs = "";
1606 fAliROOTVersion = "";
1607 fUser = ""; // Your alien user name
1608 fGridWorkingDir = "";
1609 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1610 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1611 fFriendChainName = "";
1612 fGridOutputDir = "output";
1613 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1614 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1615 fInputFormat = "xml-single";
1616 fJDLName = "analysis.jdl";
1617 fJobTag = "Automatically generated analysis JDL";
1618 fMergeExcludes = "";
     // Copy checking and default outputs are switched on by default.
1621 SetCheckCopy(kTRUE);
1622 SetDefaultOutputs(kTRUE);
1626 //______________________________________________________________________________
1627 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1629 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1630 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1631 // true if the jobs were successfully submitted.
     // Arguments:
     //   filename  - name of the output file whose merging is checked
     //   aliendir  - grid directory searched with the "find" command
     //   nperchunk - number of files merged per job, used to compute expected job counts
     //   submit    - when false the method only reports; when true jobs are submitted
     //   jdl       - JDL used in the "submit <jdl> <aliendir> <stage> <chunk>" queries
1632 Int_t countOrig = 0;
1633 Int_t countStage = 0;
1636 Bool_t doneFinal = kFALSE;
     // Normalized copy of the directory: no double or trailing slashes.
1638 TString saliendir(aliendir);
1639 TString sfilename, stmp;
1640 saliendir.ReplaceAll("//","/");
1641 saliendir = saliendir.Strip(TString::kTrailing, '/');
     // NOTE(review): the two ::Error calls in this method are tagged
     // "GetNregisteredFiles" rather than "CheckMergedFiles" - confirm.
1643 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
     // Search pattern: <name>*.root also matches the per-stage merged files.
1646 sfilename = filename;
1647 sfilename.ReplaceAll(".root", "*.root");
1648 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1649 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1650 TGridResult *res = gGrid->Command(command);
1652 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1657 while ((map=(TMap*)nextmap())) {
     // NOTE(review): the value returned by GetValue("turl") is dereferenced without
     // a null check - confirm every find result carries a "turl" entry.
1658 TString turl = map->GetValue("turl")->GetName();
1659 if (!turl.Length()) {
     // Reduce the turl to the path relative to the searched directory.
1664 turl.ReplaceAll("alien://", "");
1665 turl.ReplaceAll(saliendir, "");
1666 sfilename = gSystem->BaseName(turl);
1667 turl = turl.Strip(TString::kLeading, '/');
1668 // Now check to what the file corresponds to:
1669 // original output - aliendir/%03d/filename
1670 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1671 // final merged file - aliendir/filename
     // Relative path equal to the base name means the file sits directly in aliendir.
1672 if (sfilename == turl) {
1673 if (sfilename == filename) {
     // Parse stage (2 digits after "Stage") and chunk number (4 digits after the '_').
1677 Int_t index = sfilename.Index("Stage");
1678 if (index<0) continue;
1679 stmp = sfilename(index+5,2);
1680 Int_t istage = atoi(stmp);
1681 stmp = sfilename(index+8,4);
1682 Int_t ijob = atoi(stmp);
1683 if (istage<stage) continue; // Ignore lower stages
1686 chunksDone.ResetAllBits();
     // Remember which chunks of the stage are present.
1690 chunksDone.SetBitNumber(ijob);
1697 printf("=> Removing files from previous stages...\n");
1698 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1703 // Compute number of jobs that were submitted for the current stage
     // (repeated ceiling division of the original file count by nperchunk).
1704 Int_t ntotstage = countOrig;
1705 for (i=1; i<=stage; i++) {
1706 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1707 else ntotstage = (ntotstage/nperchunk);
1709 // Now compare with the number of set bits in the chunksDone array
1710 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1712 printf("*** Found %d original files\n", countOrig);
1713 if (stage==0) printf("*** No merging completed so far.\n");
1714 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1715 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
     // In checking mode stop here and report whether the final merged file was seen.
1716 if (!submit) return doneFinal;
1717 // Submit merging jobs for all missing chunks for the current stage.
1718 TString query = Form("submit %s %s", jdl, aliendir);
1721 for (i=0; i<nmissing; i++) {
     // Next chunk number whose bit is not set in chunksDone.
1722 ichunk = chunksDone.FirstNullBit(ichunk+1);
1723 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1724 if (!jobId) return kFALSE;
1728 // Submit next stage of merging
     // Number of jobs = ceiling(countStage / nperchunk); at stage 0 the inputs are the original files.
1729 if (stage==0) countStage = countOrig;
1730 Int_t nchunks = (countStage/nperchunk);
1731 if (countStage%nperchunk) nchunks += 1;
1732 for (i=0; i<nchunks; i++) {
1733 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1734 if (!jobId) return kFALSE;
1739 //______________________________________________________________________________
1740 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1742 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
     // <query> is passed unchanged to gGrid->Command(); 0 is also returned when
     // there is no grid connection.
1743 if (!gGrid) return 0;
1744 printf("=> %s ------> ",query);
1745 TGridResult *res = gGrid->Command(query);
     // The job id is taken from the "jobId" key of the first result entry.
1747 TString jobId = res->GetKey(0,"jobId");
     // An empty id is reported as a failed submission.
1749 if (jobId.IsNull()) {
1750 printf("submission failed. Reason:\n");
1753 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1756 printf(" Job id: %s\n", jobId.Data());
1760 //______________________________________________________________________________
1761 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1763 // Merge given output files from basedir. The file merger will merge nmaxmerge
1764 // files in a group. Merging can be done in stages:
1765 // stage=0 : will merge all existing files in a single stage
1766 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1767 // If their number is bigger than nmaxmerge, only the files from
1768 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged as <output>-Stage<stage>_<ichunk>
1769 // stage=n : does a find command for files named <output>-Stage<stage-1>_*. If their number is bigger than
1770 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to <output> itself
     // NOTE(review): the chunk range above was corrected from "ichunk*(nmaxmerge+1)-1"
     // to match the skip of ichunk*nmaxmerge entries done below - confirm.
1772 TString outputFile = output;
1774 TString outputChunk;
1775 TString previousChunk = "";
1776 Int_t countChunk = 0;
1777 Int_t countZero = nmaxmerge;
1778 Bool_t merged = kTRUE;
     // Drop an "@..." suffix from the output name, if present.
1779 Int_t index = outputFile.Index("@");
1780 if (index > 0) outputFile.Remove(index);
     // For stages above 1 the inputs are the merged files of the previous stage.
1781 TString inputFile = outputFile;
1782 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1783 command = Form("find %s/ *%s", basedir, inputFile.Data());
1784 printf("command: %s\n", command.Data());
1785 TGridResult *res = gGrid->Command(command);
1787 ::Error("MergeOutput","No result for the find command\n");
1791 TFileMerger *fm = 0;
1794 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1795 outputChunk = outputFile;
1796 outputChunk.ReplaceAll(".root", "_*.root");
1797 // Check for existent temporary merge files
1798 // Check overwrite mode and remove previous partial results if needed
1799 // Preserve old merging functionality for stage 0.
     // Exec() returning 0 means 'ls' found local files matching <output>_*.root.
1801 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1803 // Skip as many input files as in a chunk
1804 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1806 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
     // Local file name of the partial result for chunk <countChunk>.
1810 outputChunk = outputFile;
1811 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
     // AccessPathName() returning true means this partial result is not available.
1813 if (gSystem->AccessPathName(outputChunk)) continue;
1814 // Merged file with chunks up to <countChunk> found
1815 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1816 previousChunk = outputChunk;
1820 countZero = nmaxmerge;
1822 while ((map=(TMap*)nextmap())) {
1823 // Loop 'find' results and get next LFN
1824 if (countZero == nmaxmerge) {
1825 // First file in chunk - create file merger and add previous chunk if any.
1826 fm = new TFileMerger(kFALSE);
1827 fm->SetFastMethod(kTRUE);
1828 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1829 outputChunk = outputFile;
1830 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1832 // If last file found, put merged results in the output file
1833 if (map == res->Last()) outputChunk = outputFile;
1834 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1835 if (!objs || !objs->GetString().Length()) {
1836 // Nothing found - skip this output
1841 // Add file to be merged and decrement chunk counter.
1842 fm->AddFile(objs->GetString());
     // A chunk is complete when the counter reaches zero or the last entry was added.
1844 if (countZero==0 || map == res->Last()) {
1845 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1846 // Nothing found - skip this output
1847 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1852 fm->OutputFile(outputChunk);
1853 // Merge the outputs, then go to next chunk
1855 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1860 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
     // The previous partial result has been folded into the new one and is removed.
1861 gSystem->Unlink(previousChunk);
1863 if (map == res->Last()) {
     // Start a new chunk; the file just written becomes its first input.
1869 countZero = nmaxmerge;
1870 previousChunk = outputChunk;
1875 // Merging stage different than 0.
1876 // Move to the beginning of the requested chunk.
1877 outputChunk = outputFile;
     // Chunking applies only when there are more inputs than fit in one group;
     // otherwise everything is merged straight into <outputFile>.
1878 if (nmaxmerge < res->GetSize()) {
1879 if (ichunk*nmaxmerge >= res->GetSize()) {
1880 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
     // Skip the entries belonging to the preceding chunks.
1884 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
1885 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
1887 countZero = nmaxmerge;
1888 fm = new TFileMerger(kFALSE);
1889 fm->SetFastMethod(kTRUE);
1890 while ((map=(TMap*)nextmap())) {
1891 // Loop 'find' results and get next LFN
1892 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1893 if (!objs || !objs->GetString().Length()) {
1894 // Nothing found - skip this output
1899 // Add file to be merged and decrement chunk counter.
1900 fm->AddFile(objs->GetString());
     // Stop once the chunk is full.
1902 if (countZero==0) break;
1905 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1906 // Nothing found - skip this output
1907 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1911 fm->OutputFile(outputChunk);
1912 // Merge the outputs
1914 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1918 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
1924 //______________________________________________________________________________
// Entry point for merging the analysis outputs registered in fOutputFiles.
// Returns kTRUE immediately in test mode and kFALSE in offline mode. Depending
// on the plugin configuration it either submits merging jobs via SubmitMerging()
// or merges locally, calling MergeOutput() once per comma-separated entry of
// fOutputFiles.
// NOTE(review): in this excerpt the enclosing braces, several return statements
// and the declarations of 'str', 'next' and 'outputFile' are not visible, so the
// exact nesting of the branches below must be confirmed against the full file.
1925 Bool_t AliAnalysisAlien::MergeOutputs()
1927 // Merge analysis outputs existing in the AliEn space.
1928 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
1929 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
1931 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
1935 if (!TestBit(AliAnalysisGrid::kMerge)) {
1936 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
1939 if (fProductionMode) {
1940 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
1943 Info("MergeOutputs", "Submitting merging JDL");
1944 if (!SubmitMerging()) return kFALSE;
1945 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
1946 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
1949 // Get the output path
// An output directory without any "/" is treated as relative and expanded to
// /<grid home>/<fGridWorkingDir>/<fGridOutputDir>.
1950 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
1951 if (!DirectoryExists(fGridOutputDir)) {
1952 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
1955 if (!fOutputFiles.Length()) {
1956 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
1959 // Check if fast read option was requested
1960 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
1961 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
1962 if (fFastReadOption) {
1963 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
1964 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Override the XNet.* settings in gEnv with short timeouts and low retry counts.
1965 gEnv->SetValue("XNet.ConnectTimeout",10);
1966 gEnv->SetValue("XNet.RequestTimeout",10);
1967 gEnv->SetValue("XNet.MaxRedirectCount",2);
1968 gEnv->SetValue("XNet.ReconnectTimeout",10);
1969 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
1971 // Make sure we change the temporary directory
1972 gSystem->Setenv("TMPDIR", gSystem->pwd());
// One token per registered output file name.
1973 TObjArray *list = fOutputFiles.Tokenize(",");
1977 Bool_t merged = kTRUE;
1978 while((str=(TObjString*)next())) {
1979 outputFile = str->GetString();
// Drop everything from the first '@' on, keeping only the plain file name.
1980 Int_t index = outputFile.Index("@");
1981 if (index > 0) outputFile.Remove(index);
// Shell glob "<name>_*.root" used below to look for partially merged chunk files.
1982 TString outputChunk = outputFile;
1983 outputChunk.ReplaceAll(".root", "_*.root");
1984 // Skip already merged outputs
1985 if (!gSystem->AccessPathName(outputFile)) {
1986 if (fOverwriteMode) {
1987 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
1988 gSystem->Unlink(outputFile);
// NOTE(review): the glob is passed unquoted to the shell; file names containing
// spaces or shell metacharacters would not be handled safely. The message below
// is tagged "MergeOutput" although it is issued from MergeOutputs.
1989 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1990 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
1991 outputChunk.Data());
1992 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
1995 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// NOTE(review): same cleanup and same "MergeOutput" tag as above; which branch
// this second copy belongs to cannot be determined from the visible lines.
1999 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2000 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2001 outputChunk.Data());
2002 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Files listed in fMergeExcludes are not merged (substring match on the name).
2005 if (fMergeExcludes.Length() &&
2006 fMergeExcludes.Contains(outputFile.Data())) continue;
2007 // Perform a 'find' command in the output directory, looking for registered outputs
2008 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2010 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in gROOT's list of open files.
2013 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2014 if (fileOpened) fileOpened->Close();
2019 //______________________________________________________________________________
// Set or clear the AliAnalysisGrid::kDefaultOutputs bit of the plugin.
// flag - new value of the bit. An Info message is printed only when the bit is
//        requested on and is not set yet.
2020 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2022 // Use the output files connected to output containers from the analysis manager
2023 // rather than the files defined by SetOutputFiles
2024 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2025 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2026 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2029 //______________________________________________________________________________
// Append user-specified output file names to the comma-separated fOutputFiles.
// list - file names separated by single spaces; anything from '@' on in a name
//        is dropped before the name is stored.
// Calls Fatal() if the kDefaultOutputs bit is still set.
// NOTE(review): the declarations of 'next', 'os' and 'sout', and any release of
// 'arr', are not visible in this excerpt - confirm against the full file.
2030 void AliAnalysisAlien::SetOutputFiles(const char *list)
2032 // Manually set the output files list.
2033 // Removes duplicates. Not allowed if default outputs are not disabled.
2034 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2035 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2038 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2040 TString slist = list;
2041 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2042 TObjArray *arr = slist.Tokenize(" ");
2046 while ((os=(TObjString*)next())) {
2047 sout = os->GetString();
// Strip the "@..." suffix, if any.
2048 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): Contains() is a substring test, so a name that is a substring
// of an already registered one (e.g. "a.root" vs "data.root") is also skipped.
2049 if (fOutputFiles.Contains(sout)) continue;
2050 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2051 fOutputFiles += sout;
2056 //______________________________________________________________________________
// Store a user-provided output archive specification in fOutputArchive.
// list - archive description, copied as is (no parsing or validation here).
// Calls Fatal() if the kDefaultOutputs bit is still set.
2057 void AliAnalysisAlien::SetOutputArchive(const char *list)
2059 // Manually set the output archive list. Free text - you are on your own...
2060 // Not allowed if default outputs are not disabled.
2061 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2062 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2065 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2066 fOutputArchive = list;
2069 //______________________________________________________________________________
// Kept for interface compatibility: the argument is ignored and only a warning
// pointing to SetNumberOfReplicas()/SetDefaultOutputs() is printed.
2070 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2072 // Setting a preferred output SE is not allowed anymore.
2073 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2076 //______________________________________________________________________________
// Main driver of the plugin: prepares the output list, input data, steering
// files and JDL, then - depending on the mode bits - stops (offline), runs the
// executable locally (test), hands over to LPM (production mode) or submits the
// job(s) to AliEn and opens an aliensh shell.
// Both arguments are unused (their names are commented out in the signature).
// NOTE(review): braces, several return statements and the declarations of
// 'serror', 'str', 'next', 'outputFile', 'res' and 'jobID' are not visible in
// this excerpt; the branch nesting below must be confirmed against the full file.
2077 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2079 // Start remote grid analysis.
2081 // Check if output files have to be taken from the analysis manager
2082 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2083 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2084 if (!mgr || !mgr->IsInitialized()) {
2085 Error("StartAnalysis", "You need an initialized analysis manager for this");
// Collect the file name of every output container into fOutputFiles.
2089 TIter next(mgr->GetOutputs());
2090 AliAnalysisDataContainer *output;
2091 while ((output=(AliAnalysisDataContainer*)next())) {
2092 const char *filename = output->GetFileName();
// The placeholder name "default" is replaced by the output event handler's file
// name; such containers are skipped when no output handler is set.
// NOTE(review): strcmp() assumes GetFileName() never returns null - confirm.
2093 if (!(strcmp(filename, "default"))) {
2094 if (!mgr->GetOutputEventHandler()) continue;
2095 filename = mgr->GetOutputEventHandler()->GetOutputFileName();
// NOTE(review): substring-based duplicate check (TString::Contains).
2097 if (fOutputFiles.Contains(filename)) continue;
2098 if (fOutputFiles.Length()) fOutputFiles += ",";
2099 fOutputFiles += filename;
2101 // Add extra files registered to the analysis manager
2102 if (mgr->GetExtraFiles().Length()) {
2103 if (fOutputFiles.Length()) fOutputFiles += ",";
2104 TString extra = mgr->GetExtraFiles();
// The manager's list is space separated; fOutputFiles is comma separated.
2105 extra.ReplaceAll(" ", ",");
2106 // Protection in case extra files do not exist (will it work?)
2107 extra.ReplaceAll(".root", "*.root");
2108 fOutputFiles += extra;
2110 // Compose the output archive.
// Two archives: one for files matching std* with one replica, and one holding
// all output files with fNreplicas replicas.
2111 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2112 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2114 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Announce the selected run mode.
2115 if (TestBit(AliAnalysisGrid::kOffline)) {
2116 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2117 \n there nor any job run. You can revise the JDL and analysis \
2118 \n macro then run the same in \"submit\" mode.");
2119 } else if (TestBit(AliAnalysisGrid::kTest)) {
2120 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2122 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2123 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2124 \n space and job submitted.");
2125 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2126 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2127 if (fMergeViaJDL) CheckInputData();
2130 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2135 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2138 if (IsCheckCopy()) CheckFileCopy(gGrid->GetHomeDirectory());
2139 if (!CheckInputData()) {
2140 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2143 if (!CreateDataset(fDataPattern)) {
// Build a hint naming the configuration item most likely at fault; later
// assignments overwrite earlier ones, so a run range takes precedence over run
// numbers, which take precedence over the data directory.
2145 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2146 if (fRunNumbers.Length()) serror = "run numbers";
2147 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2148 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2149 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Generate the files steering the grid job.
2152 WriteAnalysisFile();
2153 WriteAnalysisMacro();
2155 WriteValidationScript();
// Merging counterparts of the steering files.
// NOTE(review): the condition guarding these three calls is not visible here.
2157 WriteMergingMacro();
2158 WriteMergeExecutable();
2159 WriteValidationScript(kTRUE);
2161 if (!CreateJDL()) return kFALSE;
// Offline mode stops once the files are produced; note that it reports kFALSE.
2162 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2163 if (TestBit(AliAnalysisGrid::kTest)) {
2164 // Locally testing the analysis
2165 Info("StartAnalysis", "\n_______________________________________________________________________ \
2166 \n Running analysis script in a daughter shell as on a worker node \
2167 \n_______________________________________________________________________");
2168 TObjArray *list = fOutputFiles.Tokenize(",");
// Remove output files left over from a previous local run.
2172 while((str=(TObjString*)next())) {
2173 outputFile = str->GetString();
2174 Int_t index = outputFile.Index("@");
2175 if (index > 0) outputFile.Remove(index);
// NOTE(review): the file name is passed unquoted to the shell.
2176 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the executable with bash, redirecting its stderr to a file named "stderr",
// then run the matching validation script ("<executable>_validation.sh").
2179 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2180 TString validationScript = fExecutable;
2181 validationScript.ReplaceAll(".sh", "_validation.sh");
2182 gSystem->Exec(Form("bash %s",validationScript.Data()));
2183 // gSystem->Exec("cat stdout");
2186 // Check if submitting is managed by LPM manager
2187 if (fProductionMode) {
// The production file takes the JDL name with ".jdl" replaced by ".prod".
2188 TString prodfile = fJDLName;
2189 prodfile.ReplaceAll(".jdl", ".prod");
2190 WriteProductionFile(prodfile);
2191 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2194 // Submit AliEn job(s)
2195 gGrid->Cd(fGridOutputDir);
// Without run numbers or a run range a single JDL is submitted directly.
2198 if (!fRunNumbers.Length() && !fRunRange[0]) {
2199 // Submit a given xml or a set of runs
2200 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2201 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
// The job id is read from the "jobId" key of the first result entry.
2203 const char *cjobId = res->GetKey(0,"jobId");
2207 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2210 Info("StartAnalysis", "\n_______________________________________________________________________ \
2211 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2212 \n_______________________________________________________________________",
2213 fJDLName.Data(), cjobId);
2218 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2222 // Submit for a range of enumeration of runs.
2223 if (!Submit()) return kFALSE;
2226 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2227 \n You may exit at any time and terminate the job later using the option <terminate> \
2228 \n ##################################################################################", jobID.Data());
// Hand control to an interactive AliEn shell; this call blocks until it exits.
2229 gSystem->Exec("aliensh");
2233 //______________________________________________________________________________
// Submit one master job per entry of fInputFiles by calling SubmitNext()
// repeatedly until fNsubmitted reaches the number of entries.
// Returns kFALSE as soon as a SubmitNext() call fails.
// NOTE(review): the statement(s) between the time check and the SubmitNext()
// call, the closing braces and the final return are not visible here. No sleep
// is visible inside the loop either - confirm it does not busy-wait.
2234 Bool_t AliAnalysisAlien::Submit()
2236 // Submit all master jobs.
2237 Int_t nmasterjobs = fInputFiles->GetEntries();
2238 Long_t tshoot = gSystem->Now();
// The first submission happens immediately.
2239 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2240 while (fNsubmitted < nmasterjobs) {
2241 Long_t now = gSystem->Now();
// Further submissions are attempted once more than 30000 time units have passed
// since 'tshoot' (presumably milliseconds, i.e. 30 s - TODO confirm).
2242 if ((now-tshoot)>30000) {
2244 if (!SubmitNext()) return kFALSE;
2250 //______________________________________________________________________________
// For every entry of fInputFiles, derive the grid output directory of that
// master job and call CheckMergedFiles() for it with the merge JDL name; then
// open an aliensh shell. Returns kFALSE if CheckMergedFiles() reports failure
// and kTRUE when there is nothing to submit.
// NOTE(review): braces, the 'else' between the two output-directory variants
// and the declarations of 'str', 'next' and 'outputFile' are not visible here.
2251 Bool_t AliAnalysisAlien::SubmitMerging()
2253 // Submit all merging jobs.
// Expand a relative output directory to /<grid home>/<work dir>/<output dir>.
2254 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2255 gGrid->Cd(fGridOutputDir);
// Merge JDL name: executable name with ".sh" replaced by "_merge.jdl".
2256 TString mergeJDLName = fExecutable;
2257 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2258 Int_t ntosubmit = fInputFiles->GetEntries();
2259 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input collection without its ".xml" extension.
2260 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2261 runOutDir.ReplaceAll(".xml", "");
2262 if (fOutputToRunNo) {
2263 // The output directory is the run number
2264 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2265 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2267 // The output directory is the master number in 3 digits format
2268 printf("### Submitting merging job for master <%03d>\n", i);
2269 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2271 // Check now the number of merging stages.
2272 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file name that is not listed in fMergeExcludes.
2276 while((str=(TObjString*)next())) {
2277 outputFile = str->GetString();
2278 Int_t index = outputFile.Index("@");
2279 if (index > 0) outputFile.Remove(index);
2280 if (!fMergeExcludes.Contains(outputFile)) break;
2283 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2284 if (!done) return kFALSE;
2286 if (!ntosubmit) return kTRUE;
// NOTE(review): the message is tagged "StartAnalysis" although it is issued
// from SubmitMerging.
2287 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2288 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2289 \n ##################################################################################");
2290 gSystem->Exec("aliensh");
2294 //______________________________________________________________________________
// Submit the next batch of master jobs, sized from the current job statistics
// so that the number of waiting jobs stays around 100.
// NOTE(review): braces, the place where 'iscalled' is set to kTRUE, the
// increment of fNsubmitted, the declarations of 'res' and 'query' and the final
// return are not visible in this excerpt - confirm against the full file.
2295 Bool_t AliAnalysisAlien::SubmitNext()
2297 // Submit next bunch of master jobs if the queue is free.
// Function-local statics: the values persist between calls and are shared by
// every AliAnalysisAlien instance.
2298 static Bool_t iscalled = kFALSE;
2299 static Int_t firstmaster = 0;
2300 static Int_t lastmaster = 0;
2301 static Int_t npermaster = 0;
// Guard against re-entry while a previous call is still in progress.
2302 if (iscalled) return kTRUE;
2304 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2305 Int_t ntosubmit = 0;
// Nothing submitted yet: start with a single master job.
2308 if (!fNsubmitted) ntosubmit = 1;
2310 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2311 printf("=== master %d: %s\n", lastmaster, status.Data());
2312 // If last master not split, just return
2313 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2314 // No more than 100 waiting jobs
2315 if (nwaiting>100) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master.
// NOTE(review): divides by fNsubmitted - presumably this runs only in the
// branch where fNsubmitted is non-zero; confirm.
2316 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Number of masters needed to fill the waiting queue up to 100, at least one.
2317 if (npermaster) ntosubmit = (100-nwaiting)/npermaster;
2318 if (!ntosubmit) ntosubmit = 1;
2319 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2320 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2322 Int_t nmasterjobs = fInputFiles->GetEntries();
2323 for (Int_t i=0; i<ntosubmit; i++) {
2324 // Submit for a range of enumeration of runs.
2325 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
2327 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2328 runOutDir.ReplaceAll(".xml", "");
// Two variants of the submit command: the third argument is either the
// collection base name or the zero-padded master index (the selecting
// condition is not visible here).
2330 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2332 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2333 printf("********* %s\n",query.Data());
2334 res = gGrid->Command(query);
2336 TString cjobId1 = res->GetKey(0,"jobId");
2337 if (!cjobId1.Length()) {
// NOTE(review): this and the following messages are tagged "StartAnalysis"
// although they are issued from SubmitNext.
2341 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2344 Info("StartAnalysis", "\n_______________________________________________________________________ \
2345 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2346 \n_______________________________________________________________________",
2347 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id range of the submitted masters for the next status query.
2350 lastmaster = cjobId1.Atoi();
2351 if (!firstmaster) firstmaster = lastmaster;
2356 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2364 //______________________________________________________________________________
// Store the current analysis manager in a ROOT file named after the executable
// (".sh" replaced by ".root") and, unless running in offline or test mode, copy
// that file to the user's AliEn working directory.
// NOTE(review): braces, the declaration of 'handler', its null check, the
// statement writing the manager to the file, the file close and the condition
// on 'copy' are not visible in this excerpt - confirm against the full file.
2365 void AliAnalysisAlien::WriteAnalysisFile()
2367 // Write current analysis manager into the file <analysisFile>
2368 TString analysisFile = fExecutable;
2369 analysisFile.ReplaceAll(".sh", ".root");
// The file is (re)generated in every mode except pure submit mode.
2370 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2371 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2372 if (!mgr || !mgr->IsInitialized()) {
2373 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2376 // Check analysis type
// Record in the plugin bits whether MC truth is used and whether the input
// handler derives from the ESD or the AOD input handler class.
2378 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2379 handler = (TObject*)mgr->GetInputEventHandler();
2381 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2382 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory so it can be restored after the file is opened.
2384 TDirectory *cdir = gDirectory;
2385 TFile *file = TFile::Open(analysisFile, "RECREATE");
2387 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2388 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2389 // Unless merging makes no sense
2390 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2393 // Enable termination for local jobs
2394 mgr->SetSkipTerminate(kFALSE);
2396 if (cdir) cdir->cd();
2397 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No copy to the grid in offline or test mode.
2399 Bool_t copy = kTRUE;
2400 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// NOTE(review): home directory and fGridWorkingDir are concatenated without a
// separator, which presumably relies on GetHomeDirectory() ending in "/".
2403 TString workdir = gGrid->GetHomeDirectory();
2404 workdir += fGridWorkingDir;
// NOTE(review): the message is tagged "CreateJDL" although it is issued from
// WriteAnalysisFile.
2405 Info("CreateJDL", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Replace any previous copy in the grid working directory.
2406 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2407 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2411 //______________________________________________________________________________
2412 void AliAnalysisAlien::WriteAnalysisMacro()
2414 // Write the analysis macro that will steer the analysis in grid mode.
2415 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2417 out.open(fAnalysisMacro.Data(), ios::out);
2419 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2422 Bool_t hasSTEERBase = kFALSE;
2423 Bool_t hasESD = kFALSE;
2424 Bool_t hasAOD = kFALSE;
2425 Bool_t hasANALYSIS = kFALSE;
2426 Bool_t hasANALYSISalice = kFALSE;
2427 Bool_t hasCORRFW = kFALSE;
2428 TString func = fAnalysisMacro;
2429 TString type = "ESD";
2430 TString comment = "// Analysis using ";
2431 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2432 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
2436 if (type!="AOD" && fFriendChainName!="") {
2437 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2440 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2441 else comment += " data";
2442 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2443 func.ReplaceAll(".C", "");
2444 out << "void " << func.Data() << "()" << endl;
2446 out << comment.Data() << endl;
2447 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2448 out << " TStopwatch timer;" << endl;
2449 out << " timer.Start();" << endl << endl;
2450 out << "// load base root libraries" << endl;
2451 out << " gSystem->Load(\"libTree\");" << endl;
2452 out << " gSystem->Load(\"libGeom\");" << endl;
2453 out << " gSystem->Load(\"libVMC\");" << endl;
2454 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2455 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2456 if (fAdditionalRootLibs.Length()) {
2457 // in principle libtree /lib geom libvmc etc. can go into this list, too
2458 out << "// Add aditional libraries" << endl;
2459 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2462 while((str=(TObjString*)next())) {
2463 if (str->GetString().Contains(".so"))
2464 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2466 if (list) delete list;
2468 out << "// include path" << endl;
2469 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2470 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2471 out << "// Load analysis framework libraries" << endl;
2472 TString setupPar = "AliAnalysisAlien::SetupPar";
2474 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2475 out << " gSystem->Load(\"libESD\");" << endl;
2476 out << " gSystem->Load(\"libAOD\");" << endl;
2477 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2478 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2479 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2481 TIter next(fPackages);
2484 while ((obj=next())) {
2485 pkgname = obj->GetName();
2486 if (pkgname == "STEERBase" ||
2487 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2488 if (pkgname == "ESD" ||
2489 pkgname == "ESD.par") hasESD = kTRUE;
2490 if (pkgname == "AOD" ||
2491 pkgname == "AOD.par") hasAOD = kTRUE;
2492 if (pkgname == "ANALYSIS" ||
2493 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2494 if (pkgname == "ANALYSISalice" ||
2495 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2496 if (pkgname == "CORRFW" ||
2497 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2499 if (hasANALYSISalice) setupPar = "SetupPar";
2500 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2501 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2502 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2503 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2504 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2505 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2506 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2507 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2508 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2509 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2510 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2511 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2512 out << "// Compile other par packages" << endl;
2514 while ((obj=next())) {
2515 pkgname = obj->GetName();
2516 if (pkgname == "STEERBase" ||
2517 pkgname == "STEERBase.par" ||
2519 pkgname == "ESD.par" ||
2521 pkgname == "AOD.par" ||
2522 pkgname == "ANALYSIS" ||
2523 pkgname == "ANALYSIS.par" ||
2524 pkgname == "ANALYSISalice" ||
2525 pkgname == "ANALYSISalice.par" ||
2526 pkgname == "CORRFW" ||
2527 pkgname == "CORRFW.par") continue;
2528 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2531 if (fAdditionalLibs.Length()) {
2532 out << "// Add aditional AliRoot libraries" << endl;
2533 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2536 while((str=(TObjString*)next())) {
2537 if (str->GetString().Contains(".so"))
2538 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2539 if (str->GetString().Contains(".par"))
2540 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2542 if (list) delete list;
2545 out << "// analysis source to be compiled at runtime (if any)" << endl;
2546 if (fAnalysisSource.Length()) {
2547 TObjArray *list = fAnalysisSource.Tokenize(" ");
2550 while((str=(TObjString*)next())) {
2551 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2553 if (list) delete list;
2556 if (fFastReadOption) {
2557 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2558 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2559 out << "// fast xrootd reading enabled" << endl;
2560 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2561 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2562 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2563 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2564 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2565 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2567 // Change temp directory to current one
2568 out << "// Set temporary merging directory to current one" << endl;
2569 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2570 out << "// connect to AliEn and make the chain" << endl;
2571 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2572 if (IsUsingTags()) {
2573 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
2575 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
2577 out << "// read the analysis manager from file" << endl;
2578 TString analysisFile = fExecutable;
2579 analysisFile.ReplaceAll(".sh", ".root");
2580 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2581 out << " if (!file) return;" << endl;
2582 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2583 out << " AliAnalysisManager *mgr = 0;" << endl;
2584 out << " TKey *key;" << endl;
2585 out << " while ((key=(TKey*)nextkey())) {" << endl;
2586 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2587 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2588 out << " };" << endl;
2589 out << " if (!mgr) {" << endl;
2590 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2591 out << " return;" << endl;
2592 out << " }" << endl << endl;
2593 out << " mgr->PrintStatus();" << endl;
2594 if (AliAnalysisManager::GetAnalysisManager()) {
2595 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2596 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2598 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2601 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
2602 out << " timer.Stop();" << endl;
2603 out << " timer.Print();" << endl;
2604 out << "}" << endl << endl;
2605 if (IsUsingTags()) {
2606 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
2608 out << "// Create a chain using tags from the xml file." << endl;
2609 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
2610 out << " if (!coll) {" << endl;
2611 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2612 out << " return NULL;" << endl;
2613 out << " }" << endl;
2614 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
2615 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
2616 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
2617 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
2618 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
2619 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
2620 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
2621 out << " // Check if the cuts configuration file was provided" << endl;
2622 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
2623 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
2624 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2625 out << " }" << endl;
2626 if (fFriendChainName=="") {
2627 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2629 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
2630 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
2631 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
2633 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
2634 out << " chain->ls();" << endl;
2635 out << " return chain;" << endl;
2636 out << "}" << endl << endl;
2637 if (gSystem->AccessPathName("ConfigureCuts.C")) {
2638 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
2639 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
2640 msg += " AliLHCTagCuts *lhcCuts,\n";
2641 msg += " AliDetectorTagCuts *detCuts,\n";
2642 msg += " AliEventTagCuts *evCuts)";
2643 Info("WriteAnalysisMacro", "%s", msg.Data());
2646 if (!IsUsingTags() || fFriendChainName!="") {
2647 out <<"//________________________________________________________________________________" << endl;
2648 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
2650 out << "// Create a chain using url's from xml file" << endl;
2651 out << " TString treename = type;" << endl;
2652 out << " treename.ToLower();" << endl;
2653 out << " treename += \"Tree\";" << endl;
2654 out << " printf(\"***************************************\\n\");" << endl;
2655 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
2656 out << " printf(\"***************************************\\n\");" << endl;
2657 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
2658 out << " if (!coll) {" << endl;
2659 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2660 out << " return NULL;" << endl;
2661 out << " }" << endl;
2662 out << " TChain *chain = new TChain(treename);" << endl;
2663 if(fFriendChainName!="") {
2664 out << " TChain *chainFriend = new TChain(treename);" << endl;
2666 out << " coll->Reset();" << endl;
2667 out << " while (coll->Next()) {" << endl;
2668 out << " chain->Add(coll->GetTURL(\"\"));" << endl;
2669 if(fFriendChainName!="") {
2670 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
2671 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2672 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2673 out << " chainFriend->Add(fileFriend.Data());" << endl;
2675 out << " }" << endl;
2676 out << " if (!chain->GetNtrees()) {" << endl;
2677 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
2678 out << " return NULL;" << endl;
2679 out << " }" << endl;
2680 if(fFriendChainName!="") {
2681 out << " chain->AddFriend(chainFriend);" << endl;
2683 out << " return chain;" << endl;
2684 out << "}" << endl << endl;
2686 if (hasANALYSISalice) {
2687 out <<"//________________________________________________________________________________" << endl;
2688 out << "Bool_t SetupPar(const char *package) {" << endl;
2689 out << "// Compile the package and set it up." << endl;
2690 out << " TString pkgdir = package;" << endl;
2691 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2692 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2693 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2694 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2695 out << " // Check for BUILD.sh and execute" << endl;
2696 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2697 out << " printf(\"*******************************\\n\");" << endl;
2698 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2699 out << " printf(\"*******************************\\n\");" << endl;
2700 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2701 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2702 out << " gSystem->ChangeDirectory(cdir);" << endl;
2703 out << " return kFALSE;" << endl;
2704 out << " }" << endl;
2705 out << " } else {" << endl;
2706 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2707 out << " gSystem->ChangeDirectory(cdir);" << endl;
2708 out << " return kFALSE;" << endl;
2709 out << " }" << endl;
2710 out << " // Check for SETUP.C and execute" << endl;
2711 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2712 out << " printf(\"*******************************\\n\");" << endl;
2713 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2714 out << " printf(\"*******************************\\n\");" << endl;
2715 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2716 out << " } else {" << endl;
2717 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2718 out << " gSystem->ChangeDirectory(cdir);" << endl;
2719 out << " return kFALSE;" << endl;
2720 out << " }" << endl;
2721 out << " // Restore original workdir" << endl;
2722 out << " gSystem->ChangeDirectory(cdir);" << endl;
2723 out << " return kTRUE;" << endl;
2726 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
2728 Bool_t copy = kTRUE;
2729 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
2732 TString workdir = gGrid->GetHomeDirectory();
2733 workdir += fGridWorkingDir;
2734 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
2735 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
2736 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
2737 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
2738 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
2740 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
2741 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
2745 //______________________________________________________________________________
2746 void AliAnalysisAlien::WriteMergingMacro()
2748 // Write a macro to merge the outputs per master job.
2749 if (!fMergeViaJDL) return;
2750 if (!fOutputFiles.Length()) {
2751 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2754 TString mergingMacro = fExecutable;
2755 mergingMacro.ReplaceAll(".sh","_merge.C");
2756 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2757 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2759 out.open(mergingMacro.Data(), ios::out);
2761 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2764 Bool_t hasSTEERBase = kFALSE;
2765 Bool_t hasESD = kFALSE;
2766 Bool_t hasAOD = kFALSE;
2767 Bool_t hasANALYSIS = kFALSE;
2768 Bool_t hasANALYSISalice = kFALSE;
2769 Bool_t hasCORRFW = kFALSE;
2770 TString func = mergingMacro;
2772 func.ReplaceAll(".C", "");
2773 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
2775 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
2776 out << " TStopwatch timer;" << endl;
2777 out << " timer.Start();" << endl << endl;
2778 if (!fExecutableCommand.Contains("aliroot")) {
2779 out << "// load base root libraries" << endl;
2780 out << " gSystem->Load(\"libTree\");" << endl;
2781 out << " gSystem->Load(\"libGeom\");" << endl;
2782 out << " gSystem->Load(\"libVMC\");" << endl;
2783 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2784 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2786 if (fAdditionalRootLibs.Length()) {
2787 // in principle libtree /lib geom libvmc etc. can go into this list, too
2788 out << "// Add aditional libraries" << endl;
2789 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2792 while((str=(TObjString*)next())) {
2793 if (str->GetString().Contains(".so"))
2794 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2796 if (list) delete list;
2798 out << "// include path" << endl;
2799 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2800 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2801 out << "// Load analysis framework libraries" << endl;
2803 if (!fExecutableCommand.Contains("aliroot")) {
2804 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2805 out << " gSystem->Load(\"libESD\");" << endl;
2806 out << " gSystem->Load(\"libAOD\");" << endl;
2808 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2809 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2810 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2812 TIter next(fPackages);
2815 TString setupPar = "AliAnalysisAlien::SetupPar";
2816 while ((obj=next())) {
2817 pkgname = obj->GetName();
2818 if (pkgname == "STEERBase" ||
2819 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2820 if (pkgname == "ESD" ||
2821 pkgname == "ESD.par") hasESD = kTRUE;
2822 if (pkgname == "AOD" ||
2823 pkgname == "AOD.par") hasAOD = kTRUE;
2824 if (pkgname == "ANALYSIS" ||
2825 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2826 if (pkgname == "ANALYSISalice" ||
2827 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2828 if (pkgname == "CORRFW" ||
2829 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2831 if (hasANALYSISalice) setupPar = "SetupPar";
2832 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2833 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2834 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2835 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2836 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2837 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2838 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2839 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2840 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2841 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2842 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2843 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2844 out << "// Compile other par packages" << endl;
2846 while ((obj=next())) {
2847 pkgname = obj->GetName();
2848 if (pkgname == "STEERBase" ||
2849 pkgname == "STEERBase.par" ||
2851 pkgname == "ESD.par" ||
2853 pkgname == "AOD.par" ||
2854 pkgname == "ANALYSIS" ||
2855 pkgname == "ANALYSIS.par" ||
2856 pkgname == "ANALYSISalice" ||
2857 pkgname == "ANALYSISalice.par" ||
2858 pkgname == "CORRFW" ||
2859 pkgname == "CORRFW.par") continue;
2860 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2863 if (fAdditionalLibs.Length()) {
2864 out << "// Add aditional AliRoot libraries" << endl;
2865 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2868 while((str=(TObjString*)next())) {
2869 if (str->GetString().Contains(".so"))
2870 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2872 if (list) delete list;
2875 out << "// Analysis source to be compiled at runtime (if any)" << endl;
2876 if (fAnalysisSource.Length()) {
2877 TObjArray *list = fAnalysisSource.Tokenize(" ");
2880 while((str=(TObjString*)next())) {
2881 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2883 if (list) delete list;
2887 if (fFastReadOption) {
2888 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
2889 out << "// fast xrootd reading enabled" << endl;
2890 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2891 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2892 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2893 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2894 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2895 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2897 // Change temp directory to current one
2898 out << "// Set temporary merging directory to current one" << endl;
2899 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2900 out << "// Connect to AliEn" << endl;
2901 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2902 out << " Bool_t laststage = kFALSE;" << endl;
2903 out << " TString outputDir = dir;" << endl;
2904 out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl;
2905 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
2906 out << " mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
2907 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
2908 out << " TIter *iter = new TIter(list);" << endl;
2909 out << " TObjString *str;" << endl;
2910 out << " TString outputFile;" << endl;
2911 out << " Bool_t merged = kTRUE;" << endl;
2912 out << " while((str=(TObjString*)iter->Next())) {" << endl;
2913 out << " outputFile = str->GetString();" << endl;
2914 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
2915 out << " Int_t index = outputFile.Index(\"@\");" << endl;
2916 out << " if (index > 0) outputFile.Remove(index);" << endl;
2917 out << " // Skip already merged outputs" << endl;
2918 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
2919 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
2920 out << " continue;" << endl;
2921 out << " }" << endl;
2922 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
2923 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
2924 out << " if (!merged) {" << endl;
2925 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
2926 out << " return;" << endl;
2927 out << " }" << endl;
2928 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
2929 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
2930 out << " }" << endl;
2931 out << " // all outputs merged, validate" << endl;
2932 out << " ofstream out;" << endl;
2933 out << " out.open(\"outputs_valid\", ios::out);" << endl;
2934 out << " out.close();" << endl;
2935 out << " // read the analysis manager from file" << endl;
2936 TString analysisFile = fExecutable;
2937 analysisFile.ReplaceAll(".sh", ".root");
2938 out << " if (!laststage) return;" << endl;
2939 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2940 out << " if (!file) return;" << endl;
2941 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2942 out << " AliAnalysisManager *mgr = 0;" << endl;
2943 out << " TKey *key;" << endl;
2944 out << " while ((key=(TKey*)nextkey())) {" << endl;
2945 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2946 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2947 out << " };" << endl;
2948 out << " if (!mgr) {" << endl;
2949 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
2950 out << " return;" << endl;
2951 out << " }" << endl << endl;
2952 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
2953 out << " mgr->PrintStatus();" << endl;
2954 if (AliAnalysisManager::GetAnalysisManager()) {
2955 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2956 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2958 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2961 out << " mgr->StartAnalysis(\"gridterminate\");" << endl;
2962 out << "}" << endl << endl;
2963 if (hasANALYSISalice) {
2964 out <<"//________________________________________________________________________________" << endl;
2965 out << "Bool_t SetupPar(const char *package) {" << endl;
2966 out << "// Compile the package and set it up." << endl;
2967 out << " TString pkgdir = package;" << endl;
2968 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2969 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2970 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2971 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2972 out << " // Check for BUILD.sh and execute" << endl;
2973 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2974 out << " printf(\"*******************************\\n\");" << endl;
2975 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2976 out << " printf(\"*******************************\\n\");" << endl;
2977 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2978 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2979 out << " gSystem->ChangeDirectory(cdir);" << endl;
2980 out << " return kFALSE;" << endl;
2981 out << " }" << endl;
2982 out << " } else {" << endl;
2983 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2984 out << " gSystem->ChangeDirectory(cdir);" << endl;
2985 out << " return kFALSE;" << endl;
2986 out << " }" << endl;
2987 out << " // Check for SETUP.C and execute" << endl;
2988 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2989 out << " printf(\"*******************************\\n\");" << endl;
2990 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2991 out << " printf(\"*******************************\\n\");" << endl;
2992 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2993 out << " } else {" << endl;
2994 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2995 out << " gSystem->ChangeDirectory(cdir);" << endl;
2996 out << " return kFALSE;" << endl;
2997 out << " }" << endl;
2998 out << " // Restore original workdir" << endl;
2999 out << " gSystem->ChangeDirectory(cdir);" << endl;
3000 out << " return kTRUE;" << endl;
3004 Bool_t copy = kTRUE;
3005 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3008 TString workdir = gGrid->GetHomeDirectory();
3009 workdir += fGridWorkingDir;
3010 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3011 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3012 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3016 //______________________________________________________________________________
3017 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3019 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3020 // Note that for loading the compiled library. The current directory should have precedence in
3022 TString pkgdir = package;
3023 pkgdir.ReplaceAll(".par","");
3024 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
3025 TString cdir = gSystem->WorkingDirectory();
3026 gSystem->ChangeDirectory(pkgdir);
3027 // Check for BUILD.sh and execute
3028 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3029 printf("**************************************************\n");
3030 printf("*** Building PAR archive %s\n", package);
3031 printf("**************************************************\n");
3032 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3033 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3034 gSystem->ChangeDirectory(cdir);
3038 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3039 gSystem->ChangeDirectory(cdir);
3042 // Check for SETUP.C and execute
3043 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3044 printf("**************************************************\n");
3045 printf("*** Setup PAR archive %s\n", package);
3046 printf("**************************************************\n");
3047 gROOT->Macro("PROOF-INF/SETUP.C");
3048 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3050 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3051 gSystem->ChangeDirectory(cdir);
3054 // Restore original workdir
3055 gSystem->ChangeDirectory(cdir);
3059 //______________________________________________________________________________
3060 void AliAnalysisAlien::WriteExecutable()
3062 // Generate the alien executable script.
3063 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3065 out.open(fExecutable.Data(), ios::out);
3067 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3070 out << "#!/bin/bash" << endl;
3071 out << "echo \"=========================================\"" << endl;
3072 out << "echo \"############## PATH : ##############\"" << endl;
3073 out << "echo $PATH" << endl;
3074 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3075 out << "echo $LD_LIBRARY_PATH" << endl;
3076 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3077 out << "echo $ROOTSYS" << endl;
3078 out << "echo \"############## which root : ##############\"" << endl;
3079 out << "which root" << endl;
3080 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3081 out << "echo $ALICE_ROOT" << endl;
3082 out << "echo \"############## which aliroot : ##############\"" << endl;
3083 out << "which aliroot" << endl;
3084 out << "echo \"############## system limits : ##############\"" << endl;
3085 out << "ulimit -a" << endl;
3086 out << "echo \"############## memory : ##############\"" << endl;
3087 out << "free -m" << endl;
3088 out << "echo \"=========================================\"" << endl << endl;
3089 // Make sure we can properly compile par files
3090 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3091 out << fExecutableCommand << " ";
3092 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3093 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3094 out << "echo \"############## memory after: ##############\"" << endl;
3095 out << "free -m" << endl;
3097 Bool_t copy = kTRUE;
3098 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3101 TString workdir = gGrid->GetHomeDirectory();
3102 TString bindir = Form("%s/bin", workdir.Data());
3103 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3104 workdir += fGridWorkingDir;
3105 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3106 if (FileExists(executable)) gGrid->Rm(executable);
3107 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3108 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3112 //______________________________________________________________________________
3113 void AliAnalysisAlien::WriteMergeExecutable()
3115 // Generate the alien executable script for the merging job.
3116 if (!fMergeViaJDL) return;
3117 TString mergeExec = fExecutable;
3118 mergeExec.ReplaceAll(".sh", "_merge.sh");
3119 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3121 out.open(mergeExec.Data(), ios::out);
3123 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
3126 out << "#!/bin/bash" << endl;
3127 out << "echo \"=========================================\"" << endl;
3128 out << "echo \"############## PATH : ##############\"" << endl;
3129 out << "echo $PATH" << endl;
3130 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3131 out << "echo $LD_LIBRARY_PATH" << endl;
3132 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3133 out << "echo $ROOTSYS" << endl;
3134 out << "echo \"############## which root : ##############\"" << endl;
3135 out << "which root" << endl;
3136 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3137 out << "echo $ALICE_ROOT" << endl;
3138 out << "echo \"############## which aliroot : ##############\"" << endl;
3139 out << "which aliroot" << endl;
3140 out << "echo \"############## system limits : ##############\"" << endl;
3141 out << "ulimit -a" << endl;
3142 out << "echo \"############## memory : ##############\"" << endl;
3143 out << "free -m" << endl;
3144 out << "echo \"=========================================\"" << endl << endl;
3145 // Make sure we can properly compile par files
3146 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3147 TString mergeMacro = fExecutable;
3148 mergeMacro.ReplaceAll(".sh", "_merge.C");
3149 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3150 out << fExecutableCommand << " " << "$ARG" << endl;
3151 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3152 out << "echo \"############## memory after: ##############\"" << endl;
3153 out << "free -m" << endl;
3155 Bool_t copy = kTRUE;
3156 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3159 TString workdir = gGrid->GetHomeDirectory();
3160 TString bindir = Form("%s/bin", workdir.Data());
3161 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3162 workdir += fGridWorkingDir;
3163 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3164 if (FileExists(executable)) gGrid->Rm(executable);
3165 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3166 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3170 //______________________________________________________________________________
3171 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3173 // Write the production file to be submitted by LPM manager. The format is:
3174 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3175 // Next lines: full_path_to_dataset XXX (XXX is a string)
3176 // To submit, one has to: submit jdl XXX for all lines
3178 out.open(filename, ios::out);
3180 Error("WriteProductionFile", "Bad file name: %s", filename);
3183 TString workdir = gGrid->GetHomeDirectory();
3184 workdir += fGridWorkingDir;
3185 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3186 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3187 out << locjdl << " " << njobspermaster << endl;
3188 Int_t nmasterjobs = fInputFiles->GetEntries();
3189 for (Int_t i=0; i<nmasterjobs; i++) {
3190 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3191 runOutDir.ReplaceAll(".xml", "");
3193 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3195 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3197 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3198 if (FileExists(filename)) gGrid->Rm(filename);
3199 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3202 //______________________________________________________________________________
3203 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3205 // Generate the alien validation script.
3206 // Generate the validation script
3208 TString validationScript = fExecutable;
3209 if (merge) validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
3210 else validationScript.ReplaceAll(".sh", "_validation.sh");
3212 Error("WriteValidationScript", "Alien connection required");
3215 TString outStream = "";
3216 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3217 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3219 out.open(validationScript, ios::out);
3220 out << "#!/bin/bash" << endl;
3221 out << "##################################################" << endl;
3222 out << "validateout=`dirname $0`" << endl;
3223 out << "validatetime=`date`" << endl;
3224 out << "validated=\"0\";" << endl;
3225 out << "error=0" << endl;
3226 out << "if [ -z $validateout ]" << endl;
3227 out << "then" << endl;
3228 out << " validateout=\".\"" << endl;
3229 out << "fi" << endl << endl;
3230 out << "cd $validateout;" << endl;
3231 out << "validateworkdir=`pwd`;" << endl << endl;
3232 out << "echo \"*******************************************************\"" << outStream << endl;
3233 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3235 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3236 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3237 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3238 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3239 out << "ls -la ./" << outStream << endl;
3240 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3241 out << "##################################################" << endl;
3244 out << "if [ ! -f stderr ] ; then" << endl;
3245 out << " error=1" << endl;
3246 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3247 out << " echo \"Error = $error\" " << outStream << endl;
3248 out << "fi" << endl;
3250 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3251 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3252 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3253 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3256 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3257 out << " error=1" << endl;
3258 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3259 out << " echo \"$parArch\" " << outStream << endl;
3260 out << " echo \"Error = $error\" " << outStream << endl;
3261 out << "fi" << endl;
3263 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3264 out << " error=1" << endl;
3265 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3266 out << " echo \"$segViol\" " << outStream << endl;
3267 out << " echo \"Error = $error\" " << outStream << endl;
3268 out << "fi" << endl;
3270 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3271 out << " error=1" << endl;
3272 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3273 out << " echo \"$segFault\" " << outStream << endl;
3274 out << " echo \"Error = $error\" " << outStream << endl;
3275 out << "fi" << endl;
3277 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3278 out << " error=1" << endl;
3279 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3280 out << " echo \"$glibcErr\" " << outStream << endl;
3281 out << " echo \"Error = $error\" " << outStream << endl;
3282 out << "fi" << endl;
3284 // Part dedicated to the specific analyses running into the train
3286 TObjArray *arr = fOutputFiles.Tokenize(",");
3289 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3290 TString extra = mgr->GetExtraFiles();
3291 while ((os=(TObjString*)next1())) {
3293 outputFile = os->GetString();
3294 Int_t index = outputFile.Index("@");
3295 if (index > 0) outputFile.Remove(index);
3296 if (merge && fMergeExcludes.Contains(outputFile)) continue;
3297 if (extra.Contains(outputFile)) continue;
3298 if (outputFile.Contains("*")) continue;
3299 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3300 out << " error=1" << endl;
3301 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3302 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3303 out << "fi" << endl;
3306 out << "if ! [ -f outputs_valid ] ; then" << endl;
3307 out << " error=1" << endl;
3308 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3309 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3310 out << "fi" << endl;
3312 out << "if [ $error = 0 ] ; then" << endl;
3313 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3314 if (!IsKeepLogs()) {
3315 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3317 out << " rm -f std*" << endl;
3319 out << "fi" << endl;
3321 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3322 out << "echo \"*******************************************************\"" << outStream << endl;
3323 out << "cd -" << endl;
3324 out << "exit $error" << endl;
3326 Bool_t copy = kTRUE;
3327 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3330 TString workdir = gGrid->GetHomeDirectory();
3331 workdir += fGridWorkingDir;
3332 Info("CreateJDL", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3333 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3334 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));