1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TFileCollection.h"
32 #include "TObjString.h"
33 #include "TObjArray.h"
35 #include "TGridResult.h"
36 #include "TGridCollection.h"
38 #include "TGridJobStatusList.h"
39 #include "TGridJobStatus.h"
40 #include "TFileMerger.h"
41 #include "AliAnalysisManager.h"
42 #include "AliVEventHandler.h"
43 #include "AliAnalysisDataContainer.h"
44 #include "AliAnalysisAlien.h"
// ROOT ClassImp macro: class-implementation bookkeeping for AliAnalysisAlien.
46 ClassImp(AliAnalysisAlien)
48 //______________________________________________________________________________
49 AliAnalysisAlien::AliAnalysisAlien()
// Default constructor. The visible part of the initializer list sets the split,
// master-resubmit and PROOF-worker counters to 0 and default-constructs
// fAdditionalRootLibs and fRootVersionForProof.
55 fSplitMaxInputFileNumber(0),
57 fMasterResubmitThreshold(0),
69 fNproofWorkersPerSlave(0),
79 fAdditionalRootLibs(),
107 fRootVersionForProof(),
116 //______________________________________________________________________________
117 AliAnalysisAlien::AliAnalysisAlien(const char *name)
// Named constructor: forwards 'name' to the AliAnalysisGrid base class. The visible
// members are zero-initialized (counters) or default-constructed
// (fExecutableCommand, fAdditionalRootLibs, fRootVersionForProof).
118 :AliAnalysisGrid(name),
123 fSplitMaxInputFileNumber(0),
125 fMasterResubmitThreshold(0),
137 fNproofWorkersPerSlave(0),
141 fExecutableCommand(),
147 fAdditionalRootLibs(),
175 fRootVersionForProof(),
184 //______________________________________________________________________________
185 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
// Copy constructor: copies the configuration values of 'other' member-wise.
186 :AliAnalysisGrid(other),
189 fPrice(other.fPrice),
191 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
192 fMaxInitFailed(other.fMaxInitFailed),
193 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
194 fNtestFiles(other.fNtestFiles),
195 fNrunsPerMaster(other.fNrunsPerMaster),
196 fMaxMergeFiles(other.fMaxMergeFiles),
197 fNsubmitted(other.fNsubmitted),
198 fProductionMode(other.fProductionMode),
199 fOutputToRunNo(other.fOutputToRunNo),
200 fMergeViaJDL(other.fMergeViaJDL),
201 fFastReadOption(other.fFastReadOption),
202 fOverwriteMode(other.fOverwriteMode),
203 fNreplicas(other.fNreplicas),
204 fNproofWorkers(other.fNproofWorkers),
205 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
206 fProofReset(other.fProofReset),
207 fRunNumbers(other.fRunNumbers),
208 fExecutable(other.fExecutable),
209 fExecutableCommand(other.fExecutableCommand),
210 fArguments(other.fArguments),
211 fExecutableArgs(other.fExecutableArgs),
212 fAnalysisMacro(other.fAnalysisMacro),
213 fAnalysisSource(other.fAnalysisSource),
214 fValidationScript(other.fValidationScript),
215 fAdditionalRootLibs(other.fAdditionalRootLibs),
216 fAdditionalLibs(other.fAdditionalLibs),
217 fSplitMode(other.fSplitMode),
218 fAPIVersion(other.fAPIVersion),
219 fROOTVersion(other.fROOTVersion),
220 fAliROOTVersion(other.fAliROOTVersion),
221 fExternalPackages(other.fExternalPackages),
223 fGridWorkingDir(other.fGridWorkingDir),
224 fGridDataDir(other.fGridDataDir),
225 fDataPattern(other.fDataPattern),
226 fGridOutputDir(other.fGridOutputDir),
227 fOutputArchive(other.fOutputArchive),
228 fOutputFiles(other.fOutputFiles),
229 fInputFormat(other.fInputFormat),
230 fDatasetName(other.fDatasetName),
231 fJDLName(other.fJDLName),
232 fTerminateFiles(other.fTerminateFiles),
233 fMergeExcludes(other.fMergeExcludes),
234 fIncludePath(other.fIncludePath),
235 fCloseSE(other.fCloseSE),
236 fFriendChainName(other.fFriendChainName),
237 fJobTag(other.fJobTag),
238 fOutputSingle(other.fOutputSingle),
239 fRunPrefix(other.fRunPrefix),
240 fProofCluster(other.fProofCluster),
241 fProofDataSet(other.fProofDataSet),
242 fFileForTestMode(other.fFileForTestMode),
243 fRootVersionForProof(other.fRootVersionForProof),
244 fAliRootMode(other.fAliRootMode),
// The JDL objects are not copied from 'other': new TAlienJDL objects are created
// through gROOT->ProcessLine.
249 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
250 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Run range bounds: [0] is the first run, [1] the last one.
251 fRunRange[0] = other.fRunRange[0];
252 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file list: one new TObjString per entry, built from the
// entry's name; the new array owns its elements.
253 if (other.fInputFiles) {
254 fInputFiles = new TObjArray();
255 TIter next(other.fInputFiles);
257 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
258 fInputFiles->SetOwner();
// Same deep copy for the package list.
260 if (other.fPackages) {
261 fPackages = new TObjArray();
262 TIter next(other.fPackages);
264 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
265 fPackages->SetOwner();
269 //______________________________________________________________________________
270 AliAnalysisAlien::~AliAnalysisAlien()
// Destructor: deletes the analysis and merging JDL objects and the input-file and
// package arrays. The arrays also delete their entries only if they were made owners
// (SetOwner()).
273 if (fGridJDL) delete fGridJDL;
274 if (fMergingJDL) delete fMergingJDL;
275 if (fInputFiles) delete fInputFiles;
276 if (fPackages) delete fPackages;
279 //______________________________________________________________________________
280 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
// Assignment operator: copies every configuration value from 'other', creates new
// TAlienJDL objects and deep-copies the input-file and package lists.
// Heap objects owned by this instance before the assignment are released first.
// Assigning into an already configured object therefore no longer leaks them, and a
// null list in 'other' leaves the corresponding list null instead of stale.
283 if (this != &other) {
284 AliAnalysisGrid::operator=(other);
// Release the lists currently owned by this object before they are replaced.
delete fInputFiles; fInputFiles = 0;
delete fPackages; fPackages = 0;
// Replace (not leak) the JDL objects.
285 delete fGridJDL; fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
286 delete fMergingJDL; fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
287 fPrice = other.fPrice;
289 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
290 fMaxInitFailed = other.fMaxInitFailed;
291 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
292 fNtestFiles = other.fNtestFiles;
293 fNrunsPerMaster = other.fNrunsPerMaster;
294 fMaxMergeFiles = other.fMaxMergeFiles;
295 fNsubmitted = other.fNsubmitted;
296 fProductionMode = other.fProductionMode;
297 fOutputToRunNo = other.fOutputToRunNo;
298 fMergeViaJDL = other.fMergeViaJDL;
299 fFastReadOption = other.fFastReadOption;
300 fOverwriteMode = other.fOverwriteMode;
301 fNreplicas = other.fNreplicas;
302 fNproofWorkers = other.fNproofWorkers;
303 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
304 fProofReset = other.fProofReset;
305 fRunNumbers = other.fRunNumbers;
306 fExecutable = other.fExecutable;
307 fExecutableCommand = other.fExecutableCommand;
308 fArguments = other.fArguments;
309 fExecutableArgs = other.fExecutableArgs;
310 fAnalysisMacro = other.fAnalysisMacro;
311 fAnalysisSource = other.fAnalysisSource;
312 fValidationScript = other.fValidationScript;
313 fAdditionalRootLibs = other.fAdditionalRootLibs;
314 fAdditionalLibs = other.fAdditionalLibs;
315 fSplitMode = other.fSplitMode;
316 fAPIVersion = other.fAPIVersion;
317 fROOTVersion = other.fROOTVersion;
318 fAliROOTVersion = other.fAliROOTVersion;
319 fExternalPackages = other.fExternalPackages;
321 fGridWorkingDir = other.fGridWorkingDir;
322 fGridDataDir = other.fGridDataDir;
323 fDataPattern = other.fDataPattern;
324 fGridOutputDir = other.fGridOutputDir;
325 fOutputArchive = other.fOutputArchive;
326 fOutputFiles = other.fOutputFiles;
327 fInputFormat = other.fInputFormat;
328 fDatasetName = other.fDatasetName;
329 fJDLName = other.fJDLName;
330 fTerminateFiles = other.fTerminateFiles;
331 fMergeExcludes = other.fMergeExcludes;
332 fIncludePath = other.fIncludePath;
333 fCloseSE = other.fCloseSE;
334 fFriendChainName = other.fFriendChainName;
335 fJobTag = other.fJobTag;
336 fOutputSingle = other.fOutputSingle;
337 fRunPrefix = other.fRunPrefix;
338 fProofCluster = other.fProofCluster;
339 fProofDataSet = other.fProofDataSet;
340 fFileForTestMode = other.fFileForTestMode;
341 fRootVersionForProof = other.fRootVersionForProof;
342 fAliRootMode = other.fAliRootMode;
// Deep copy of the input file list; the new array owns its TObjString entries.
343 if (other.fInputFiles) {
344 fInputFiles = new TObjArray();
345 TIter next(other.fInputFiles);
347 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
348 fInputFiles->SetOwner();
// Same deep copy for the package list.
350 if (other.fPackages) {
351 fPackages = new TObjArray();
352 TIter next(other.fPackages);
354 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
355 fPackages->SetOwner();
361 //______________________________________________________________________________
362 void AliAnalysisAlien::AddIncludePath(const char *path)
364 // Add include path in the remote analysis macro.
// Each entry is appended to fIncludePath followed by a blank. A "-I" prefix is
// prepended unless the given path text (p) already contains "-I".
366 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
367 else fIncludePath += Form("-I%s ", path);
370 //______________________________________________________________________________
371 void AliAnalysisAlien::AddRunNumber(Int_t run)
373 // Add a run number to the list of runs to be processed.
// fRunNumbers is a blank-separated list. Each entry is fRunPrefix followed by the
// decimal run number.
374 if (fRunNumbers.Length()) fRunNumbers += " ";
375 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
378 //______________________________________________________________________________
379 void AliAnalysisAlien::AddRunNumber(const char* run)
381 // Add a run number to the list of runs to be processed.
// String variant of AddRunNumber(Int_t): a blank separator is inserted when
// fRunNumbers already holds entries.
382 if (fRunNumbers.Length()) fRunNumbers += " ";
386 //______________________________________________________________________________
387 void AliAnalysisAlien::AddDataFile(const char *lfn)
389 // Adds a data file to the input to be analysed. The file should be a valid LFN
390 // or point to an existing file in the alien workdir.
// The list is created on first use and made owner of its TObjString entries, like
// the lists built by the copy constructor and operator=. As a result, deleting
// fInputFiles (e.g. in the destructor) also frees the strings instead of leaking them.
391 if (!fInputFiles) { fInputFiles = new TObjArray(); fInputFiles->SetOwner(); }
392 fInputFiles->Add(new TObjString(lfn));
395 //______________________________________________________________________________
396 void AliAnalysisAlien::AddExternalPackage(const char *package)
398 // Adds external packages in addition to the default ones (root, aliroot and gapi).
// Packages are kept as a blank-separated list. CreateJDL splits each entry at "::"
// into name and version. A separator is inserted only when the list already has
// content. A TString always converts to a non-null const char*, so its length must
// be tested rather than the object itself.
399 if (fExternalPackages.Length()) fExternalPackages += " ";
400 fExternalPackages += package;
403 //______________________________________________________________________________
404 Bool_t AliAnalysisAlien::Connect()
406 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Nothing to do if a connected gGrid already exists, or in production mode.
407 if (gGrid && gGrid->IsConnected()) return kTRUE;
408 if (fProductionMode) return kTRUE;
410 Info("Connect", "Trying to connect to AliEn ...");
411 TGrid::Connect("alien://");
// TGrid::Connect is expected to set the global gGrid; verify it is usable.
413 if (!gGrid || !gGrid->IsConnected()) {
414 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// On success the AliEn user name becomes the analysis user (fUser).
417 fUser = gGrid->GetUser();
418 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
422 //______________________________________________________________________________
423 void AliAnalysisAlien::CdWork()
425 // Check validity of alien workspace. Create directory if possible.
// The working directory is <AliEn home directory> + fGridWorkingDir.
427 Error("CdWork", "Alien connection required");
430 TString homedir = gGrid->GetHomeDirectory();
431 TString workdir = homedir + fGridWorkingDir;
432 if (DirectoryExists(workdir)) {
436 // Work directory not existing - create it
438 if (gGrid->Mkdir(workdir, "-p")) {
439 gGrid->Cd(fGridWorkingDir);
440 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
// Creation failed: clear fGridWorkingDir so the home directory is used instead.
442 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
443 workdir.Data(), homedir.Data());
444 fGridWorkingDir = "";
448 //______________________________________________________________________________
449 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
451 // Check if file copying is possible.
// Procedure: report the close storage element (alien_CLOSE_SE), check that
// 'alienpath' exists, create a local test ROOT file, copy it to
// alien://<alienpath>/<name>, then remove the local file and the remote copy.
// In production mode nothing is tested and kTRUE is returned.
452 if (fProductionMode) return kTRUE;
454 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
457 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
458 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
459 // Check if alien_CLOSE_SE is defined
460 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
461 if (!closeSE.IsNull()) {
462 Info("CheckFileCopy", "Your current close storage is pointing to: \
463 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
465 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
467 // Check if grid directory exists.
468 if (!DirectoryExists(alienpath)) {
469 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
472 TFile f("plugin_test_copy", "RECREATE");
473 // User may not have write permissions to current directory
475 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
476 gSystem->WorkingDirectory());
// Remove any existing remote copy of the test file before copying.
480 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
481 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
482 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
483 \n# 1. Make sure you have write permissions there. If this is the case: \
484 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
485 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
486 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
487 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
488 gSystem->Unlink(f.GetName());
491 gSystem->Unlink(f.GetName());
// Remove the remote test copy. The LFN uses the same '/' separator as the copy
// destination above. Without it the file name was glued onto the directory name,
// so the remote test file was never removed.
492 gGrid->Rm(Form("%s/%s",alienpath,f.GetName()));
493 Info("CheckFileCopy", "### ...SUCCESS ###");
497 //______________________________________________________________________________
498 Bool_t AliAnalysisAlien::CheckInputData()
500 // Check validity of input data. If necessary, create xml files.
// Three cases are handled:
//  - no input files and no run numbers/range: a single xml collection is made for
//    fGridDataDir;
//  - explicit input files (fInputFiles): each must exist, as given or under the
//    AliEn working directory, and all must have the same data type;
//  - run numbers or a run range: xml collection names are derived per run, or per
//    group of fNrunsPerMaster runs when fNrunsPerMaster > 1.
// In production mode nothing is checked and kTRUE is returned.
501 if (fProductionMode) return kTRUE;
502 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
503 if (!fGridDataDir.Length()) {
504 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
507 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
508 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
509 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // needed so that tag usage is detected correctly in test mode
512 // Process declared files
513 Bool_t isCollection = kFALSE;
514 Bool_t isXml = kFALSE;
515 Bool_t useTags = kFALSE;
516 Bool_t checked = kFALSE;
517 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
519 TString workdir = gGrid->GetHomeDirectory();
520 workdir += fGridWorkingDir;
523 TIter next(fInputFiles);
524 while ((objstr=(TObjString*)next())) {
527 file += objstr->GetString();
528 // Store full lfn path
529 if (FileExists(file)) objstr->SetString(file);
531 file = objstr->GetName();
532 if (!FileExists(objstr->GetName())) {
533 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
534 objstr->GetName(), workdir.Data());
// The type of the first checked file is recorded; later files must match it.
538 Bool_t iscoll, isxml, usetags;
539 CheckDataType(file, iscoll, isxml, usetags);
542 isCollection = iscoll;
545 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
547 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
548 Error("CheckInputData", "Some conflict was found in the types of inputs");
554 // Process requested run numbers
555 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
556 // Check validity of alien data directory
557 if (!fGridDataDir.Length()) {
558 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
561 if (!DirectoryExists(fGridDataDir)) {
562 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
566 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
570 if (checked && !isXml) {
571 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
574 // Check validity of run number(s)
578 TString schunk, schunk2;
582 useTags = fDataPattern.Contains("tag");
583 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
585 if (useTags != fDataPattern.Contains("tag")) {
586 Error("CheckInputData", "Cannot mix input files using/not using tags");
589 if (fRunNumbers.Length()) {
590 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
591 arr = fRunNumbers.Tokenize(" ");
593 while ((os=(TObjString*)next())) {
594 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
595 if (!DirectoryExists(path)) {
596 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
599 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
600 TString msg = "\n##### file: ";
602 msg += " type: xml_collection;";
603 if (useTags) msg += " using_tags: Yes";
604 else msg += " using_tags: No";
605 Info("CheckDataType", "%s", msg.Data());
606 if (fNrunsPerMaster<2) {
607 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Group runs into chunks of fNrunsPerMaster: the first run of a chunk starts the
// name, which is completed as <first>_<last>.xml at the chunk's last run (or at the
// last run overall).
610 if (((nruns-1)%fNrunsPerMaster) == 0) {
611 schunk = os->GetString();
613 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
614 schunk += Form("_%s.xml", os->GetString().Data());
620 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
621 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
622 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
623 if (!DirectoryExists(path)) {
624 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
627 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
628 TString msg = "\n##### file: ";
630 msg += " type: xml_collection;";
631 if (useTags) msg += " using_tags: Yes";
632 else msg += " using_tags: No";
633 Info("CheckDataType", "%s", msg.Data());
634 if (fNrunsPerMaster<2) {
635 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
// Same chunking as for explicit run numbers, closing the last chunk at fRunRange[1].
638 if (((nruns-1)%fNrunsPerMaster) == 0) {
639 schunk = Form("%s%d", fRunPrefix.Data(),irun);
641 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
642 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
655 //______________________________________________________________________________
656 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
658 // Create dataset for the grid data directory + run number.
// Collections are built with the AliEn 'find -x collection' command, limited to
// fNtestFiles files in test mode, where the output always goes to wn.xml. They are
// saved locally as <name>.xml and, outside test mode, copied to the AliEn working
// directory. With fNrunsPerMaster > 1 the per-run collections are merged into chunk
// collections named <first>_<last>.xml.
// Nothing is done in production or offline mode.
659 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
661 Error("CreateDataset", "Cannot create dataset with no grid connection");
666 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
667 TString workdir = gGrid->GetHomeDirectory();
668 workdir += fGridWorkingDir;
670 // Compose the 'find' command arguments
672 TString options = "-x collection ";
673 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
674 TString conditions = "";
679 TString schunk, schunk2;
680 TGridCollection *cbase=0, *cadd=0;
681 if (!fRunNumbers.Length() && !fRunRange[0]) {
682 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
683 // Make a single data collection from data directory.
685 if (!DirectoryExists(path)) {
686 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
690 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
691 else file = Form("%s.xml", gSystem->BaseName(path));
692 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
698 command += conditions;
699 printf("command: %s\n", command.Data());
700 TGridResult *res = gGrid->Command(command);
702 // Write standard output to file
703 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
704 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
705 Bool_t nullFile = kFALSE;
707 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
709 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
711 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
716 Bool_t fileExists = FileExists(file);
717 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
718 // Copy xml file to alien space
719 if (fileExists) gGrid->Rm(file);
720 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
721 if (!FileExists(file)) {
722 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
725 // Update list of files to be processed.
727 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
731 Bool_t nullResult = kTRUE;
732 if (fRunNumbers.Length()) {
733 TObjArray *arr = fRunNumbers.Tokenize(" ");
736 while ((os=(TObjString*)next())) {
737 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
738 if (!DirectoryExists(path)) continue;
740 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
741 else file = Form("%s.xml", os->GetString().Data());
742 // If local collection file does not exist, create it via 'find' command.
743 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
748 command += conditions;
749 TGridResult *res = gGrid->Command(command);
751 // Write standard output to file
752 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
753 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
754 Bool_t nullFile = kFALSE;
756 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
758 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
760 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// Drop the empty run from the run list.
// NOTE(review): this is a plain substring replacement; a run number that is a
// substring of another entry would also be altered.
761 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
767 if (TestBit(AliAnalysisGrid::kTest)) break;
768 // Check if there is one run per master job.
769 if (fNrunsPerMaster<2) {
770 if (FileExists(file)) {
771 if (fOverwriteMode) gGrid->Rm(file);
773 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
777 // Copy xml file to alien space
778 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
779 if (!FileExists(file)) {
780 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
786 if (((nruns-1)%fNrunsPerMaster) == 0) {
787 schunk = os->GetString();
788 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
790 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
791 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
795 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
798 schunk += Form("_%s.xml", os->GetString().Data());
799 if (FileExists(schunk)) {
// The existence check is on the merged chunk, so in overwrite mode the chunk (schunk)
// must be removed, as in the run-range branch below. Removing the per-run 'file'
// left the old chunk in place and the copy below could not replace it.
800 if (fOverwriteMode) gGrid->Rm(schunk);
802 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
806 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
807 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
808 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
809 if (!FileExists(schunk)) {
810 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
818 Error("CreateDataset", "No valid dataset corresponding to the query!");
822 // Process a full run range.
823 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
824 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
825 if (!DirectoryExists(path)) continue;
827 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
828 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
829 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
830 if (fOverwriteMode) gGrid->Rm(file);
832 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
836 // If local collection file does not exist, create it via 'find' command.
837 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
842 command += conditions;
843 TGridResult *res = gGrid->Command(command);
845 // Write standard output to file
846 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
847 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
848 Bool_t nullFile = kFALSE;
850 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
852 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
854 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
860 if (TestBit(AliAnalysisGrid::kTest)) break;
861 // Check if there is one run per master job.
862 if (fNrunsPerMaster<2) {
863 if (FileExists(file)) {
864 if (fOverwriteMode) gGrid->Rm(file);
866 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
870 // Copy xml file to alien space
871 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
872 if (!FileExists(file)) {
873 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
878 // Check if the collection for the chunk exist locally.
// Index of the chunk collection this run belongs to.
// NOTE(review): assumes fInputFiles already holds an entry for this chunk;
// At() returns null otherwise.
879 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
880 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
881 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
884 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
885 if (((nruns-1)%fNrunsPerMaster) == 0) {
886 schunk = Form("%s%d", fRunPrefix.Data(), irun);
887 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
889 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
893 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
894 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
898 if (FileExists(schunk)) {
899 if (fOverwriteMode) gGrid->Rm(schunk);
901 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
905 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
906 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
907 if (FileExists(schunk)) {
908 if (fOverwriteMode) gGrid->Rm(schunk);
910 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
914 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
915 if (!FileExists(schunk)) {
916 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
922 Error("CreateDataset", "No valid dataset corresponding to the query!");
929 //______________________________________________________________________________
930 Bool_t AliAnalysisAlien::CreateJDL()
932 // Generate a JDL file according to current settings. The name of the file is
933 // specified by fJDLName.
// Two JDL objects are filled: fGridJDL for the analysis jobs and fMergingJDL for
// the merging jobs. Most settings (user, TTL, packages, sandbox files) go to both.
934 Bool_t error = kFALSE;
// 'copy' is cleared in production, offline and test modes; 'generate' is cleared
// in test and submit modes.
937 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
938 Bool_t generate = kTRUE;
939 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
941 Error("CreateJDL", "Alien connection required");
944 // Check validity of alien workspace
// The AliEn home directory is prepended unless in production mode or when
// fGridWorkingDir already starts with "/alice".
946 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
947 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
948 workdir += fGridWorkingDir;
952 Error("CreateJDL()", "Define some input files for your analysis.");
955 // Compose list of input files
956 // Check if output files were defined
957 if (!fOutputFiles.Length()) {
958 Error("CreateJDL", "You must define at least one output file");
961 // Check if an output directory was defined and valid
962 if (!fGridOutputDir.Length()) {
963 Error("CreateJDL", "You must define AliEn output directory");
966 if (!fProductionMode) {
// An output directory given without any '/' is placed under the working directory;
// it is created (mkdir -p) if it does not exist.
967 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
968 if (!DirectoryExists(fGridOutputDir)) {
969 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
970 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
972 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
979 // Exit if any error up to now
980 if (error) return kFALSE;
// Settings common to the analysis and merging JDLs.
982 if (!fUser.IsNull()) {
983 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
984 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
986 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
// The merging executable is derived from fExecutable (X.sh -> X_merge.sh); its macro
// X_merge.C is added to the merging job's input sandbox.
987 TString mergeExec = fExecutable;
988 mergeExec.ReplaceAll(".sh", "_merge.sh");
989 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
990 mergeExec.ReplaceAll(".sh", ".C");
991 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
992 if (!fArguments.IsNull())
993 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
// One-stage merging gets the output directory as its argument; otherwise the
// merging job receives three positional arguments ($1 $2 $3).
994 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
995 else fMergingJDL->SetArguments("$1 $2 $3");
// fTTL is in seconds; the description shows it converted to minutes.
996 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
997 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
998 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
999 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
// Optional job-splitting controls are written only when set.
1001 if (fMaxInitFailed > 0) {
1002 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1003 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1005 if (fSplitMaxInputFileNumber > 0) {
1006 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1007 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1009 if (fSplitMode.Length()) {
1010 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1011 fGridJDL->SetDescription("Split", "We split per SE or file");
// Software packages requested for both analysis and merging jobs.
1013 if (!fAliROOTVersion.IsNull()) {
1014 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1015 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1017 if (!fROOTVersion.IsNull()) {
1018 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1019 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1021 if (!fAPIVersion.IsNull()) {
1022 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1023 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1025 if (!fExternalPackages.IsNull()) {
1026 arr = fExternalPackages.Tokenize(" ");
1028 while ((os=(TObjString*)next())) {
// Each entry is split at "::" into package name and version.
// NOTE(review): for an entry without "::", Index() returns -1 and the resulting
// name/version pair is not meaningful; confirm entries are always "name::version".
1029 TString pkgname = os->GetString();
1030 Int_t index = pkgname.Index("::");
1031 TString pkgversion = pkgname(index+2, pkgname.Length());
1032 pkgname.Remove(index);
1033 fGridJDL->AddToPackages(pkgname, pkgversion);
1034 fMergingJDL->AddToPackages(pkgname, pkgversion);
// Each worker processes the collection named wn.xml.
1038 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1039 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1040 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
// The file named like fExecutable with ".sh" replaced by ".root" is shipped to both
// the analysis and the merging jobs.
1041 TString analysisFile = fExecutable;
1042 analysisFile.ReplaceAll(".sh", ".root");
1043 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1044 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
// ConfigureCuts.C is shipped only when tags are used and the file exists locally.
1045 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1046 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
// Additional libraries: entries containing ".so" are not added to the sandboxes here.
1047 if (fAdditionalLibs.Length()) {
1048 arr = fAdditionalLibs.Tokenize(" ");
1050 while ((os=(TObjString*)next())) {
1051 if (os->GetString().Contains(".so")) continue;
1052 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1053 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1058 TIter next(fPackages);
1060 while ((obj=next())) {
1061 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1062 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
// Output archive for the analysis jobs.
1065 if (fOutputArchive.Length()) {
1066 arr = fOutputArchive.Tokenize(" ");
1068 Bool_t first = kTRUE;
1069 const char *comment = "Files to be archived";
1070 const char *comment1 = comment;
1071 while ((os=(TObjString*)next())) {
1072 if (!first) comment = NULL;
// Entries without an explicit "@<SE>" are sent to fCloseSE when it is set.
1073 if (!os->GetString().Contains("@") && fCloseSE.Length())
1074 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1076 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1080 // Output archive for the merging jdl
// With default outputs: a log archive plus root_archive.zip holding the output, extra
// and terminate files (minus merge excludes) with fNreplicas disk replicas.
// Otherwise fOutputArchive is reused.
1081 TString outputArchive;
1082 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1083 outputArchive = "log_archive.zip:std*,*.stat@disk=1 ";
1084 // Add normal output files, extra files + terminate files
1085 TString files = GetListOfFiles("outextter");
1086 // Do not register merge excludes
1087 if (!fMergeExcludes.IsNull()) {
1088 arr = fMergeExcludes.Tokenize(" ");
1090 while ((os=(TObjString*)next1())) {
1091 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1092 files.ReplaceAll(os->GetString(),"");
// Turn ".root" names into "*.root" patterns.
1096 files.ReplaceAll(".root", "*.root");
1097 outputArchive += Form("root_archive.zip:%s@disk=%d",files.Data(),fNreplicas);
1099 TString files = fOutputArchive;
1100 files.ReplaceAll(".root", "*.root"); // nreplicas etc. should already be attached by the user
1101 outputArchive = files;
1103 arr = outputArchive.Tokenize(" ");
1107 while ((os=(TObjString*)next2())) {
1108 if (!first) comment = NULL;
1109 TString currentfile = os->GetString();
1110 if (!IsOneStageMerging()) currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1111 if (!currentfile.Contains("@") && fCloseSE.Length())
1112 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1114 fMergingJDL->AddToOutputArchive(currentfile, comment);
1119 arr = fOutputFiles.Tokenize(",");
1121 Bool_t first = kTRUE;
1122 const char *comment = "Files to be saved";
1123 while ((os=(TObjString*)next())) {
1124 // Ignore ouputs in jdl that are also in outputarchive
1125 TString sout = os->GetString();
1126 sout.ReplaceAll("*", "");
1127 sout.ReplaceAll(".root", "");
1128 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1129 if (fOutputArchive.Contains(sout)) continue;
1130 if (!first) comment = NULL;
1131 if (!os->GetString().Contains("@") && fCloseSE.Length())
1132 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1134 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1136 if (fMergeExcludes.Contains(sout)) continue;
1137 if (!os->GetString().Contains("@") && fCloseSE.Length())
1138 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1140 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1143 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1144 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1145 TString validationScript = fValidationScript;
1146 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1147 validationScript.ReplaceAll(".sh", "_merge.sh");
1148 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1149 if (fMasterResubmitThreshold) {
1150 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1151 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1153 // Write a jdl with 2 input parameters: collection name and output dir name.
1156 // Copy jdl to grid workspace
1158 // Check if an output directory was defined and valid
1159 if (!fGridOutputDir.Length()) {
1160 Error("CreateJDL", "You must define AliEn output directory");
1163 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1164 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1165 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1166 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1168 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1174 if (TestBit(AliAnalysisGrid::kSubmit)) {
1175 TString mergeJDLName = fExecutable;
1176 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1177 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1178 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1179 if (fProductionMode) {
1180 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1181 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1183 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1184 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1185 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1186 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1188 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1189 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1192 if (fAdditionalLibs.Length()) {
1193 arr = fAdditionalLibs.Tokenize(" ");
1196 while ((os=(TObjString*)next())) {
1197 if (os->GetString().Contains(".so")) continue;
1198 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1199 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1200 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1205 TIter next(fPackages);
1207 while ((obj=next())) {
1208 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1209 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1210 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1217 //______________________________________________________________________________
1218 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1220 // Writes the analysis JDL (file fJDLName) and the merging JDL (fExecutable with
1221 // ".sh" replaced by "_merge.jdl"), both generated from fGridJDL / fMergingJDL,
1222 // and optionally copies them to AliEn (see <copy> below). Returns kFALSE if fInputFiles is not set.
1223 if (!fInputFiles) return kFALSE;
// AliEn workspace: the user's home directory (skipped in production mode or when
// fGridWorkingDir is already an absolute /alice path) followed by fGridWorkingDir.
1226 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1227 workdir += fGridWorkingDir;
// Production mode: input collections come from fInputFiles; each subjob writes into a
// 4-digit counter sub-directory of fGridOutputDir. Merging writes to fGridOutputDir.
// NOTE(review): the lines that iterate <next> and set <os> are not visible in this listing.
1229 if (fProductionMode) {
1230 TIter next(fInputFiles);
1232 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1233 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1234 fMergingJDL->SetOutputDirectory(fGridOutputDir);
// Neither run numbers nor a run range were given: the input collections are taken by name.
1236 if (!fRunNumbers.Length() && !fRunRange[0]) {
1237 // One jdl with no parameters in case input data is specified by name.
1238 TIter next(fInputFiles);
1240 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1241 if (!fOutputSingle.IsNull())
1242 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1244 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1245 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1248 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
// $1 = collection file name in the workspace, $2 = output sub-directory (e.g. run number).
1249 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1250 if (!fOutputSingle.IsNull()) {
1251 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1252 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1254 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
// The merging job writes back into the directory it merges (its first argument).
1255 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1260 // Generate the JDL as a string
1261 TString sjdl = fGridJDL->Generate();
1262 TString sjdl1 = fMergingJDL->Generate();
// Cosmetic post-processing of both texts: put every LF: entry, (member ...) clause and
// VO_ package on its own line, open/close braces on separate lines, remove blank lines,
// and shorten the "OutputDirectory" keyword to "OutputDir".
1264 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1265 sjdl.ReplaceAll("(member", "\n (member");
1266 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1267 sjdl.ReplaceAll("{", "{\n ");
1268 sjdl.ReplaceAll("};", "\n};");
1269 sjdl.ReplaceAll("{\n \n", "{\n");
1270 sjdl.ReplaceAll("\n\n", "\n");
1271 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1272 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1273 sjdl1.ReplaceAll("(member", "\n (member");
1274 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1275 sjdl1.ReplaceAll("{", "{\n ");
1276 sjdl1.ReplaceAll("};", "\n};");
1277 sjdl1.ReplaceAll("{\n \n", "{\n");
1278 sjdl1.ReplaceAll("\n\n", "\n");
1279 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Analysis JDL: declare Packages/OutputDir as JDL variables, prepend the job tag,
// and request a 5000MB work directory.
1280 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1281 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1282 index = sjdl.Index("JDLVariables");
1283 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1284 sjdl += "Workdirectorysize = {\"5000MB\"};";
// Merging JDL: same JDL variables. The job tag gets "_Merging" inserted before the first ':'
// (or appended when fJobTag has no ':').
1285 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1286 index = fJobTag.Index(":");
1287 if (index < 0) index = fJobTag.Length();
1288 TString jobTag = fJobTag;
1289 jobTag.Insert(index, "_Merging");
// NOTE(review): jobTag already contains "_Merging", and the format below appends it a second
// time, so a tag without ':' ends up as "<tag>_Merging_Merging". "comment:%s" was probably intended.
1290 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1291 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1292 index = sjdl1.Index("JDLVariables");
1293 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1294 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1295 // Write jdl to file
1297 out.open(fJDLName.Data(), ios::out);
1299 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1302 out << sjdl << endl;
// The merging JDL file name is derived from the executable name, as in CreateJDL().
1303 TString mergeJDLName = fExecutable;
1304 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1307 out1.open(mergeJDLName.Data(), ios::out);
1309 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1312 out1 << sjdl1 << endl;
1315 // Copy jdl to grid workspace
// NOTE(review): the branch on <copy> is not visible in this listing. Presumably the hint below
// is printed when not copying, and the copy section runs otherwise — confirm.
1317 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination: fGridOutputDir, or the workspace in production mode. Existing copies are
// removed first, then both JDLs are copied with TFile::Cp.
1319 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1320 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1321 if (fProductionMode) {
1322 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1323 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1325 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1326 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1327 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1328 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1330 Info("WriteJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1331 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1337 //______________________________________________________________________________
1338 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1340 // Returns true if file exists.
1341 if (!gGrid) return kFALSE;
1343 slfn.ReplaceAll("alien://","");
1344 TGridResult *res = gGrid->Ls(slfn);
1345 if (!res) return kFALSE;
1346 TMap *map = dynamic_cast<TMap*>(res->At(0));
1351 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1352 if (!objs || !objs->GetString().Length()) {
1360 //______________________________________________________________________________
1361 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1363 // Returns true if directory exists. Can be also a path.
1364 if (!gGrid) return kFALSE;
1365 // Check if dirname is a path
1366 TString dirstripped = dirname;
1367 dirstripped = dirstripped.Strip();
1368 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1369 TString dir = gSystem->BaseName(dirstripped);
1371 TString path = gSystem->DirName(dirstripped);
1372 TGridResult *res = gGrid->Ls(path, "-F");
1373 if (!res) return kFALSE;
1377 while ((map=dynamic_cast<TMap*>(next()))) {
1378 obj = map->GetValue("name");
1380 if (dir == obj->GetName()) {
1389 //______________________________________________________________________________
1390 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1392 // Check input data type.
1393 isCollection = kFALSE;
1397 Error("CheckDataType", "No connection to grid");
1400 isCollection = IsCollection(lfn);
1401 TString msg = "\n##### file: ";
1404 msg += " type: raw_collection;";
1405 // special treatment for collections
1407 // check for tag files in the collection
1408 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1410 msg += " using_tags: No (unknown)";
1411 Info("CheckDataType", "%s", msg.Data());
1414 const char* typeStr = res->GetKey(0, "origLFN");
1415 if (!typeStr || !strlen(typeStr)) {
1416 msg += " using_tags: No (unknown)";
1417 Info("CheckDataType", "%s", msg.Data());
1420 TString file = typeStr;
1421 useTags = file.Contains(".tag");
1422 if (useTags) msg += " using_tags: Yes";
1423 else msg += " using_tags: No";
1424 Info("CheckDataType", "%s", msg.Data());
1429 isXml = slfn.Contains(".xml");
1431 // Open xml collection and check if there are tag files inside
1432 msg += " type: xml_collection;";
1433 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1435 msg += " using_tags: No (unknown)";
1436 Info("CheckDataType", "%s", msg.Data());
1439 TMap *map = coll->Next();
1441 msg += " using_tags: No (unknown)";
1442 Info("CheckDataType", "%s", msg.Data());
1445 map = (TMap*)map->GetValue("");
1447 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1448 useTags = file.Contains(".tag");
1450 if (useTags) msg += " using_tags: Yes";
1451 else msg += " using_tags: No";
1452 Info("CheckDataType", "%s", msg.Data());
1455 useTags = slfn.Contains(".tag");
1456 if (slfn.Contains(".root")) msg += " type: root file;";
1457 else msg += " type: unknown file;";
1458 if (useTags) msg += " using_tags: Yes";
1459 else msg += " using_tags: No";
1460 Info("CheckDataType", "%s", msg.Data());
1463 //______________________________________________________________________________
1464 void AliAnalysisAlien::EnablePackage(const char *package)
1466 // Enables a par file supposed to exist in the current directory.
1467 TString pkg(package);
1468 pkg.ReplaceAll(".par", "");
1470 if (gSystem->AccessPathName(pkg)) {
1471 Fatal("EnablePackage", "Package %s not found", pkg.Data());
1474 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1475 Info("EnablePackage", "AliEn plugin will use .par packages");
1476 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
1478 fPackages = new TObjArray();
1479 fPackages->SetOwner();
1481 fPackages->Add(new TObjString(pkg));
1484 //______________________________________________________________________________
1485 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1487 // Make a chain of <treeName> from the data files whose locations are listed in fFileForTestMode.
1488 // Inspired from JF's CreateESDChain.
// On success a new TChain is returned (ownership presumably passes to the caller — TODO confirm).
// An Error is reported when fFileForTestMode is unset or missing, or when none of the listed
// files could be opened. The return statements on those paths are not visible in this listing.
1489 if (fFileForTestMode.IsNull()) {
1490 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1493 if (gSystem->AccessPathName(fFileForTestMode)) {
1494 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1499 in.open(fFileForTestMode);
1501 // Read the input list of files and add them to the chain
1503 TChain *chain = new TChain(treeName);
// Empty entries are skipped. The counter is compared before it is incremented, so (assuming it
// starts at 0; its initialisation is not visible) at most fNtestFiles entries are used.
1507 if (line.IsNull()) continue;
1508 if (count++ == fNtestFiles) break;
1509 TString esdFile(line);
// Each file is opened only to verify that it is readable. Zombie files are not added to the chain.
1510 TFile *file = TFile::Open(esdFile);
1512 if (!file->IsZombie()) chain->Add(esdFile);
// NOTE(review): the location below is spelled "GetChainforTestMode" (lower-case 'f'),
// unlike the method name used in the other messages.
1515 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1519 if (!chain->GetListOfFiles()->GetEntries()) {
1520 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1528 //______________________________________________________________________________
1529 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1531 // Get job status for all jobs with jobid>=jobidstart.
// Scans the user's job list (gGrid->Ps("")) and classifies each job whose queue id is
// >= jobidstart by its TGridJobStatus state. The counter initialisation and the per-case
// updates are not visible in this listing (presumably nwaiting/nrunning/nerror/ndone — TODO confirm).
// Returns the AliEn status text of job <lastid>, kept in a function-local static buffer.
// The pointer is only valid until the next call, and the function is not reentrant.
1532 static char mstatus[20];
// If the job list cannot be retrieved, the static buffer is returned as is.
1538 TGridJobStatusList *list = gGrid->Ps("");
1539 if (!list) return mstatus;
1540 Int_t nentries = list->GetSize();
1541 TGridJobStatus *status;
1543 for (Int_t ijob=0; ijob<nentries; ijob++) {
1544 status = (TGridJobStatus *)list->At(ijob);
// TAlienJobStatus is reached only through the interpreter, presumably to avoid a
// compile-time dependency on TAlien (as noted in IsCollection()).
1545 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1546 if (pid<jobidstart) continue;
1547 if (pid == lastid) {
// NOTE(review): the status text is sprintf'ed into the 20-byte mstatus buffer with no length
// limit, and it is used as the format string itself. A long or '%'-containing status would
// overflow or misformat. snprintf(buf, sizeof(mstatus), "%s", ...) would be safer.
1548 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
// Aborted, failed and unknown jobs share one case group; waiting, running and done each have their own.
1550 switch (status->GetStatus()) {
1551 case TGridJobStatus::kWAITING:
1553 case TGridJobStatus::kRUNNING:
1555 case TGridJobStatus::kABORTED:
1556 case TGridJobStatus::kFAIL:
1557 case TGridJobStatus::kUNKNOWN:
1559 case TGridJobStatus::kDONE:
1568 //______________________________________________________________________________
1569 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1571 // Returns true if file is a collection. Functionality duplicated from
1572 // TAlien::Type() because we don't want to directly depend on TAlien.
1574 Error("IsCollection", "No connection to grid");
1577 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1578 if (!res) return kFALSE;
1579 const char* typeStr = res->GetKey(0, "type");
1580 if (!typeStr || !strlen(typeStr)) return kFALSE;
1581 if (!strcmp(typeStr, "collection")) return kTRUE;
1586 //______________________________________________________________________________
1587 Bool_t AliAnalysisAlien::IsSingleOutput() const
1589 // Check if single-ouput option is on.
1590 return (!fOutputSingle.IsNull());
1593 //______________________________________________________________________________
1594 void AliAnalysisAlien::Print(Option_t *) const
1596 // Print current plugin settings.
// When the analysis manager runs in PROOF mode, the PROOF-specific settings are printed first.
// The grid settings start at the OverwriteMode line. That part is presumably the else-branch;
// its braces are not visible in this listing.
1597 printf("### AliEn analysis plugin current settings ###\n");
1598 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1599 if (mgr && mgr->IsProofMode()) {
1600 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1601 if (TestBit(AliAnalysisGrid::kTest))
1602 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1603 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1604 if (!fProofDataSet.IsNull())
1605 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1607 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1609 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1610 if (!fRootVersionForProof.IsNull())
1611 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1613 printf("= ROOT version requested________________________ default\n");
1614 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1615 if (!fAliRootMode.IsNull())
1616 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1618 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1619 if (fNproofWorkersPerSlave)
1620 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1621 if (TestSpecialBit(kClearPackages))
1622 printf("= ClearPackages requested...\n");
// NOTE(review): TString::Data() is not expected to return NULL, so this test is presumably
// always true. !fIncludePath.IsNull() was probably intended.
1623 if (fIncludePath.Data())
1624 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1625 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1626 if (fPackages && fPackages->GetEntries()) {
1627 TIter next(fPackages);
// NOTE(review): the package names are appended without a separator, so they print run together.
1630 while ((obj=next())) list += obj->GetName();
1631 printf("= Par files to be used: ________________________ %s\n", list.Data());
1633 if (TestSpecialBit(kProofConnectGrid))
1634 printf("= Requested PROOF connection to grid\n");
// Grid (AliEn) mode settings.
1637 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1638 if (fOverwriteMode) {
1639 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1640 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1642 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1643 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1644 printf("= Production mode:______________________________ %d\n", fProductionMode);
1645 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1646 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1647 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1649 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1650 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1651 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1652 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1653 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1654 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or (neither given) the input files by name.
1655 if (fRunNumbers.Length())
1656 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1658 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
1659 if (!fRunRange[0] && !fRunNumbers.Length()) {
1660 TIter next(fInputFiles);
1663 while ((obj=next())) list += obj->GetName();
1664 printf("= Input files to be processed: _________________ %s\n", list.Data());
1666 if (TestBit(AliAnalysisGrid::kTest))
1667 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1668 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1669 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1670 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1671 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1672 printf("=====================================================================\n");
// Job submission parameters.
1673 printf("= Job price: ___________________________________ %d\n", fPrice);
1674 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1675 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1676 if (fMaxInitFailed>0)
1677 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1678 if (fMasterResubmitThreshold>0)
1679 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1680 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1681 if (fNrunsPerMaster>0)
1682 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1683 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1684 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1685 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1686 if (fArguments.Length())
1687 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1688 if (fExecutableArgs.Length())
1689 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1690 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1691 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1692 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1693 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1695 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1696 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same always-true Data() test as in the PROOF section.
1697 if (fIncludePath.Data())
1698 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1699 if (fCloseSE.Length())
1700 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1701 if (fFriendChainName.Length())
1702 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1703 if (fPackages && fPackages->GetEntries()) {
1704 TIter next(fPackages);
1707 while ((obj=next())) list += obj->GetName();
1708 printf("= Par files to be used: ________________________ %s\n", list.Data());
1712 //______________________________________________________________________________
1713 void AliAnalysisAlien::SetDefaults()
1715 // Set default values for everything. What cannot be filled will be left empty.
// The JDL helpers are created through the interpreter (TAlienJDL is never named in compiled
// code), presumably to avoid a compile-time dependency on TAlien.
1716 if (fGridJDL) delete fGridJDL;
1717 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// NOTE(review): unlike fGridJDL, a previously allocated fMergingJDL is not deleted before it is
// replaced. This is a possible leak if SetDefaults() is called more than once.
1718 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1721 fSplitMaxInputFileNumber = 100;
1723 fMasterResubmitThreshold = 0;
1728 fNrunsPerMaster = 1;
1729 fMaxMergeFiles = 100;
// Generated script/macro names and the command used to run the macro.
1731 fExecutable = "analysis.sh";
1732 fExecutableCommand = "root -b -q";
1734 fExecutableArgs = "";
1735 fAnalysisMacro = "myAnalysis.C";
1736 fAnalysisSource = "";
1737 fAdditionalLibs = "";
1741 fAliROOTVersion = "";
1742 fUser = ""; // Your alien user name
1743 fGridWorkingDir = "";
1744 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1745 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1746 fFriendChainName = "";
1747 fGridOutputDir = "output";
// Default archives: logs (std*, *.stat) stored with 1 replica, ROOT outputs with 2.
1748 fOutputArchive = "log_archive.zip:std*,*.stat@disk=1 root_archive.zip:*.root@disk=2";
1749 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1750 fInputFormat = "xml-single";
1751 fJDLName = "analysis.jdl";
1752 fJobTag = "Automatically generated analysis JDL";
1753 fMergeExcludes = "";
1756 SetCheckCopy(kTRUE);
1757 SetDefaultOutputs(kTRUE);
1761 //______________________________________________________________________________
1762 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1764 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1765 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1766 // true if the jobs were successfully submitted.
// filename  - name of the final merged file (e.g. "histos.root")
// aliendir  - AliEn directory holding the original outputs and the merged files
// nperchunk - number of files merged by one merging job
// jdl       - merging JDL, submitted as "submit <jdl> <aliendir> <stage> <chunk>"
1767 Int_t countOrig = 0;
1768 Int_t countStage = 0;
1771 Bool_t doneFinal = kFALSE;
1773 TString saliendir(aliendir);
1774 TString sfilename, stmp;
1775 saliendir.ReplaceAll("//","/");
1776 saliendir = saliendir.Strip(TString::kTrailing, '/');
// NOTE(review): the Error location "GetNregisteredFiles" here and below does not match this method's name.
1778 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
1781 sfilename = filename;
1782 sfilename.ReplaceAll(".root", "*.root");
1783 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
// Recursive catalogue search for every file matching *<name>*.root below aliendir.
1784 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1785 TGridResult *res = gGrid->Command(command);
1787 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1792 while ((map=(TMap*)nextmap())) {
1793 TString turl = map->GetValue("turl")->GetName();
1794 if (!turl.Length()) {
// Reduce the turl to a path relative to aliendir, then keep its base name.
1799 turl.ReplaceAll("alien://", "");
1800 turl.ReplaceAll(saliendir, "");
1801 sfilename = gSystem->BaseName(turl);
1802 turl = turl.Strip(TString::kLeading, '/');
1803 // Now check to what the file corresponds to:
1804 // original output - aliendir/%03d/filename
1805 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1806 // final merged file - aliendir/filename
1807 if (sfilename == turl) {
1808 if (sfilename == filename) {
// Stage and chunk are parsed at fixed offsets after "Stage": 2 digits for the stage, then
// (after '_') 4 digits for the chunk index.
1812 Int_t index = sfilename.Index("Stage");
1813 if (index<0) continue;
1814 stmp = sfilename(index+5,2);
1815 Int_t istage = atoi(stmp);
1816 stmp = sfilename(index+8,4);
1817 Int_t ijob = atoi(stmp);
1818 if (istage<stage) continue; // Ignore lower stages
// Presumably a higher stage was found: forget the chunks recorded so far (condition not visible here).
1821 chunksDone.ResetAllBits();
1825 chunksDone.SetBitNumber(ijob);
1832 printf("=> Removing files from previous stages...\n");
1833 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1834 for (i=1; i<stage; i++)
1835 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1840 // Compute number of jobs that were submitted for the current stage
// Each stage merges groups of nperchunk files: ntotstage = ceil(ntotstage/nperchunk), applied <stage> times.
1841 Int_t ntotstage = countOrig;
1842 for (i=1; i<=stage; i++) {
1843 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1844 else ntotstage = (ntotstage/nperchunk);
1846 // Now compare with the number of set bits in the chunksDone array
1847 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1849 printf("*** Found %d original files\n", countOrig);
1850 if (stage==0) printf("*** No merging completed so far.\n");
1851 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1852 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
1853 if (!submit) return doneFinal;
1854 // Submit merging jobs for all missing chunks for the current stage.
1855 TString query = Form("submit %s %s", jdl, aliendir);
1857 chunksDone.SetBitNumber(ntotstage); // expand the array to the maximum number of chunks
// Resubmit each chunk whose bit is still unset (the initial value of ichunk is not visible here).
1859 for (i=0; i<nmissing; i++) {
1860 ichunk = chunksDone.FirstNullBit(ichunk+1);
1861 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1862 if (!jobId) return kFALSE;
1866 // Submit next stage of merging
// One job per group of nperchunk files produced by the current stage (or the originals at stage 0).
1867 if (stage==0) countStage = countOrig;
1868 Int_t nchunks = (countStage/nperchunk);
1869 if (countStage%nperchunk) nchunks += 1;
1870 for (i=0; i<nchunks; i++) {
1871 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1872 if (!jobId) return kFALSE;
1877 //______________________________________________________________________________
1878 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1880 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
1881 if (!gGrid) return 0;
1882 printf("=> %s ------> ",query);
1883 TGridResult *res = gGrid->Command(query);
1885 TString jobId = res->GetKey(0,"jobId");
1887 if (jobId.IsNull()) {
1888 printf("submission failed. Reason:\n");
1891 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1894 printf(" Job id: %s\n", jobId.Data());
1898 //______________________________________________________________________________
1899 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1901 // Merge given output files from basedir. The file merger will merge nmaxmerge
1902 // files in a group. Merging can be done in stages:
1903 // stage=0 : will merge all existing files in a single stage
1904 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1905 // If their number is bigger than nmaxmerge, only the files from
1906 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged as <output>-Stage01_<ichunk>.root
1907 // stage=n : does a find command for files named <output>-Stage<NN>_*.root with NN=stage-1. If their number is bigger than
1908 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
// <output> itself. Chunk outputs are named <output>-Stage<SS>_<CCCC>.root, where SS is
// `stage` (2 digits) and CCCC is `ichunk` (4 digits); "<output>" above means the file name
// with ".root" replaced by that suffix.
//
// Parameters:
//   output    - output file name; anything from the first '@' on (storage element spec) is dropped
//   basedir   - AliEn directory in which the 'find' command looks for the files to merge
//   nmaxmerge - maximum number of files merged in one group/chunk
//   stage     - merging stage (see above)
//   ichunk    - index of the chunk to merge when stage>0 and more than nmaxmerge files are found
// Returns: Bool_t success flag.
1910 TString outputFile = output;
1912 TString outputChunk;
1913 TString previousChunk = "";
1914 Int_t countChunk = 0;
1915 Int_t countZero = nmaxmerge;
1916 Bool_t merged = kTRUE;
1917 Int_t index = outputFile.Index("@");
1918 if (index > 0) outputFile.Remove(index);
1919 TString inputFile = outputFile;
// For stage>1 the inputs are the chunk files written by the previous stage.
1920 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1921 command = Form("find %s/ *%s", basedir, inputFile.Data());
1922 printf("command: %s\n", command.Data());
1923 TGridResult *res = gGrid->Command(command);
1925 ::Error("MergeOutput","No result for the find command\n");
1929 TFileMerger *fm = 0;
1932 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
// Local partial results of a previous run are named <output>_<CCCC>.root (glob <output>_*.root).
1933 outputChunk = outputFile;
1934 outputChunk.ReplaceAll(".root", "_*.root");
1935 // Check for existent temporary merge files
1936 // Check overwrite mode and remove previous partial results if needed
1937 // Preserve old merging functionality for stage 0.
1939 Int_t countChar = 0;
// `ls` exits with 0 only if at least one partial chunk file exists locally.
1940 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1942 // Skip as many input files as in a chunk
1943 for (Int_t counter=0; counter<nmaxmerge; counter++) {
1944 map = (TMap*)nextmap();
1946 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
1950 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1951 // Count the '/' characters in the path to the current file.
1952 Int_t crtCount = objs->GetString().CountChar('/');
1954 countChar = crtCount;
1955 // Make sure we check if the same file in the parent dir exists
1956 if (FileExists(Form("%s/../%s", basedir, output))) countChar--;
// Files deeper than the reference depth (sub-directories) do not count toward the chunk.
1958 if (crtCount > countChar) counter--;
1961 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1965 outputChunk = outputFile;
1966 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1968 if (gSystem->AccessPathName(outputChunk)) continue;
1969 // Merged file with chunks up to <countChunk> found
1970 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1971 previousChunk = outputChunk;
// countZero counts down the number of input files still to be added to the current chunk.
1975 countZero = nmaxmerge;
1977 while ((map=(TMap*)nextmap())) {
1978 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1979 if (!objs || !objs->GetString().Length()) {
1980 // Nothing found - skip this output
1985 // Make sure this is a good file and not one from a subjob directory in case we merge runs
1986 // Count the '/' characters in the path to the current file.
1987 Int_t crtCount = objs->GetString().CountChar('/');
1989 countChar = crtCount;
1990 // Make sure we check if the same file in the parent dir exists
1991 if (FileExists(Form("%s/../%s", basedir, output))) countChar--;
1993 if (crtCount > countChar) continue;
1994 // Loop 'find' results and get next LFN
1995 if (countZero == nmaxmerge) {
1996 // First file in chunk - create file merger and add previous chunk if any.
1997 fm = new TFileMerger(kFALSE);
1998 fm->SetFastMethod(kTRUE);
1999 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
2000 outputChunk = outputFile;
2001 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2003 // If last file found, put merged results in the output file
2004 if (map == res->Last()) outputChunk = outputFile;
2005 // Add file to be merged and decrement chunk counter.
2006 fm->AddFile(objs->GetString());
2008 if (countZero==0 || map == res->Last()) {
2009 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2010 // Nothing found - skip this output
2011 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2016 fm->OutputFile(outputChunk);
2017 // Merge the outputs, then go to next chunk
2019 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message is tagged "MergeOutputs" while the rest of this function uses "MergeOutput".
2024 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial chunk was added as an input of this merge (see AddFile above), so
// its local file is removed now that the new chunk is written.
2025 gSystem->Unlink(previousChunk);
2027 if (map == res->Last()) {
2033 countZero = nmaxmerge;
2034 previousChunk = outputChunk;
2039 // Merging stage different than 0.
2040 // Move to the beginning of the requested chunk.
// Without chunking (all inputs fit in one group) the result goes to <output> itself.
2041 outputChunk = outputFile;
2042 if (nmaxmerge < res->GetSize()) {
2043 if (ichunk*nmaxmerge >= res->GetSize()) {
2044 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
// Skip the inputs that belong to chunks 0 .. ichunk-1.
2048 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) nextmap();
2049 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
2051 countZero = nmaxmerge;
2052 fm = new TFileMerger(kFALSE);
2053 fm->SetFastMethod(kTRUE);
2054 while ((map=(TMap*)nextmap())) {
2055 // Loop 'find' results and get next LFN
2056 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2057 if (!objs || !objs->GetString().Length()) {
2058 // Nothing found - skip this output
2063 // Add file to be merged and decrement chunk counter.
2064 fm->AddFile(objs->GetString());
// Stop once nmaxmerge files have been collected for this chunk.
2066 if (countZero==0) break;
2069 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2070 // Nothing found - skip this output
2071 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2075 fm->OutputFile(outputChunk);
2076 // Merge the outputs
2078 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2082 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2088 //______________________________________________________________________________
2089 Bool_t AliAnalysisAlien::MergeOutputs()
2091 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE immediately in test mode and kFALSE in offline mode. Merging may instead be
// delegated to grid jobs through SubmitMerging() (or to the LPM manager in production mode).
// Otherwise every file in fOutputFiles that is not listed in fMergeExcludes is merged locally
// with MergeOutput() from fGridOutputDir.
2092 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2093 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2095 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2099 if (!TestBit(AliAnalysisGrid::kMerge)) {
2100 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2103 if (fProductionMode) {
2104 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2107 Info("MergeOutputs", "Submitting merging JDL");
2108 if (!SubmitMerging()) return kFALSE;
2109 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2110 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2113 // Get the output path
// A relative fGridOutputDir (no '/') is made absolute: /<home>/<fGridWorkingDir>/<fGridOutputDir>.
2114 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2115 if (!DirectoryExists(fGridOutputDir)) {
2116 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2119 if (!fOutputFiles.Length()) {
2120 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2123 // Check if fast read option was requested
2124 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2125 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
// FastRead: shorter xrootd (XNet.*) timeouts and fewer redirects/retries, trading
// completeness for speed.
2126 if (fFastReadOption) {
2127 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2128 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2129 gEnv->SetValue("XNet.ConnectTimeout",10);
2130 gEnv->SetValue("XNet.RequestTimeout",10);
2131 gEnv->SetValue("XNet.MaxRedirectCount",2);
2132 gEnv->SetValue("XNet.ReconnectTimeout",10);
2133 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2135 // Make sure we change the temporary directory
// TMPDIR is pointed at the current working directory.
2136 gSystem->Setenv("TMPDIR", gSystem->pwd());
2137 TObjArray *list = fOutputFiles.Tokenize(",");
2141 Bool_t merged = kTRUE;
2142 while((str=(TObjString*)next())) {
2143 outputFile = str->GetString();
2144 Int_t index = outputFile.Index("@");
2145 if (index > 0) outputFile.Remove(index);
// Glob for the local partial chunk files (<output>_*.root) that MergeOutput() may resume from.
2146 TString outputChunk = outputFile;
2147 outputChunk.ReplaceAll(".root", "_*.root");
2148 // Skip already merged outputs
2149 if (!gSystem->AccessPathName(outputFile)) {
2150 if (fOverwriteMode) {
2151 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2152 gSystem->Unlink(outputFile);
2153 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2154 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2155 outputChunk.Data());
2156 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2159 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// NOTE(review): the branch structure here is partly hidden in this view. As far as visible,
// partial chunks are also removed when the merged file does not exist yet, although the
// message says "Overwrite mode". That would prevent MergeOutput() from resuming; confirm
// this is intended.
2163 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2164 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2165 outputChunk.Data());
2166 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// NOTE(review): TString::Contains is a substring test, so an output whose name is part of an
// excluded name is also skipped.
2169 if (fMergeExcludes.Length() &&
2170 fMergeExcludes.Contains(outputFile.Data())) continue;
2171 // Perform a 'find' command in the output directory, looking for registered outputs
// `stage` and `ichunk` are not passed here, so MergeOutput() gets its declared defaults.
2172 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2174 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if ROOT still lists it as open.
2177 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2178 if (fileOpened) fileOpened->Close();
2183 //______________________________________________________________________________
2184 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2186 // Use the output files connected to output containers from the analysis manager
2187 // rather than the files defined by SetOutputFiles
// flag : kTRUE  -> set the kDefaultOutputs bit (outputs taken from the analysis manager)
//        kFALSE -> clear it, which is required before SetOutputFiles()/SetOutputArchive()
// The informational message is printed only when the bit goes from off to on.
2188 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2189 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2190 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2193 //______________________________________________________________________________
2194 void AliAnalysisAlien::SetOutputFiles(const char *list)
2196 // Manually set the output files list.
2197 // Removes duplicates. Not allowed if default outputs are not disabled.
// list : space-separated file names. An '@<SE>' suffix on a name is stripped (with a
//        warning), and each remaining name is appended to fOutputFiles, comma-separated.
// Calling this while kDefaultOutputs is set is a Fatal error.
2198 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2199 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2202 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2204 TString slist = list;
2205 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2206 TObjArray *arr = slist.Tokenize(" ");
2210 while ((os=(TObjString*)next())) {
2211 sout = os->GetString();
2212 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): duplicate detection uses a substring test on the comma-joined list, so a name
// that is part of an already listed one (e.g. "Results.root" after "AnalysisResults.root")
// is silently dropped.
2213 if (fOutputFiles.Contains(sout)) continue;
2214 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2215 fOutputFiles += sout;
2220 //______________________________________________________________________________
2221 void AliAnalysisAlien::SetOutputArchive(const char *list)
2223 // Manually set the output archive list. Free text - you are on your own...
2224 // Not allowed if default outputs are not disabled.
// list : stored verbatim in fOutputArchive (no parsing or validation is done here).
2225 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2226 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2229 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2230 fOutputArchive = list;
2233 //______________________________________________________________________________
2234 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2236 // Setting a preferred output SE is not allowed anymore.
// Deprecated: the argument is ignored and only a warning is printed.
2237 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2240 //______________________________________________________________________________
2241 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2243 // Start remote grid analysis.
// Requires an initialized AliAnalysisManager. Two paths:
//  - PROOF mode (mgr->IsProofMode()):
//      * optionally reset the cluster or change its ROOT version;
//      * open the PROOF session;
//      * enable AliRoot (AliRootProofLite.par or VO_ALICE@AliRoot::<fAliROOTVersion>) with the
//        options collected in optionsList, plus requested par files and analysis sources.
//      In test mode a local dataset "test_collection" is registered from fFileForTestMode.
//  - Grid mode:
//      * check the grid connection and input data, then create the dataset;
//      * write the analysis/merging macros, executables, validation scripts and JDL;
//      * depending on the kOffline/kTest/kSubmit/kMerge bits, stop there, run the executable
//        locally, or submit the job(s).
2244 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2245 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2246 if (!mgr || !mgr->IsInitialized()) {
2247 Error("StartAnalysis", "You need an initialized analysis manager for this");
2250 // Are we in PROOF mode ?
2251 if (mgr->IsProofMode()) {
2252 Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2253 if (fProofCluster.IsNull()) {
2254 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2257 if (fProofDataSet.IsNull() && !testMode) {
2258 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2261 // Set the needed environment
2262 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2263 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
// fProofReset==1 requests a soft reset; the other visible branch sends a hard reset.
2264 if (fProofReset && !testMode) {
2265 if (fProofReset==1) {
2266 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2267 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2269 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2270 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2272 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2275 // Do we need to change the ROOT version ? The success of this cannot be checked.
2276 if (!fRootVersionForProof.IsNull() && !testMode) {
2277 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2278 fProofCluster.Data(), fRootVersionForProof.Data()));
2280 // Connect to PROOF and check the status
// Worker request passed to TProof::Open: "workers=<N>x" when fNproofWorkersPerSlave is set,
// otherwise "workers=<N>" from fNproofWorkers (per the member names: per slave node vs total).
2283 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2284 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2286 if (!sworkers.IsNull())
2287 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2289 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2291 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2293 Error("StartAnalysis", "Could not start PROOF in test mode");
2298 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// SetParallel is only issued when both worker settings are non-zero (positive product).
2301 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2302 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2303 // Is dataset existing ?
2305 TString dataset = fProofDataSet;
2306 Int_t index = dataset.Index("#");
2307 if (index>=0) dataset.Remove(index);
// NOTE(review): the existence check below is commented out, so `dataset` (name without the
// '#...' suffix) is not used by any active code in this view.
2308 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2309 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2312 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2314 // Is ClearPackages() needed ?
2315 if (TestSpecialBit(kClearPackages)) {
2316 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2317 gROOT->ProcessLine("gProof->ClearPackages();");
2319 // Is a given aliroot mode requested ?
// optionsList collects ALIROOT_* (name, value) pairs handed to the AliRoot package on enable.
2322 if (!fAliRootMode.IsNull()) {
2323 TString alirootMode = fAliRootMode;
2324 if (alirootMode == "default") alirootMode = "";
2325 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2326 optionsList.SetOwner();
2327 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2328 // Check the additional libs to be loaded
2330 Bool_t parMode = kFALSE;
// A non-default AliRoot mode always loads the ANALYSIS and ANALYSISalice libraries.
2331 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2332 // Parse the extra libs for .so
2333 if (fAdditionalLibs.Length()) {
2334 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2337 while((str=(TObjString*)next())) {
2338 if (str->GetString().Contains(".so")) {
2340 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// "libFoo.so" -> "Foo"; library names are colon-separated in extraLibs.
2343 TString stmp = str->GetName();
2344 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2345 stmp.ReplaceAll(".so","");
2346 if (!extraLibs.IsNull()) extraLibs += ":";
2350 if (str->GetString().Contains(".par")) {
2351 // The first par file found in the list will not allow any further .so
2353 if (!parLibs.IsNull()) parLibs += ":";
2354 parLibs += str->GetName();
2358 if (list) delete list;
2360 if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2361 // Check extra includes
2362 if (!fIncludePath.IsNull()) {
2363 TString includePath = fIncludePath;
2364 includePath.ReplaceAll(" ",":");
// NOTE(review): TString::Strip() is const and returns a TSubString, so this statement leaves
// includePath unchanged and a trailing ':' is kept. Remove(TString::kTrailing, ':') strips in place.
2365 includePath.Strip(TString::kTrailing, ':');
2366 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2367 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2369 // Check if connection to grid is requested
2370 if (TestSpecialBit(kProofConnectGrid))
2371 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2372 // Enable AliRoot par
2374 // Enable proof lite package
2375 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2376 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2377 TNamed *obj = (TNamed*)optionsList.At(i);
2378 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// These calls go through the interpreter (gROOT->ProcessLine), so optionsList is passed as a
// hex pointer literal; a zero return value is treated as success.
2380 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2381 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)0x%lx);",alirootLite.Data(),(ULong_t)&optionsList))) {
2382 Info("StartAnalysis", "AliRootProofLite enabled");
2384 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2388 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);",
2389 fAliROOTVersion.Data(), (ULong_t)&optionsList))) {
2390 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2394 // Enable first par files from fAdditionalLibs
2395 if (!parLibs.IsNull()) {
2396 TObjArray *list = parLibs.Tokenize(":");
2398 TObjString *package;
2399 while((package=(TObjString*)next())) {
// Remove any local directory named like the package (name without ".par") before uploading.
2400 TString spkg = package->GetName();
2401 spkg.ReplaceAll(".par", "");
2402 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
2403 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2404 TString enablePackage = (testMode)?Form("gProof->EnablePackage(\"%s\",kFALSE);", package->GetName()):Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName());
2405 if (gROOT->ProcessLine(enablePackage)) {
2406 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2410 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2414 if (list) delete list;
2417 if (fAdditionalLibs.Contains(".so") && !testMode) {
2418 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2419 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2423 // Enable par files if requested
2424 if (fPackages && fPackages->GetEntries()) {
2425 TIter next(fPackages);
2427 while ((package=next())) {
2428 // Skip packages already enabled
2429 if (parLibs.Contains(package->GetName())) continue;
2430 TString spkg = package->GetName();
2431 spkg.ReplaceAll(".par", "");
2432 gSystem->Exec(TString::Format("rm -rf %s", spkg.Data()));
// NOTE(review): this upload check has the opposite sense to the one used for fAdditionalLibs
// par files above ("!gROOT->ProcessLine(...UploadPackage...)"). As written, EnablePackage is
// attempted only when UploadPackage returns non-zero, and TProof::UploadPackage returns 0 on
// success. This looks inverted; confirm against the full source.
2433 if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2434 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2435 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2439 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2444 // Do we need to load analysis source files ?
2445 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2446 if (fAnalysisSource.Length()) {
2447 TObjArray *list = fAnalysisSource.Tokenize(" ");
2450 while((str=(TObjString*)next())) {
// Each space-separated source is compiled on the workers via ACLiC ("+g").
2451 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2453 if (list) delete list;
2456 // Register dataset to proof lite.
// NOTE(review): the two errors below are tagged "GetChainForTestMode" although they are
// reported from StartAnalysis.
2457 if (fFileForTestMode.IsNull()) {
2458 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2461 if (gSystem->AccessPathName(fFileForTestMode)) {
2462 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// NOTE(review): `coll` is heap-allocated and no delete is visible in this view; confirm who
// owns it after RegisterDataSet().
2465 TFileCollection *coll = new TFileCollection();
2466 coll->AddFromFile(fFileForTestMode);
2467 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
2468 gROOT->ProcessLine("gProof->ShowDataSets()");
2473 // Check if output files have to be taken from the analysis manager
2474 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2475 // Add output files and AOD files
2476 fOutputFiles = GetListOfFiles("outaod");
2477 // Add extra files registered to the analysis manager
2478 TString extra = GetListOfFiles("ext");
2479 if (!extra.IsNull()) {
2480 extra.ReplaceAll(".root", "*.root");
2481 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2482 fOutputFiles += extra;
2484 // Compose the output archive.
// Logs (std*, *.stat) go to one disk replica; the ROOT outputs go to fNreplicas disk replicas.
2485 fOutputArchive = "log_archive.zip:std*,*.stat@disk=1 ";
2486 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2488 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2489 if (TestBit(AliAnalysisGrid::kOffline)) {
2490 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2491 \n there nor any job run. You can revise the JDL and analysis \
2492 \n macro then run the same in \"submit\" mode.");
2493 } else if (TestBit(AliAnalysisGrid::kTest)) {
2494 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2496 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2497 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2498 \n space and job submitted.");
2499 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2500 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2501 if (fMergeViaJDL) CheckInputData();
2504 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2509 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2512 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2513 if (!CheckInputData()) {
2514 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2517 if (!CreateDataset(fDataPattern)) {
// Build a hint naming whichever input selection (data dir, run numbers or run range) was used.
2519 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2520 if (fRunNumbers.Length()) serror = "run numbers";
2521 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2522 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2523 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2526 WriteAnalysisFile();
2527 WriteAnalysisMacro();
2529 WriteValidationScript();
2531 WriteMergingMacro();
2532 WriteMergeExecutable();
2533 WriteValidationScript(kTRUE);
2535 if (!CreateJDL()) return kFALSE;
2536 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2538 // Locally testing the analysis
2539 Info("StartAnalysis", "\n_______________________________________________________________________ \
2540 \n Running analysis script in a daughter shell as on a worker node \
2541 \n_______________________________________________________________________");
// Remove stale local copies of the outputs so the local run starts clean.
2542 TObjArray *list = fOutputFiles.Tokenize(",");
2546 while((str=(TObjString*)next())) {
2547 outputFile = str->GetString();
2548 Int_t index = outputFile.Index("@");
2549 if (index > 0) outputFile.Remove(index);
2550 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the job executable locally (stderr redirected to a file named "stderr"), then the
// validation script.
2553 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2554 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2555 // gSystem->Exec("cat stdout");
2558 // Check if submitting is managed by LPM manager
// In production mode only a <jdl name>.prod file is written; nothing is submitted here.
2559 if (fProductionMode) {
2560 TString prodfile = fJDLName;
2561 prodfile.ReplaceAll(".jdl", ".prod");
2562 WriteProductionFile(prodfile);
2563 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2566 // Submit AliEn job(s)
2567 gGrid->Cd(fGridOutputDir);
// Without run numbers or a run range a single JDL is submitted; otherwise Submit() handles
// the master jobs.
2570 if (!fRunNumbers.Length() && !fRunRange[0]) {
2571 // Submit a given xml or a set of runs
2572 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2573 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2575 const char *cjobId = res->GetKey(0,"jobId");
2579 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2582 Info("StartAnalysis", "\n_______________________________________________________________________ \
2583 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2584 \n_______________________________________________________________________",
2585 fJDLName.Data(), cjobId);
2590 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2594 // Submit for a range of enumeration of runs.
2595 if (!Submit()) return kFALSE;
2598 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2599 \n You may exit at any time and terminate the job later using the option <terminate> \
2600 \n ##################################################################################", jobID.Data());
2601 gSystem->Exec("aliensh");
2605 //______________________________________________________________________________
2606 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2608 // Get a comma-separated list of output files of the requested type.
2609 // Type can be (case insensitive):
2610 // aod - list of aod files (std, extensions and filters)
2611 // out - list of output files connected to containers (but not aod's or extras)
2612 // ext - list of extra files registered to the manager
2613 // ter - list of files produced in terminate
// Types can be combined (e.g. "outaod"): each category is appended when `type` contains its
// key. The early returns below trigger only on an exact single-type match.
// The returned pointer refers to a function-static TString shared between calls; copy the
// result if it must survive the next call.
// Side effect: the "ter" section rewrites fTerminateFiles (spaces -> commas).
2614 static TString files;
2616 TString stype = type;
2618 TString aodfiles, extra;
2619 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2621 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2622 return files.Data();
2624 if (mgr->GetOutputEventHandler()) {
2625 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2626 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2627 if (!extraaod.IsNull()) {
2629 aodfiles += extraaod;
2632 if (stype.Contains("aod")) {
2634 if (stype == "aod") return files.Data();
2636 // Add output files that are not in the list of AOD files
2637 TString outputfiles = "";
2638 TIter next(mgr->GetOutputs());
2639 AliAnalysisDataContainer *output;
2640 const char *filename = 0;
2641 while ((output=(AliAnalysisDataContainer*)next())) {
2642 filename = output->GetFileName();
// Containers whose file name is "default" are skipped.
2643 if (!(strcmp(filename, "default"))) continue;
// NOTE(review): these de-duplication checks (and the ones below) are substring tests on
// comma-joined lists, so a name contained in another listed name is dropped.
2644 if (outputfiles.Contains(filename)) continue;
2645 if (aodfiles.Contains(filename)) continue;
2646 if (!outputfiles.IsNull()) outputfiles += ",";
2647 outputfiles += filename;
2649 if (stype.Contains("out")) {
2650 if (!files.IsNull()) files += ",";
2651 files += outputfiles;
2652 if (stype == "out") return files.Data();
2654 // Add extra files registered to the analysis manager
2656 extra = mgr->GetExtraFiles();
2657 if (!extra.IsNull()) {
2659 extra.ReplaceAll(" ", ",");
2660 TObjArray *fextra = extra.Tokenize(",");
2661 TIter nextx(fextra);
2663 while ((obj=nextx())) {
2664 if (aodfiles.Contains(obj->GetName())) continue;
2665 if (outputfiles.Contains(obj->GetName())) continue;
2666 if (sextra.Contains(obj->GetName())) continue;
2667 if (!sextra.IsNull()) sextra += ",";
2668 sextra += obj->GetName();
2671 if (stype.Contains("ext")) {
2672 if (!files.IsNull()) files += ",";
2676 if (stype == "ext") return files.Data();
2678 if (!fTerminateFiles.IsNull()) {
// NOTE(review): TString::Strip() is const and returns a TSubString, so this statement has no
// effect on fTerminateFiles. Empty tokens produced by the ReplaceAll below are ignored by
// Tokenize(",") anyway.
2679 fTerminateFiles.Strip();
2680 fTerminateFiles.ReplaceAll(" ",",");
2681 TObjArray *fextra = fTerminateFiles.Tokenize(",");
2682 TIter nextx(fextra);
2684 while ((obj=nextx())) {
2685 if (aodfiles.Contains(obj->GetName())) continue;
2686 if (outputfiles.Contains(obj->GetName())) continue;
2687 if (termfiles.Contains(obj->GetName())) continue;
2688 if (sextra.Contains(obj->GetName())) continue;
2689 if (!termfiles.IsNull()) termfiles += ",";
2690 termfiles += obj->GetName();
2694 if (stype.Contains("ter")) {
2695 if (!files.IsNull() && !termfiles.IsNull()) {
2700 return files.Data();
2703 //______________________________________________________________________________
2704 Bool_t AliAnalysisAlien::Submit()
2706 // Submit all master jobs.
// Calls SubmitNext() right away if nothing has been submitted yet. It then keeps polling
// gSystem->Now() and calls SubmitNext() again once more than 30000 time units (presumably ms)
// have passed since the reference time `tshoot`, until all fInputFiles entries are submitted.
// Returns kFALSE as soon as a SubmitNext() call fails. SubmitNext() may return kTRUE without
// submitting anything, in which case polling simply continues.
// NOTE(review): no sleep is visible inside the polling loop, so it appears to busy-wait on the
// clock; consider gSystem->Sleep() between polls (confirm against the full source).
2707 Int_t nmasterjobs = fInputFiles->GetEntries();
2708 Long_t tshoot = gSystem->Now();
2709 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2710 while (fNsubmitted < nmasterjobs) {
2711 Long_t now = gSystem->Now();
2712 if ((now-tshoot)>30000) {
2714 if (!SubmitNext()) return kFALSE;
2720 //______________________________________________________________________________
2721 Bool_t AliAnalysisAlien::SubmitMerging()
2723 // Submit all merging jobs.
// For every entry of fInputFiles (one per master job) this determines the job's output
// directory and calls CheckMergedFiles() in submit mode with the merging JDL.
// The output directory is <fGridOutputDir>/<input basename without .xml> when fOutputToRunNo
// is set, otherwise <fGridOutputDir>/<NNN>. The merging JDL name is fExecutable with ".sh"
// replaced by "_merge.jdl".
// Returns kFALSE if any CheckMergedFiles() call fails and kTRUE when there is nothing to
// submit; otherwise an interactive aliensh is started at the end.
2724 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2725 gGrid->Cd(fGridOutputDir);
2726 TString mergeJDLName = fExecutable;
2727 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2728 Int_t ntosubmit = fInputFiles->GetEntries();
2729 for (Int_t i=0; i<ntosubmit; i++) {
2730 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2731 runOutDir.ReplaceAll(".xml", "");
2732 if (fOutputToRunNo) {
2733 // The output directory is the run number
2734 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2735 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2737 // The output directory is the master number in 3 digits format
2738 printf("### Submitting merging job for master <%03d>\n", i);
2739 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2741 // Check now the number of merging stages.
// The first output file not listed in fMergeExcludes is used as the reference for the check.
// NOTE(review): if every output is excluded, the loop ends with the last (excluded) name,
// which is then used anyway.
2742 TObjArray *list = fOutputFiles.Tokenize(",");
2746 while((str=(TObjString*)next())) {
2747 outputFile = str->GetString();
2748 Int_t index = outputFile.Index("@");
2749 if (index > 0) outputFile.Remove(index);
2750 if (!fMergeExcludes.Contains(outputFile)) break;
2753 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2754 if (!done) return kFALSE;
2756 if (!ntosubmit) return kTRUE;
// NOTE(review): the message below is tagged "StartAnalysis" although it comes from SubmitMerging.
2757 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2758 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2759 \n ##################################################################################");
2760 gSystem->Exec("aliensh");
2764 //______________________________________________________________________________
2765 Bool_t AliAnalysisAlien::SubmitNext()
2767 // Submit next bunch of master jobs if the queue is free. The first master job is
2768 // submitted right away, while the next will not be unless the previous was split.
2769 // The plugin will not submit new master jobs if there are more than 500 jobs in
// the waiting state.
//
// State kept in function-level statics (shared by all AliAnalysisAlien
// instances in the process and preserved between calls):
//   iscalled    - re-entrancy guard: while set, calls return kTRUE at once;
//                 it is cleared before each early return below
//   firstmaster - job ID of the first submitted master job (0 = none yet)
//   lastmaster  - job ID of the most recently submitted master job
//   npermaster  - job count reported by GetJobStatus() divided by fNsubmitted
// fNsubmitted indexes the next collection of fInputFiles to submit.
// NOTE(review): firstmaster is only assigned while it is 0 and is never reset
// here, so a later submission series in the same process keeps the old value.
2771 static Bool_t iscalled = kFALSE;
2772 static Int_t firstmaster = 0;
2773 static Int_t lastmaster = 0;
2774 static Int_t npermaster = 0;
2775 if (iscalled) return kTRUE;
2777 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2778 Int_t ntosubmit = 0;
2781 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without the submit policy, every remaining master job is submitted in this call.
2784 if (!IsUseSubmitPolicy()) {
2786 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
2787 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
2788 ntosubmit = nmasterjobs;
// Throttled path: size the next bunch from the status of the master jobs
// already submitted (the enclosing condition is not visible in this excerpt).
// NOTE(review): npermaster divides by fNsubmitted, so this path must only run
// after at least one master job was submitted; confirm.
2791 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2792 printf("=== master %d: %s\n", lastmaster, status.Data());
2793 // If last master not split, just return
2794 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2795 // No more than 500 waiting jobs
2796 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
2797 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Fill the remaining waiting slots (500 - nwaiting) with whole master jobs,
// but always submit at least one.
2798 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
2799 if (!ntosubmit) ntosubmit = 1;
2800 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2801 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2803 for (Int_t i=0; i<ntosubmit; i++) {
2804 // Submit for a range of enumeration of runs.
2805 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
2807 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2808 runOutDir.ReplaceAll(".xml", "");
// Command: "submit <JDL> <collection> <output dir>", where the output dir is
// the run (collection base name without ".xml") or the 3-digit master index.
2810 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2812 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2813 printf("********* %s\n",query.Data());
2814 res = gGrid->Command(query);
// A successful submission yields a result whose "jobId" key holds the new
// master job ID; an empty value means the JDL was not accepted.
// NOTE(review): the Error/Info messages below are tagged "StartAnalysis"
// although they are emitted by SubmitNext().
2816 TString cjobId1 = res->GetKey(0,"jobId");
2817 if (!cjobId1.Length()) {
2821 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2824 Info("StartAnalysis", "\n_______________________________________________________________________ \
2825 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2826 \n_______________________________________________________________________",
2827 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Track the master job ID range that is passed to GetJobStatus() on later calls.
2830 lastmaster = cjobId1.Atoi();
2831 if (!firstmaster) firstmaster = lastmaster;
2836 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2844 //______________________________________________________________________________
2845 void AliAnalysisAlien::WriteAnalysisFile()
2847 // Write current analysis manager into the file <analysisFile>
// <analysisFile> is fExecutable with ".sh" replaced by ".root". The manager
// is fetched and written only when the kSubmit bit is not set. In that case
// the analysis type is also recorded as bits on this object: kUseMC when an
// MC truth handler is attached, and kUseESD / kUseAOD from the class of the
// input handler. Outside production, offline and test modes the file is
// copied to the AliEn working directory, replacing an existing copy.
2848 TString analysisFile = fExecutable;
2849 analysisFile.ReplaceAll(".sh", ".root");
2850 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2851 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2852 if (!mgr || !mgr->IsInitialized()) {
2853 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2856 // Check analysis type
2858 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2859 handler = (TObject*)mgr->GetInputEventHandler();
// NOTE(review): GetInputEventHandler() may return 0; confirm a null check
// guards the two InheritsFrom() calls below.
2861 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2862 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current ROOT directory so it can be restored after writing.
2864 TDirectory *cdir = gDirectory;
2865 TFile *file = TFile::Open(analysisFile, "RECREATE");
2867 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2868 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2869 // Unless merging makes no sense
2870 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
// The skip-terminate flag set above is meant for the copy stored in the
// file; presumably it is reset below only after the manager has been written.
2873 // Enable termination for local jobs
2874 mgr->SetSkipTerminate(kFALSE);
2876 if (cdir) cdir->cd();
2877 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
2879 Bool_t copy = kTRUE;
2880 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// NOTE(review): workdir is built by plain concatenation, which assumes
// GetHomeDirectory() ends with '/'.
2883 TString workdir = gGrid->GetHomeDirectory();
2884 workdir += fGridWorkingDir;
2885 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// NOTE(review): FileExists()/Rm() get the bare file name while the copy goes
// to <workdir>/<analysisFile>; these agree only if the current AliEn
// directory is the working directory. Confirm.
2886 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2887 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2891 //______________________________________________________________________________
2892 void AliAnalysisAlien::WriteAnalysisMacro()
2894 // Write the analysis macro that will steer the analysis in grid mode.
// The generated macro (file fAnalysisMacro; its entry function is named after
// the file with ".C" removed) does the following:
//   - loads the ROOT/AliRoot libraries or sets up PAR packages;
//   - compiles fAnalysisSource with ACLiC;
//   - optionally shortens xrootd timeouts;
//   - connects to AliEn;
//   - reads the analysis manager from fExecutable with ".sh" replaced by ".root";
//   - builds a chain from "wn.xml" and runs StartAnalysis("localfile", chain).
// The helpers CreateChainFromTags(), CreateChain() and SetupPar() are appended
// to the macro when needed. The macro file is opened and written only when
// the kSubmit bit is not set. Outside production, offline and test modes it
// is copied to the AliEn working directory, together with ConfigureCuts.C when
// tags are used and that file exists locally.
2895 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2897 out.open(fAnalysisMacro.Data(), ios::out);
2899 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2902 Bool_t hasSTEERBase = kFALSE;
2903 Bool_t hasESD = kFALSE;
2904 Bool_t hasAOD = kFALSE;
2905 Bool_t hasANALYSIS = kFALSE;
2906 Bool_t hasANALYSISalice = kFALSE;
2907 Bool_t hasCORRFW = kFALSE;
2908 TString func = fAnalysisMacro;
2909 TString type = "ESD";
2910 TString comment = "// Analysis using ";
2911 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2912 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
// Friend chains are only supported for AOD input; presumably the kUseAOD
// branch above switches type to "AOD".
2916 if (type!="AOD" && fFriendChainName!="") {
2917 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2920 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2921 else comment += " data";
2922 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
// The macro's entry function is named after the file. ReplaceAll() strips
// every ".C" occurrence, so this assumes fAnalysisMacro is a plain file name
// such as "myAnalysis.C".
2923 func.ReplaceAll(".C", "");
2924 out << "void " << func.Data() << "()" << endl;
2926 out << comment.Data() << endl;
2927 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2928 out << " TStopwatch timer;" << endl;
2929 out << " timer.Start();" << endl << endl;
2930 // Change temp directory to current one
2931 out << "// Set temporary merging directory to current one" << endl;
2932 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2933 if (!fExecutableCommand.Contains("aliroot")) {
2934 out << "// load base root libraries" << endl;
2935 out << " gSystem->Load(\"libTree\");" << endl;
2936 out << " gSystem->Load(\"libGeom\");" << endl;
2937 out << " gSystem->Load(\"libVMC\");" << endl;
2938 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2939 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Extra ROOT libraries: only entries containing ".so" are loaded.
2941 if (fAdditionalRootLibs.Length()) {
2942 // in principle libtree /lib geom libvmc etc. can go into this list, too
2943 out << "// Add aditional libraries" << endl;
2944 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2947 while((str=(TObjString*)next())) {
2948 if (str->GetString().Contains(".so"))
2949 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2951 if (list) delete list;
2953 out << "// include path" << endl;
2954 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2955 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2956 out << "// Load analysis framework libraries" << endl;
// PAR packages are set up through AliAnalysisAlien::SetupPar by default.
// When ANALYSISalice itself comes as a PAR package, the macro's own
// SetupPar() (appended at the end) is used instead, presumably because
// AliAnalysisAlien is not loaded yet at that point.
2957 TString setupPar = "AliAnalysisAlien::SetupPar";
// Framework loaded purely as shared libraries (the enclosing condition is not
// visible in this excerpt; presumably taken when there are no PAR packages).
2959 if (!fExecutableCommand.Contains("aliroot")) {
2960 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2961 out << " gSystem->Load(\"libESD\");" << endl;
2962 out << " gSystem->Load(\"libAOD\");" << endl;
2964 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2965 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2966 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: detect which core framework packages are
// provided as PARs. Those are set up with setupPar; the rest are loaded as
// shared libraries.
2968 TIter next(fPackages);
2971 while ((obj=next())) {
2972 pkgname = obj->GetName();
2973 if (pkgname == "STEERBase" ||
2974 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2975 if (pkgname == "ESD" ||
2976 pkgname == "ESD.par") hasESD = kTRUE;
2977 if (pkgname == "AOD" ||
2978 pkgname == "AOD.par") hasAOD = kTRUE;
2979 if (pkgname == "ANALYSIS" ||
2980 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2981 if (pkgname == "ANALYSISalice" ||
2982 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2983 if (pkgname == "CORRFW" ||
2984 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2986 if (hasANALYSISalice) setupPar = "SetupPar";
2987 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2988 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2989 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2990 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2991 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2992 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2993 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2994 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2995 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2996 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2997 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2998 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2999 out << "// Compile other par packages" << endl;
// Second pass over fPackages for the non-core packages. The iterator is
// presumably reset just before this loop.
3001 while ((obj=next())) {
3002 pkgname = obj->GetName();
3003 if (pkgname == "STEERBase" ||
3004 pkgname == "STEERBase.par" ||
3006 pkgname == "ESD.par" ||
3008 pkgname == "AOD.par" ||
3009 pkgname == "ANALYSIS" ||
3010 pkgname == "ANALYSIS.par" ||
3011 pkgname == "ANALYSISalice" ||
3012 pkgname == "ANALYSISalice.par" ||
3013 pkgname == "CORRFW" ||
3014 pkgname == "CORRFW.par") continue;
3015 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// Extra AliRoot libraries: entries containing ".so" are loaded and entries
// containing ".par" are set up as packages; other entries are ignored.
3018 if (fAdditionalLibs.Length()) {
3019 out << "// Add aditional AliRoot libraries" << endl;
3020 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3023 while((str=(TObjString*)next())) {
3024 if (str->GetString().Contains(".so"))
3025 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3026 if (str->GetString().Contains(".par"))
3027 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3029 if (list) delete list;
3032 out << "// analysis source to be compiled at runtime (if any)" << endl;
// Each source is compiled with ACLiC (".L <file>+g").
3033 if (fAnalysisSource.Length()) {
3034 TObjArray *list = fAnalysisSource.Tokenize(" ");
3037 while((str=(TObjString*)next())) {
3038 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3040 if (list) delete list;
// Optional fast-read mode: short xrootd timeouts and few retries, at the risk
// of skipping files that could have been read.
3043 if (fFastReadOption) {
3044 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3045 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3046 out << "// fast xrootd reading enabled" << endl;
3047 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3048 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3049 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3050 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3051 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3052 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3054 out << "// connect to AliEn and make the chain" << endl;
3055 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3056 out << "// read the analysis manager from file" << endl;
// In the generated key loop every AliAnalysisManager key overwrites mgr, so
// the last manager found in the file is the one used.
3057 TString analysisFile = fExecutable;
3058 analysisFile.ReplaceAll(".sh", ".root");
3059 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3060 out << " if (!file) return;" << endl;
3061 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3062 out << " AliAnalysisManager *mgr = 0;" << endl;
3063 out << " TKey *key;" << endl;
3064 out << " while ((key=(TKey*)nextkey())) {" << endl;
3065 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3066 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3067 out << " };" << endl;
3068 out << " if (!mgr) {" << endl;
3069 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
3070 out << " return;" << endl;
3071 out << " }" << endl << endl;
3072 out << " mgr->PrintStatus();" << endl;
// Debug settings for the grid job, derived from the local manager: XNet debug
// output when its debug level exceeds 3, and an AliLog global level of
// kWarning in test mode or kError otherwise.
// NOTE(review): the lines tying these branches together are not visible in
// this excerpt; confirm the exact branching.
3073 if (AliAnalysisManager::GetAnalysisManager()) {
3074 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3075 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3077 if (TestBit(AliAnalysisGrid::kTest))
3078 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3080 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// The chain is built from the "wn.xml" collection, via a tag query when
// IsUsingTags().
3083 if (IsUsingTags()) {
3084 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3086 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3088 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3089 out << " timer.Stop();" << endl;
3090 out << " timer.Print();" << endl;
3091 out << "}" << endl << endl;
// Generated CreateChainFromTags(): the chain comes from a tag query.
// ConfigureCuts.C, if present in the job directory, customizes the cuts.
// With a friend chain, the query result is written to tmpCollection.xml and
// re-read through CreateChain() so that the friend can be attached.
3092 if (IsUsingTags()) {
3093 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3095 out << "// Create a chain using tags from the xml file." << endl;
3096 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3097 out << " if (!coll) {" << endl;
3098 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3099 out << " return NULL;" << endl;
3100 out << " }" << endl;
3101 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3102 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3103 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3104 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3105 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3106 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3107 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3108 out << " // Check if the cuts configuration file was provided" << endl;
3109 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3110 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3111 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3112 out << " }" << endl;
3113 if (fFriendChainName=="") {
3114 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3116 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3117 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3118 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3120 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3121 out << " chain->ls();" << endl;
3122 out << " return chain;" << endl;
3123 out << "}" << endl << endl;
// gSystem->AccessPathName() returns kTRUE when the file can NOT be accessed,
// so this hint is printed only when ConfigureCuts.C is missing locally.
3124 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3125 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3126 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3127 msg += " AliLHCTagCuts *lhcCuts,\n";
3128 msg += " AliDetectorTagCuts *detCuts,\n";
3129 msg += " AliEventTagCuts *evCuts)";
3130 Info("WriteAnalysisMacro", "%s", msg.Data());
// Generated CreateChain() is needed without tags, and also by
// CreateChainFromTags() when a friend chain is attached. It sets the run
// number on the manager whenever it changes along the collection.
3133 if (!IsUsingTags() || fFriendChainName!="") {
3134 out <<"//________________________________________________________________________________" << endl;
3135 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3137 out << "// Create a chain using url's from xml file" << endl;
3138 out << " TString filename;" << endl;
3139 out << " Int_t run = 0;" << endl;
3140 out << " TString treename = type;" << endl;
3141 out << " treename.ToLower();" << endl;
3142 out << " treename += \"Tree\";" << endl;
3143 out << " printf(\"***************************************\\n\");" << endl;
3144 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3145 out << " printf(\"***************************************\\n\");" << endl;
3146 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3147 out << " if (!coll) {" << endl;
3148 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3149 out << " return NULL;" << endl;
3150 out << " }" << endl;
3151 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3152 out << " TChain *chain = new TChain(treename);" << endl;
3153 if(fFriendChainName!="") {
3154 out << " TChain *chainFriend = new TChain(treename);" << endl;
3156 out << " coll->Reset();" << endl;
3157 out << " while (coll->Next()) {" << endl;
// NOTE(review): the inner quotes below are not escaped. Adjacent C++ string
// literals concatenate, so the generated macro contains "coll->GetTURL();"
// and not "coll->GetTURL("")". This only works through GetTURL's default
// argument; it should read GetTURL(\"\") like the friend-chain line below.
3158 out << " filename = coll->GetTURL("");" << endl;
3159 out << " if (mgr) {" << endl;
3160 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3161 out << " if (nrun && nrun != run) {" << endl;
3162 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3163 out << " mgr->SetRunFromPath(nrun);" << endl;
3164 out << " run = nrun;" << endl;
3165 out << " }" << endl;
3166 out << " }" << endl;
3167 out << " chain->Add(filename);" << endl;
// Friend file: the same URL with AliAOD.root / AliAODs.root replaced by
// fFriendChainName.
3168 if(fFriendChainName!="") {
3169 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3170 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3171 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3172 out << " chainFriend->Add(fileFriend.Data());" << endl;
3174 out << " }" << endl;
3175 out << " if (!chain->GetNtrees()) {" << endl;
3176 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3177 out << " return NULL;" << endl;
3178 out << " }" << endl;
3179 if(fFriendChainName!="") {
3180 out << " chain->AddFriend(chainFriend);" << endl;
3182 out << " return chain;" << endl;
3183 out << "}" << endl << endl;
// Generated SetupPar() is written only when ANALYSISalice is a PAR package
// (the case where setupPar is "SetupPar"). It unpacks <package>.par, runs
// PROOF-INF/BUILD.sh and then PROOF-INF/SETUP.C, and returns kFALSE if the
// build fails or either file is missing.
3185 if (hasANALYSISalice) {
3186 out <<"//________________________________________________________________________________" << endl;
3187 out << "Bool_t SetupPar(const char *package) {" << endl;
3188 out << "// Compile the package and set it up." << endl;
3189 out << " TString pkgdir = package;" << endl;
3190 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3191 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3192 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3193 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3194 out << " // Check for BUILD.sh and execute" << endl;
3195 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3196 out << " printf(\"*******************************\\n\");" << endl;
3197 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3198 out << " printf(\"*******************************\\n\");" << endl;
3199 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3200 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3201 out << " gSystem->ChangeDirectory(cdir);" << endl;
3202 out << " return kFALSE;" << endl;
3203 out << " }" << endl;
3204 out << " } else {" << endl;
3205 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3206 out << " gSystem->ChangeDirectory(cdir);" << endl;
3207 out << " return kFALSE;" << endl;
3208 out << " }" << endl;
3209 out << " // Check for SETUP.C and execute" << endl;
3210 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3211 out << " printf(\"*******************************\\n\");" << endl;
3212 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3213 out << " printf(\"*******************************\\n\");" << endl;
3214 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3215 out << " } else {" << endl;
3216 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3217 out << " gSystem->ChangeDirectory(cdir);" << endl;
3218 out << " return kFALSE;" << endl;
3219 out << " }" << endl;
3220 out << " // Restore original workdir" << endl;
3221 out << " gSystem->ChangeDirectory(cdir);" << endl;
3222 out << " return kTRUE;" << endl;
3225 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// Upload step, skipped in production, offline and test modes.
// NOTE(review): FileExists()/Rm() take bare file names while the copies go
// to <workdir>/<name>; these agree only if the current AliEn directory is the
// working directory. Confirm.
3227 Bool_t copy = kTRUE;
3228 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3231 TString workdir = gGrid->GetHomeDirectory();
3232 workdir += fGridWorkingDir;
3233 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3234 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3235 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3236 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3237 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3239 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3240 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3244 //______________________________________________________________________________
3245 void AliAnalysisAlien::WriteMergingMacro()
3247 // Write a macro to merge the outputs per master job.
3248 if (!fMergeViaJDL) return;
3249 if (!fOutputFiles.Length()) {
3250 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3253 TString mergingMacro = fExecutable;
3254 mergingMacro.ReplaceAll(".sh","_merge.C");
3255 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3256 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3258 out.open(mergingMacro.Data(), ios::out);
3260 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3263 Bool_t hasSTEERBase = kFALSE;
3264 Bool_t hasESD = kFALSE;
3265 Bool_t hasAOD = kFALSE;
3266 Bool_t hasANALYSIS = kFALSE;
3267 Bool_t hasANALYSISalice = kFALSE;
3268 Bool_t hasCORRFW = kFALSE;
3269 TString func = mergingMacro;
3271 func.ReplaceAll(".C", "");
3272 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
3274 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3275 out << " TStopwatch timer;" << endl;
3276 out << " timer.Start();" << endl << endl;
3277 if (!fExecutableCommand.Contains("aliroot")) {
3278 out << "// load base root libraries" << endl;
3279 out << " gSystem->Load(\"libTree\");" << endl;
3280 out << " gSystem->Load(\"libGeom\");" << endl;
3281 out << " gSystem->Load(\"libVMC\");" << endl;
3282 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3283 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3285 if (fAdditionalRootLibs.Length()) {
3286 // in principle libtree /lib geom libvmc etc. can go into this list, too
3287 out << "// Add aditional libraries" << endl;
3288 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3291 while((str=(TObjString*)next())) {
3292 if (str->GetString().Contains(".so"))
3293 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3295 if (list) delete list;
3297 out << "// include path" << endl;
3298 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3299 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
3300 out << "// Load analysis framework libraries" << endl;
3302 if (!fExecutableCommand.Contains("aliroot")) {
3303 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3304 out << " gSystem->Load(\"libESD\");" << endl;
3305 out << " gSystem->Load(\"libAOD\");" << endl;
3307 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3308 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3309 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3311 TIter next(fPackages);
3314 TString setupPar = "AliAnalysisAlien::SetupPar";
3315 while ((obj=next())) {
3316 pkgname = obj->GetName();
3317 if (pkgname == "STEERBase" ||
3318 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3319 if (pkgname == "ESD" ||
3320 pkgname == "ESD.par") hasESD = kTRUE;
3321 if (pkgname == "AOD" ||
3322 pkgname == "AOD.par") hasAOD = kTRUE;
3323 if (pkgname == "ANALYSIS" ||
3324 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3325 if (pkgname == "ANALYSISalice" ||
3326 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3327 if (pkgname == "CORRFW" ||
3328 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3330 if (hasANALYSISalice) setupPar = "SetupPar";
3331 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3332 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3333 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3334 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3335 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3336 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3337 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3338 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3339 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3340 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3341 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3342 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3343 out << "// Compile other par packages" << endl;
3345 while ((obj=next())) {
3346 pkgname = obj->GetName();
3347 if (pkgname == "STEERBase" ||
3348 pkgname == "STEERBase.par" ||
3350 pkgname == "ESD.par" ||
3352 pkgname == "AOD.par" ||
3353 pkgname == "ANALYSIS" ||
3354 pkgname == "ANALYSIS.par" ||
3355 pkgname == "ANALYSISalice" ||
3356 pkgname == "ANALYSISalice.par" ||
3357 pkgname == "CORRFW" ||
3358 pkgname == "CORRFW.par") continue;
3359 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3362 if (fAdditionalLibs.Length()) {
3363 out << "// Add aditional AliRoot libraries" << endl;
3364 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3367 while((str=(TObjString*)next())) {
3368 if (str->GetString().Contains(".so"))
3369 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3371 if (list) delete list;
3374 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3375 if (fAnalysisSource.Length()) {
3376 TObjArray *list = fAnalysisSource.Tokenize(" ");
3379 while((str=(TObjString*)next())) {
3380 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3382 if (list) delete list;
3386 if (fFastReadOption) {
3387 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3388 out << "// fast xrootd reading enabled" << endl;
3389 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3390 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3391 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3392 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3393 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3394 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3396 // Change temp directory to current one
3397 out << "// Set temporary merging directory to current one" << endl;
3398 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3399 out << "// Connect to AliEn" << endl;
3400 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3401 out << " Bool_t laststage = kFALSE;" << endl;
3402 out << " TString outputDir = dir;" << endl;
3403 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3404 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3405 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3406 out << " TIter *iter = new TIter(list);" << endl;
3407 out << " TObjString *str;" << endl;
3408 out << " TString outputFile;" << endl;
3409 out << " Bool_t merged = kTRUE;" << endl;
3410 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3411 out << " outputFile = str->GetString();" << endl;
3412 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3413 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3414 out << " if (index > 0) outputFile.Remove(index);" << endl;
3415 out << " // Skip already merged outputs" << endl;
3416 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3417 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3418 out << " continue;" << endl;
3419 out << " }" << endl;
3420 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3421 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
3422 out << " if (!merged) {" << endl;
3423 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3424 out << " return;" << endl;
3425 out << " }" << endl;
3426 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
3427 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
3428 out << " }" << endl;
3429 out << " // all outputs merged, validate" << endl;
3430 out << " ofstream out;" << endl;
3431 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3432 out << " out.close();" << endl;
3433 out << " // read the analysis manager from file" << endl;
3434 TString analysisFile = fExecutable;
3435 analysisFile.ReplaceAll(".sh", ".root");
3436 out << " if (!laststage) return;" << endl;
3437 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3438 out << " if (!file) return;" << endl;
3439 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3440 out << " AliAnalysisManager *mgr = 0;" << endl;
3441 out << " TKey *key;" << endl;
3442 out << " while ((key=(TKey*)nextkey())) {" << endl;
3443 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3444 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3445 out << " };" << endl;
3446 out << " if (!mgr) {" << endl;
3447 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
3448 out << " return;" << endl;
3449 out << " }" << endl << endl;
3450 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3451 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3452 out << " mgr->PrintStatus();" << endl;
3453 if (AliAnalysisManager::GetAnalysisManager()) {
3454 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3455 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3457 if (TestBit(AliAnalysisGrid::kTest))
3458 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3460 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3463 out << " TTree *tree = NULL;" << endl;
3464 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3465 out << "}" << endl << endl;
3466 if (hasANALYSISalice) {
3467 out <<"//________________________________________________________________________________" << endl;
3468 out << "Bool_t SetupPar(const char *package) {" << endl;
3469 out << "// Compile the package and set it up." << endl;
3470 out << " TString pkgdir = package;" << endl;
3471 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3472 out << " gSystem->Exec(TString::Format(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3473 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3474 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3475 out << " // Check for BUILD.sh and execute" << endl;
3476 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3477 out << " printf(\"*******************************\\n\");" << endl;
3478 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3479 out << " printf(\"*******************************\\n\");" << endl;
3480 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3481 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3482 out << " gSystem->ChangeDirectory(cdir);" << endl;
3483 out << " return kFALSE;" << endl;
3484 out << " }" << endl;
3485 out << " } else {" << endl;
3486 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3487 out << " gSystem->ChangeDirectory(cdir);" << endl;
3488 out << " return kFALSE;" << endl;
3489 out << " }" << endl;
3490 out << " // Check for SETUP.C and execute" << endl;
3491 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3492 out << " printf(\"*******************************\\n\");" << endl;
3493 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3494 out << " printf(\"*******************************\\n\");" << endl;
3495 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3496 out << " } else {" << endl;
3497 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3498 out << " gSystem->ChangeDirectory(cdir);" << endl;
3499 out << " return kFALSE;" << endl;
3500 out << " }" << endl;
3501 out << " // Restore original workdir" << endl;
3502 out << " gSystem->ChangeDirectory(cdir);" << endl;
3503 out << " return kTRUE;" << endl;
3507 Bool_t copy = kTRUE;
3508 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3511 TString workdir = gGrid->GetHomeDirectory();
3512 workdir += fGridWorkingDir;
3513 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3514 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3515 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3519 //______________________________________________________________________________
3520 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3522 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3523 // Note that for loading the compiled library. The current directory should have precedence in
3525 TString pkgdir = package;
3526 pkgdir.ReplaceAll(".par","");
3527 gSystem->Exec(TString::Format("tar xzf %s.par", pkgdir.Data()));
3528 TString cdir = gSystem->WorkingDirectory();
3529 gSystem->ChangeDirectory(pkgdir);
3530 // Check for BUILD.sh and execute
3531 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3532 printf("**************************************************\n");
3533 printf("*** Building PAR archive %s\n", package);
3534 printf("**************************************************\n");
3535 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3536 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3537 gSystem->ChangeDirectory(cdir);
3541 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3542 gSystem->ChangeDirectory(cdir);
3545 // Check for SETUP.C and execute
3546 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3547 printf("**************************************************\n");
3548 printf("*** Setup PAR archive %s\n", package);
3549 printf("**************************************************\n");
3550 gROOT->Macro("PROOF-INF/SETUP.C");
3551 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3553 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3554 gSystem->ChangeDirectory(cdir);
3557 // Restore original workdir
3558 gSystem->ChangeDirectory(cdir);
3562 //______________________________________________________________________________
3563 void AliAnalysisAlien::WriteExecutable()
3565 // Generate the alien executable script.
3566 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3568 out.open(fExecutable.Data(), ios::out);
3570 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3573 out << "#!/bin/bash" << endl;
3574 // Make sure we can properly compile par files
3575 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3576 out << "echo \"=========================================\"" << endl;
3577 out << "echo \"############## PATH : ##############\"" << endl;
3578 out << "echo $PATH" << endl;
3579 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3580 out << "echo $LD_LIBRARY_PATH" << endl;
3581 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3582 out << "echo $ROOTSYS" << endl;
3583 out << "echo \"############## which root : ##############\"" << endl;
3584 out << "which root" << endl;
3585 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3586 out << "echo $ALICE_ROOT" << endl;
3587 out << "echo \"############## which aliroot : ##############\"" << endl;
3588 out << "which aliroot" << endl;
3589 out << "echo \"############## system limits : ##############\"" << endl;
3590 out << "ulimit -a" << endl;
3591 out << "echo \"############## memory : ##############\"" << endl;
3592 out << "free -m" << endl;
3593 out << "echo \"=========================================\"" << endl << endl;
3594 out << fExecutableCommand << " ";
3595 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3596 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3597 out << "echo \"############## memory after: ##############\"" << endl;
3598 out << "free -m" << endl;
3600 Bool_t copy = kTRUE;
3601 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3604 TString workdir = gGrid->GetHomeDirectory();
3605 TString bindir = Form("%s/bin", workdir.Data());
3606 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3607 workdir += fGridWorkingDir;
3608 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3609 if (FileExists(executable)) gGrid->Rm(executable);
3610 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3611 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3615 //______________________________________________________________________________
3616 void AliAnalysisAlien::WriteMergeExecutable()
3618 // Generate the alien executable script for the merging job.
3619 if (!fMergeViaJDL) return;
3620 TString mergeExec = fExecutable;
3621 mergeExec.ReplaceAll(".sh", "_merge.sh");
3622 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3624 out.open(mergeExec.Data(), ios::out);
3626 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
3629 out << "#!/bin/bash" << endl;
3630 // Make sure we can properly compile par files
3631 out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3632 out << "echo \"=========================================\"" << endl;
3633 out << "echo \"############## PATH : ##############\"" << endl;
3634 out << "echo $PATH" << endl;
3635 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3636 out << "echo $LD_LIBRARY_PATH" << endl;
3637 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3638 out << "echo $ROOTSYS" << endl;
3639 out << "echo \"############## which root : ##############\"" << endl;
3640 out << "which root" << endl;
3641 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3642 out << "echo $ALICE_ROOT" << endl;
3643 out << "echo \"############## which aliroot : ##############\"" << endl;
3644 out << "which aliroot" << endl;
3645 out << "echo \"############## system limits : ##############\"" << endl;
3646 out << "ulimit -a" << endl;
3647 out << "echo \"############## memory : ##############\"" << endl;
3648 out << "free -m" << endl;
3649 out << "echo \"=========================================\"" << endl << endl;
3650 TString mergeMacro = fExecutable;
3651 mergeMacro.ReplaceAll(".sh", "_merge.C");
3652 if (IsOneStageMerging())
3653 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
3655 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3656 out << fExecutableCommand << " " << "$ARG" << endl;
3657 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3658 out << "echo \"############## memory after: ##############\"" << endl;
3659 out << "free -m" << endl;
3661 Bool_t copy = kTRUE;
3662 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3665 TString workdir = gGrid->GetHomeDirectory();
3666 TString bindir = Form("%s/bin", workdir.Data());
3667 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3668 workdir += fGridWorkingDir;
3669 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3670 if (FileExists(executable)) gGrid->Rm(executable);
3671 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3672 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3676 //______________________________________________________________________________
3677 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3679 // Write the production file to be submitted by LPM manager. The format is:
3680 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3681 // Next lines: full_path_to_dataset XXX (XXX is a string)
3682 // To submit, one has to: submit jdl XXX for all lines
3684 out.open(filename, ios::out);
3686 Error("WriteProductionFile", "Bad file name: %s", filename);
3690 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
3691 workdir = gGrid->GetHomeDirectory();
3692 workdir += fGridWorkingDir;
3693 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3694 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3695 out << locjdl << " " << njobspermaster << endl;
3696 Int_t nmasterjobs = fInputFiles->GetEntries();
3697 for (Int_t i=0; i<nmasterjobs; i++) {
3698 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3699 runOutDir.ReplaceAll(".xml", "");
3701 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3703 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3706 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3707 if (FileExists(filename)) gGrid->Rm(filename);
3708 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3712 //______________________________________________________________________________
// Generate the AliEn validation script for analysis jobs (merge == kFALSE) or for merging
// jobs (merge == kTRUE).
// - Script name: fValidationScript, defaulting to fExecutable with ".sh" replaced by
//   "_validation.sh"; for merging jobs ".sh" is replaced again by "_merge.sh".
// - fTerminateFiles is normalized in place (Strip(), spaces turned into commas).
// - The script file is only (re)written when the kSubmit bit is not set. The generated
//   script sets error=1 when stderr is missing, when stderr reports a PAR build failure,
//   a segmentation violation/fault or a glibc error, when a declared output file is
//   missing (analysis jobs only) or when the outputs_valid marker is absent. On success
//   the std* logs are removed unless IsKeepLogs(). The script exits with $error.
// - The upload to <AliEn home><fGridWorkingDir>/ is meant to be skipped (copy == kFALSE)
//   in production, offline and test modes; an existing copy is removed first.
3713 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3715 // Generate the alien validation script.
3716 // Generate the validation script
// Default name derived from the executable; merging jobs get their own "_merge.sh" variant.
3718 if (fValidationScript.IsNull()) {
3719 fValidationScript = fExecutable;
3720 fValidationScript.ReplaceAll(".sh", "_validation.sh");
3722 TString validationScript = fValidationScript;
3723 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
3725 Error("WriteValidationScript", "Alien connection required");
// Make the terminate-file list comma separated, like the output list tokenized below.
3728 if (!fTerminateFiles.IsNull()) {
3729 fTerminateFiles.Strip();
3730 fTerminateFiles.ReplaceAll(" ",",");
// Status messages are appended to the job's "stdout" file, except in test mode where they
// go straight to the terminal.
3732 TString outStream = "";
3733 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3734 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3736 out.open(validationScript, ios::out);
3737 out << "#!/bin/bash" << endl;
3738 out << "##################################################" << endl;
3739 out << "validateout=`dirname $0`" << endl;
3740 out << "validatetime=`date`" << endl;
3741 out << "validated=\"0\";" << endl;
3742 out << "error=0" << endl;
3743 out << "if [ -z $validateout ]" << endl;
3744 out << "then" << endl;
3745 out << " validateout=\".\"" << endl;
3746 out << "fi" << endl << endl;
3747 out << "cd $validateout;" << endl;
3748 out << "validateworkdir=`pwd`;" << endl << endl;
3749 out << "echo \"*******************************************************\"" << outStream << endl;
3750 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3752 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3753 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3754 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3755 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3756 out << "ls -la ./" << outStream << endl;
3757 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3758 out << "##################################################" << endl;
// Job-level checks: a missing stderr file fails the job.
3761 out << "if [ ! -f stderr ] ; then" << endl;
3762 out << " error=1" << endl;
3763 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3764 out << " echo \"Error = $error\" " << outStream << endl;
3765 out << "fi" << endl;
// Capture known fatal patterns from stderr; any non-empty match fails the job below.
// NOTE(review): POSIX leaves a leading '*' in an extended regex undefined (GNU grep treats it
// literally, newer versions warn on stderr); a fixed-string match (grep -F) would be safer.
3767 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3768 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3769 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3770 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3773 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3774 out << " error=1" << endl;
3775 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3776 out << " echo \"$parArch\" " << outStream << endl;
3777 out << " echo \"Error = $error\" " << outStream << endl;
3778 out << "fi" << endl;
3780 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3781 out << " error=1" << endl;
3782 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3783 out << " echo \"$segViol\" " << outStream << endl;
3784 out << " echo \"Error = $error\" " << outStream << endl;
3785 out << "fi" << endl;
3787 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3788 out << " error=1" << endl;
3789 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3790 out << " echo \"$segFault\" " << outStream << endl;
3791 out << " echo \"Error = $error\" " << outStream << endl;
3792 out << "fi" << endl;
3794 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3795 out << " error=1" << endl;
3796 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3797 out << " echo \"$glibcErr\" " << outStream << endl;
3798 out << " echo \"Error = $error\" " << outStream << endl;
3799 out << "fi" << endl;
3801 // Part dedicated to the specific analyses running into the train
// NOTE(review): outputFiles is only consumed by the loop below, which never runs for merging
// jobs (!merge guard), so appending fTerminateFiles in the merge case has no visible effect here.
3803 TString outputFiles = fOutputFiles;
3804 if (merge && !fTerminateFiles.IsNull()) {
3806 outputFiles += fTerminateFiles;
3808 TObjArray *arr = outputFiles.Tokenize(",");
3811 while (!merge && (os=(TObjString*)next1())) {
3812 // No need to validate outputs produced by merging since the merging macro does this
3813 outputFile = os->GetString();
// Keep only the part before '@' (presumably a storage specification - confirm).
3814 Int_t index = outputFile.Index("@");
3815 if (index > 0) outputFile.Remove(index);
// Terminate-stage files and wildcard patterns are not checked for existence.
3816 if (fTerminateFiles.Contains(outputFile)) continue;
3817 if (outputFile.Contains("*")) continue;
3818 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3819 out << " error=1" << endl;
3820 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3821 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3822 out << "fi" << endl;
// The outputs_valid marker is the file the generated macros create once outputs are validated.
// NOTE(review): unlike the other messages this one always appends to "stdout", ignoring
// outStream, so in test mode it does not reach the terminal.
3825 out << "if ! [ -f outputs_valid ] ; then" << endl;
3826 out << " error=1" << endl;
3827 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3828 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3829 out << "fi" << endl;
// Successful jobs drop their std* logs unless the user asked to keep them.
3831 out << "if [ $error = 0 ] ; then" << endl;
3832 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3833 if (!IsKeepLogs()) {
3834 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3836 out << " rm -f std*" << endl;
3838 out << "fi" << endl;
3840 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3841 out << "echo \"*******************************************************\"" << outStream << endl;
3842 out << "cd -" << endl;
3843 out << "exit $error" << endl;
// Upload the script unless running in production, offline or test mode.
// NOTE(review): FileExists/Rm use the bare script name (relative to the current AliEn
// directory) while Cp targets workdir explicitly - confirm both refer to the same place.
3845 Bool_t copy = kTRUE;
3846 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3849 TString workdir = gGrid->GetHomeDirectory();
3850 workdir += fGridWorkingDir;
3851 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3852 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3853 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));