1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TFileCollection.h"
32 #include "TObjString.h"
33 #include "TObjArray.h"
35 #include "TGridResult.h"
36 #include "TGridCollection.h"
38 #include "TGridJobStatusList.h"
39 #include "TGridJobStatus.h"
40 #include "TFileMerger.h"
41 #include "AliAnalysisManager.h"
42 #include "AliVEventHandler.h"
43 #include "AliAnalysisDataContainer.h"
44 #include "AliAnalysisAlien.h"
46 ClassImp(AliAnalysisAlien)
48 //______________________________________________________________________________
// Default constructor (fragment). Only part of the member-initializer list is
// visible here: the entries shown set three integer settings to 0 and
// value-initialize fAdditionalRootLibs and fRootVersionForProof.
49 AliAnalysisAlien::AliAnalysisAlien()
55 fSplitMaxInputFileNumber(0),
57 fMasterResubmitThreshold(0),
69 fNproofWorkersPerSlave(0),
79 fAdditionalRootLibs(),
107 fRootVersionForProof(),
116 //______________________________________________________________________________
// Named constructor (fragment). `name` is forwarded to the AliAnalysisGrid
// base class; the visible initializers mirror those of the default
// constructor (integer settings set to 0, remaining members value-initialized).
117 AliAnalysisAlien::AliAnalysisAlien(const char *name)
118 :AliAnalysisGrid(name),
123 fSplitMaxInputFileNumber(0),
125 fMasterResubmitThreshold(0),
137 fNproofWorkersPerSlave(0),
141 fExecutableCommand(),
147 fAdditionalRootLibs(),
175 fRootVersionForProof(),
184 //______________________________________________________________________________
// Copy constructor. The settings listed in the initializer list are copied
// member by member from `other`. The two JDL objects are not copied: new ones
// are created in the body. The input-file and package lists are duplicated
// element by element.
185 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
186 :AliAnalysisGrid(other),
189 fPrice(other.fPrice),
191 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
192 fMaxInitFailed(other.fMaxInitFailed),
193 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
194 fNtestFiles(other.fNtestFiles),
195 fNrunsPerMaster(other.fNrunsPerMaster),
196 fMaxMergeFiles(other.fMaxMergeFiles),
197 fNsubmitted(other.fNsubmitted),
198 fProductionMode(other.fProductionMode),
199 fOutputToRunNo(other.fOutputToRunNo),
200 fMergeViaJDL(other.fMergeViaJDL),
201 fFastReadOption(other.fFastReadOption),
202 fOverwriteMode(other.fOverwriteMode),
203 fNreplicas(other.fNreplicas),
204 fNproofWorkers(other.fNproofWorkers),
205 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
206 fProofReset(other.fProofReset),
207 fRunNumbers(other.fRunNumbers),
208 fExecutable(other.fExecutable),
209 fExecutableCommand(other.fExecutableCommand),
210 fArguments(other.fArguments),
211 fExecutableArgs(other.fExecutableArgs),
212 fAnalysisMacro(other.fAnalysisMacro),
213 fAnalysisSource(other.fAnalysisSource),
214 fValidationScript(other.fValidationScript),
215 fAdditionalRootLibs(other.fAdditionalRootLibs),
216 fAdditionalLibs(other.fAdditionalLibs),
217 fSplitMode(other.fSplitMode),
218 fAPIVersion(other.fAPIVersion),
219 fROOTVersion(other.fROOTVersion),
220 fAliROOTVersion(other.fAliROOTVersion),
221 fExternalPackages(other.fExternalPackages),
223 fGridWorkingDir(other.fGridWorkingDir),
224 fGridDataDir(other.fGridDataDir),
225 fDataPattern(other.fDataPattern),
226 fGridOutputDir(other.fGridOutputDir),
227 fOutputArchive(other.fOutputArchive),
228 fOutputFiles(other.fOutputFiles),
229 fInputFormat(other.fInputFormat),
230 fDatasetName(other.fDatasetName),
231 fJDLName(other.fJDLName),
232 fTerminateFiles(other.fTerminateFiles),
233 fMergeExcludes(other.fMergeExcludes),
234 fIncludePath(other.fIncludePath),
235 fCloseSE(other.fCloseSE),
236 fFriendChainName(other.fFriendChainName),
237 fJobTag(other.fJobTag),
238 fOutputSingle(other.fOutputSingle),
239 fRunPrefix(other.fRunPrefix),
240 fProofCluster(other.fProofCluster),
241 fProofDataSet(other.fProofDataSet),
242 fFileForTestMode(other.fFileForTestMode),
243 fRootVersionForProof(other.fRootVersionForProof),
244 fAliRootMode(other.fAliRootMode),
// Fresh TAlienJDL instances are obtained by asking the interpreter to run
// "new TAlienJDL()"; the contents of other's JDL objects are not transferred.
// NOTE(review): presumably done this way to avoid a direct dependency on the
// TAlienJDL class in this translation unit - confirm.
249 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
250 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
251 fRunRange[0] = other.fRunRange[0];
252 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input-file list: every entry is re-created as a TObjString
// holding the source entry's name, and the new array is made owner of them.
253 if (other.fInputFiles) {
254 fInputFiles = new TObjArray();
255 TIter next(other.fInputFiles);
257 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
258 fInputFiles->SetOwner();
// Same element-wise duplication for the package list.
260 if (other.fPackages) {
261 fPackages = new TObjArray();
262 TIter next(other.fPackages);
264 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
265 fPackages->SetOwner();
269 //______________________________________________________________________________
270 AliAnalysisAlien::~AliAnalysisAlien()
273 if (fGridJDL) delete fGridJDL;
274 if (fMergingJDL) delete fMergingJDL;
275 if (fInputFiles) delete fInputFiles;
276 if (fPackages) delete fPackages;
279 //______________________________________________________________________________
// Assignment operator. After the self-assignment check the base class part is
// assigned, new JDL objects are created, the settings are copied member by
// member and the input-file / package lists are duplicated element by element.
// NOTE(review): fGridJDL, fMergingJDL, fInputFiles and fPackages are
// overwritten below without releasing whatever this object held before, which
// looks like a leak when assigning to an already initialised object - confirm.
280 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
283 if (this != &other) {
284 AliAnalysisGrid::operator=(other);
// The JDL objects are created anew through the interpreter, not copied.
285 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
286 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
287 fPrice = other.fPrice;
289 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
290 fMaxInitFailed = other.fMaxInitFailed;
291 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
292 fNtestFiles = other.fNtestFiles;
293 fNrunsPerMaster = other.fNrunsPerMaster;
294 fMaxMergeFiles = other.fMaxMergeFiles;
295 fNsubmitted = other.fNsubmitted;
296 fProductionMode = other.fProductionMode;
297 fOutputToRunNo = other.fOutputToRunNo;
298 fMergeViaJDL = other.fMergeViaJDL;
299 fFastReadOption = other.fFastReadOption;
300 fOverwriteMode = other.fOverwriteMode;
301 fNreplicas = other.fNreplicas;
302 fNproofWorkers = other.fNproofWorkers;
303 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
304 fProofReset = other.fProofReset;
305 fRunNumbers = other.fRunNumbers;
306 fExecutable = other.fExecutable;
307 fExecutableCommand = other.fExecutableCommand;
308 fArguments = other.fArguments;
309 fExecutableArgs = other.fExecutableArgs;
310 fAnalysisMacro = other.fAnalysisMacro;
311 fAnalysisSource = other.fAnalysisSource;
312 fValidationScript = other.fValidationScript;
313 fAdditionalRootLibs = other.fAdditionalRootLibs;
314 fAdditionalLibs = other.fAdditionalLibs;
315 fSplitMode = other.fSplitMode;
316 fAPIVersion = other.fAPIVersion;
317 fROOTVersion = other.fROOTVersion;
318 fAliROOTVersion = other.fAliROOTVersion;
319 fExternalPackages = other.fExternalPackages;
321 fGridWorkingDir = other.fGridWorkingDir;
322 fGridDataDir = other.fGridDataDir;
323 fDataPattern = other.fDataPattern;
324 fGridOutputDir = other.fGridOutputDir;
325 fOutputArchive = other.fOutputArchive;
326 fOutputFiles = other.fOutputFiles;
327 fInputFormat = other.fInputFormat;
328 fDatasetName = other.fDatasetName;
329 fJDLName = other.fJDLName;
330 fTerminateFiles = other.fTerminateFiles;
331 fMergeExcludes = other.fMergeExcludes;
332 fIncludePath = other.fIncludePath;
333 fCloseSE = other.fCloseSE;
334 fFriendChainName = other.fFriendChainName;
335 fJobTag = other.fJobTag;
336 fOutputSingle = other.fOutputSingle;
337 fRunPrefix = other.fRunPrefix;
338 fProofCluster = other.fProofCluster;
339 fProofDataSet = other.fProofDataSet;
340 fFileForTestMode = other.fFileForTestMode;
341 fRootVersionForProof = other.fRootVersionForProof;
342 fAliRootMode = other.fAliRootMode;
// Deep copy of the input-file list (entries re-created by name, array owns
// them). When other.fInputFiles is null this object's pointer is left as is.
343 if (other.fInputFiles) {
344 fInputFiles = new TObjArray();
345 TIter next(other.fInputFiles);
347 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
348 fInputFiles->SetOwner();
// Same element-wise duplication for the package list.
350 if (other.fPackages) {
351 fPackages = new TObjArray();
352 TIter next(other.fPackages);
354 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
355 fPackages->SetOwner();
361 //______________________________________________________________________________
// Appends one entry to fIncludePath; every appended entry is followed by a
// blank so that successive calls build a space-separated list.
362 void AliAnalysisAlien::AddIncludePath(const char *path)
364 // Add include path in the remote analysis macro.
// `p` is not declared in the visible lines - presumably a TString built from
// `path`; confirm. Text that already contains "-I" is appended unchanged,
// anything else gets the "-I" prefix.
366 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
367 else fIncludePath += Form("-I%s ", path);
370 //______________________________________________________________________________
371 void AliAnalysisAlien::AddRunNumber(Int_t run)
373 // Add a run number to the list of runs to be processed.
374 if (fRunNumbers.Length()) fRunNumbers += " ";
375 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
378 //______________________________________________________________________________
// String overload of AddRunNumber. The visible part only inserts the blank
// separator when fRunNumbers is not empty.
// NOTE(review): the statement that appends `run` itself is not visible here -
// confirm whether fRunPrefix is applied for this overload.
379 void AliAnalysisAlien::AddRunNumber(const char* run)
381 // Add a run number to the list of runs to be processed.
382 if (fRunNumbers.Length()) fRunNumbers += " ";
386 //______________________________________________________________________________
387 void AliAnalysisAlien::AddDataFile(const char *lfn)
389 // Adds a data file to the input to be analysed. The file should be a valid LFN
390 // or point to an existing file in the alien workdir.
391 if (!fInputFiles) fInputFiles = new TObjArray();
392 fInputFiles->Add(new TObjString(lfn));
395 //______________________________________________________________________________
396 void AliAnalysisAlien::AddExternalPackage(const char *package)
398 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
399 if (fExternalPackages) fExternalPackages += " ";
400 fExternalPackages += package;
403 //______________________________________________________________________________
// Makes sure a grid session is available. Reports success right away when
// gGrid is already connected, and also in production mode, where no connection
// attempt is made.
404 Bool_t AliAnalysisAlien::Connect()
406 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
407 if (gGrid && gGrid->IsConnected()) return kTRUE;
408 if (fProductionMode) return kTRUE;
410 Info("Connect", "Trying to connect to AliEn ...");
// The return value of TGrid::Connect is not used; success is judged from the
// state of the global gGrid afterwards.
411 TGrid::Connect("alien://");
413 if (!gGrid || !gGrid->IsConnected()) {
// NOTE(review): the message below reads "Did not managed" - grammar slip in a
// runtime string, left untouched here.
414 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// Once connected, the analysis user is taken from the grid session.
417 fUser = gGrid->GetUser();
418 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
422 //______________________________________________________________________________
// Works on the AliEn working directory, i.e. the grid home directory with
// fGridWorkingDir appended. The guard conditions and the else branches are
// not visible in this excerpt.
423 void AliAnalysisAlien::CdWork()
425 // Check validity of alien workspace. Create directory if possible.
427 Error("CdWork", "Alien connection required");
430 TString homedir = gGrid->GetHomeDirectory();
431 TString workdir = homedir + fGridWorkingDir;
432 if (DirectoryExists(workdir)) {
436 // Work directory not existing - create it
// Mkdir is called with "-p"; on success the session changes into the
// (home-relative) fGridWorkingDir.
438 if (gGrid->Mkdir(workdir, "-p")) {
439 gGrid->Cd(fGridWorkingDir);
440 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
// If the directory cannot be created, fGridWorkingDir is reset to the empty
// string, so that the home directory is used instead (as the warning says).
442 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
443 workdir.Data(), homedir.Data());
444 fGridWorkingDir = "";
448 //______________________________________________________________________________
// Tests whether files can be copied to `alienpath` by creating a small local
// ROOT file, copying it to the grid and removing it again. Production mode
// skips the test and reports success.
449 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
451 // Check if file copying is possible.
452 if (fProductionMode) return kTRUE;
454 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
457 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
458 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
459 // Check if alien_CLOSE_SE is defined
460 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
461 if (!closeSE.IsNull()) {
462 Info("CheckFileCopy", "Your current close storage is pointing to: \
463 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
465 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
467 // Check if grid directory exists.
468 if (!DirectoryExists(alienpath)) {
469 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Local probe file, (re)created in the current working directory.
472 TFile f("plugin_test_copy", "RECREATE");
473 // User may not have write permissions to current directory
475 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
476 gSystem->WorkingDirectory());
// A leftover probe file from an earlier run is removed from the grid before
// the copy is attempted.
480 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
481 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
482 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
483 \n# 1. Make sure you have write permissions there. If this is the case: \
484 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
485 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
486 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
487 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
488 gSystem->Unlink(f.GetName());
// Success path: remove the local probe file and its grid copy.
// NOTE(review): the grid path below is built as "%s%s" (no "alien://" prefix
// and no '/' separator), unlike the copy target above - this only addresses
// the same file if alienpath ends with '/'; confirm against callers.
491 gSystem->Unlink(f.GetName());
492 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
493 Info("CheckFileCopy", "### ...SUCCESS ###");
497 //______________________________________________________________________________
// Validates the declared inputs in three stages: (1) nothing declared at all,
// (2) explicitly added input files, (3) run numbers or a run range, for which
// the names of the xml collections to process are registered via AddDataFile.
// Production mode skips all checks and reports success.
498 Bool_t AliAnalysisAlien::CheckInputData()
500 // Check validity of input data. If necessary, create xml files.
501 if (fProductionMode) return kTRUE;
// Stage 1: no input files, no run numbers and fRunRange[0] == 0. A base data
// directory is then mandatory.
502 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
503 if (!fGridDataDir.Length()) {
// NOTE(review): the location string is spelled "CkeckInputData" (runtime
// string, left untouched).
504 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
507 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
508 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
509 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
512 // Process declared files
513 Bool_t isCollection = kFALSE;
514 Bool_t isXml = kFALSE;
515 Bool_t useTags = kFALSE;
516 Bool_t checked = kFALSE;
517 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
519 TString workdir = gGrid->GetHomeDirectory();
520 workdir += fGridWorkingDir;
// Stage 2: every declared file must exist; all of them must agree on type
// (raw collection / xml / tag usage) as reported by CheckDataType.
523 TIter next(fInputFiles);
524 while ((objstr=(TObjString*)next())) {
527 file += objstr->GetString();
528 // Store full lfn path
529 if (FileExists(file)) objstr->SetString(file);
531 file = objstr->GetName();
532 if (!FileExists(objstr->GetName())) {
533 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
534 objstr->GetName(), workdir.Data());
538 Bool_t iscoll, isxml, usetags;
539 CheckDataType(file, iscoll, isxml, usetags);
542 isCollection = iscoll;
545 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
547 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
548 Error("CheckInputData", "Some conflict was found in the types of inputs");
554 // Process requested run numbers
555 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
556 // Check validity of alien data directory
557 if (!fGridDataDir.Length()) {
// NOTE(review): same "CkeckInputData" spelling as above.
558 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
561 if (!DirectoryExists(fGridDataDir)) {
562 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
566 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
570 if (checked && !isXml) {
571 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
574 // Check validity of run number(s)
578 TString schunk, schunk2;
// Tag usage is derived from the data pattern containing "tag".
582 useTags = fDataPattern.Contains("tag");
583 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
585 if (useTags != fDataPattern.Contains("tag")) {
586 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Stage 3a: an explicit run list (as the message says, run ranges are then
// ignored).
// NOTE(review): several Info calls below report "CheckDataType" as location
// although they are issued from CheckInputData.
589 if (fRunNumbers.Length()) {
590 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
591 arr = fRunNumbers.Tokenize(" ");
593 while ((os=(TObjString*)next())) {
594 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
595 if (!DirectoryExists(path)) {
596 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
599 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
600 TString msg = "\n##### file: ";
602 msg += " type: xml_collection;";
603 if (useTags) msg += " using_tags: Yes";
604 else msg += " using_tags: No";
605 Info("CheckDataType", "%s", msg.Data());
// With fewer than two runs per master job each run gets its own xml entry.
606 if (fNrunsPerMaster<2) {
607 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Otherwise runs are grouped: the chunk name starts with the first run of a
// group and gets "_<run>.xml" appended once the group is full or the last
// token has been reached.
610 if (((nruns-1)%fNrunsPerMaster) == 0) {
611 schunk = os->GetString();
613 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
614 schunk += Form("_%s.xml", os->GetString().Data());
// Stage 3b: an inclusive run range [fRunRange[0], fRunRange[1]], handled the
// same way with fRunPrefix prepended to every run number.
620 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
621 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
622 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
623 if (!DirectoryExists(path)) {
624 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
627 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
628 TString msg = "\n##### file: ";
630 msg += " type: xml_collection;";
631 if (useTags) msg += " using_tags: Yes";
632 else msg += " using_tags: No";
633 Info("CheckDataType", "%s", msg.Data());
634 if (fNrunsPerMaster<2) {
635 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
638 if (((nruns-1)%fNrunsPerMaster) == 0) {
639 schunk = Form("%s%d", fRunPrefix.Data(),irun);
641 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
642 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
655 //______________________________________________________________________________
// Builds the xml collection(s) describing the input data. Three cases are
// handled: the whole base data directory, an explicit run list, and a run
// range. In each case the output of a grid command is written to a local xml
// file, the file is optionally validated with grep, and it is copied to the
// AliEn working directory. Production and offline modes report success
// without doing anything.
656 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
658 // Create dataset for the grid data directory + run number.
659 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
661 Error("CreateDataset", "Cannot create dataset with no grid connection");
666 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
667 TString workdir = gGrid->GetHomeDirectory();
668 workdir += fGridWorkingDir;
670 // Compose the 'find' command arguments
672 TString options = "-x collection ";
// In test mode the "-l <fNtestFiles>" option is added to the command options.
673 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
674 TString conditions = "";
679 TString schunk, schunk2;
680 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither run numbers nor a run range. Nothing to do when input files
// were declared explicitly.
681 if (!fRunNumbers.Length() && !fRunRange[0]) {
682 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
683 // Make a single data collection from data directory.
685 if (!DirectoryExists(path)) {
686 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// Test mode always writes to "wn.xml"; otherwise the file is named after the
// last component of the data path.
690 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
691 else file = Form("%s.xml", gSystem->BaseName(path));
// The command is (re)run when the local file is missing, in test mode, or in
// overwrite mode.
692 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
698 command += conditions;
699 printf("command: %s\n", command.Data());
700 TGridResult *res = gGrid->Command(command);
702 // Write standard output to file
703 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
// Validation relies on an external grep: the file counts as empty when grep
// finds no "/event" in it. Availability of grep is probed first.
704 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
705 Bool_t nullFile = kFALSE;
707 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
709 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
711 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
716 Bool_t fileExists = FileExists(file);
717 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
718 // Copy xml file to alien space
719 if (fileExists) gGrid->Rm(file);
720 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
721 if (!FileExists(file)) {
722 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
725 // Update list of files to be processed.
727 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
731 Bool_t nullResult = kTRUE;
// Case 2: explicit, blank-separated run list.
732 if (fRunNumbers.Length()) {
733 TObjArray *arr = fRunNumbers.Tokenize(" ");
736 while ((os=(TObjString*)next())) {
737 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
738 if (!DirectoryExists(path)) continue;
740 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
741 else file = Form("%s.xml", os->GetString().Data());
742 // If local collection file does not exist, create it via 'find' command.
743 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
748 command += conditions;
749 TGridResult *res = gGrid->Command(command);
751 // Write standard output to file
752 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
753 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
754 Bool_t nullFile = kFALSE;
756 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
758 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
760 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// NOTE(review): ReplaceAll removes every occurrence of this run's text from
// fRunNumbers, which would also alter any other entry containing the same
// character sequence - confirm this cannot happen with real run numbers.
761 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
767 if (TestBit(AliAnalysisGrid::kTest)) break;
768 // Check if there is one run per master job.
769 if (fNrunsPerMaster<2) {
770 if (FileExists(file)) {
771 if (fOverwriteMode) gGrid->Rm(file);
773 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
777 // Copy xml file to alien space
778 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
779 if (!FileExists(file)) {
780 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a group opens a base
// collection, later runs are loaded as additional collections to be merged.
786 if (((nruns-1)%fNrunsPerMaster) == 0) {
787 schunk = os->GetString();
788 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
790 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
791 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
795 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
798 schunk += Form("_%s.xml", os->GetString().Data());
799 if (FileExists(schunk)) {
// NOTE(review): existence is tested for `schunk` but `file` is removed; the
// run-range branch further down removes `schunk` in the same situation -
// looks like a copy/paste slip, confirm.
800 if (fOverwriteMode) gGrid->Rm(file);
802 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
806 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
807 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
808 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
809 if (!FileExists(schunk)) {
810 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
818 Error("CreateDataset", "No valid dataset corresponding to the query!");
// Case 3: inclusive run range, run names built as <fRunPrefix><irun>.
822 // Process a full run range.
823 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
824 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
825 if (!DirectoryExists(path)) continue;
827 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
828 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
829 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
830 if (fOverwriteMode) gGrid->Rm(file);
832 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
836 // If local collection file does not exist, create it via 'find' command.
837 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
842 command += conditions;
843 TGridResult *res = gGrid->Command(command);
845 // Write standard output to file
846 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
847 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
848 Bool_t nullFile = kFALSE;
850 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
852 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
854 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
860 if (TestBit(AliAnalysisGrid::kTest)) break;
861 // Check if there is one run per master job.
862 if (fNrunsPerMaster<2) {
863 if (FileExists(file)) {
864 if (fOverwriteMode) gGrid->Rm(file);
866 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
870 // Copy xml file to alien space
871 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
872 if (!FileExists(file)) {
873 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
878 // Check if the collection for the chunk exist locally.
// NOTE(review): fInputFiles is dereferenced here and below without a null
// check - presumably it was filled by CheckInputData beforehand; confirm.
879 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
880 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
881 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
884 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
885 if (((nruns-1)%fNrunsPerMaster) == 0) {
886 schunk = Form("%s%d", fRunPrefix.Data(), irun);
887 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
889 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
893 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
894 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
898 if (FileExists(schunk)) {
899 if (fOverwriteMode) gGrid->Rm(schunk);
901 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
905 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
906 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
907 if (FileExists(schunk)) {
908 if (fOverwriteMode) gGrid->Rm(schunk);
910 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
914 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
915 if (!FileExists(schunk)) {
916 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
922 Error("CreateDataset", "No valid dataset corresponding to the query!");
929 //______________________________________________________________________________
930 Bool_t AliAnalysisAlien::CreateJDL()
932 // Generate a JDL file according to current settings. The name of the file is
933 // specified by fJDLName.
934 Bool_t error = kFALSE;
937 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
938 Bool_t generate = kTRUE;
939 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
941 Error("CreateJDL", "Alien connection required");
944 // Check validity of alien workspace
946 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
947 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
948 workdir += fGridWorkingDir;
952 Error("CreateJDL()", "Define some input files for your analysis.");
955 // Compose list of input files
956 // Check if output files were defined
957 if (!fOutputFiles.Length()) {
958 Error("CreateJDL", "You must define at least one output file");
961 // Check if an output directory was defined and valid
962 if (!fGridOutputDir.Length()) {
963 Error("CreateJDL", "You must define AliEn output directory");
966 if (!fProductionMode) {
967 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
968 if (!DirectoryExists(fGridOutputDir)) {
969 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
970 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
972 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
979 // Exit if any error up to now
980 if (error) return kFALSE;
982 if (!fUser.IsNull()) {
983 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
984 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
986 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
987 TString mergeExec = fExecutable;
988 mergeExec.ReplaceAll(".sh", "_merge.sh");
989 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
990 mergeExec.ReplaceAll(".sh", ".C");
991 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
992 if (!fArguments.IsNull())
993 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
994 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
995 else fMergingJDL->SetArguments("$1 $2 $3");
996 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
997 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
998 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
999 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1001 if (fMaxInitFailed > 0) {
1002 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1003 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1005 if (fSplitMaxInputFileNumber > 0) {
1006 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1007 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1009 if (fSplitMode.Length()) {
1010 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1011 fGridJDL->SetDescription("Split", "We split per SE or file");
1013 if (!fAliROOTVersion.IsNull()) {
1014 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1015 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1017 if (!fROOTVersion.IsNull()) {
1018 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1019 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1021 if (!fAPIVersion.IsNull()) {
1022 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1023 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1025 if (!fExternalPackages.IsNull()) {
1026 arr = fExternalPackages.Tokenize(" ");
1028 while ((os=(TObjString*)next())) {
1029 TString pkgname = os->GetString();
1030 Int_t index = pkgname.Index("::");
1031 TString pkgversion = pkgname(index+2, pkgname.Length());
1032 pkgname.Remove(index);
1033 fGridJDL->AddToPackages(pkgname, pkgversion);
1034 fMergingJDL->AddToPackages(pkgname, pkgversion);
1038 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1039 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1040 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1041 TString analysisFile = fExecutable;
1042 analysisFile.ReplaceAll(".sh", ".root");
1043 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1044 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1045 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1046 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
1047 if (fAdditionalLibs.Length()) {
1048 arr = fAdditionalLibs.Tokenize(" ");
1050 while ((os=(TObjString*)next())) {
1051 if (os->GetString().Contains(".so")) continue;
1052 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1053 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1058 TIter next(fPackages);
1060 while ((obj=next())) {
1061 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1062 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1065 if (fOutputArchive.Length()) {
1066 arr = fOutputArchive.Tokenize(" ");
1068 Bool_t first = kTRUE;
1069 const char *comment = "Files to be archived";
1070 const char *comment1 = comment;
1071 while ((os=(TObjString*)next())) {
1072 if (!first) comment = NULL;
1073 if (!os->GetString().Contains("@") && fCloseSE.Length())
1074 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1076 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1080 // Output archive for the merging jdl
1081 TString outputArchive;
1082 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1083 outputArchive = "log_archive.zip:std*@disk=1 ";
1084 // Add normal output files, extra files + terminate files
1085 TString files = GetListOfFiles("outextter");
1086 // Do not register merge excludes
1087 if (!fMergeExcludes.IsNull()) {
1088 arr = fMergeExcludes.Tokenize(" ");
1090 while ((os=(TObjString*)next1())) {
1091 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1092 files.ReplaceAll(os->GetString(),"");
1096 files.ReplaceAll(".root", "*.root");
1097 outputArchive += Form("root_archive.zip:%s@disk=%d",files.Data(),fNreplicas);
1099 outputArchive = fOutputArchive;
1101 arr = outputArchive.Tokenize(" ");
1105 while ((os=(TObjString*)next2())) {
1106 if (!first) comment = NULL;
1107 TString currentfile = os->GetString();
1108 if (!IsOneStageMerging()) currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1109 if (!currentfile.Contains("@") && fCloseSE.Length())
1110 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1112 fMergingJDL->AddToOutputArchive(currentfile, comment);
1117 arr = fOutputFiles.Tokenize(",");
1119 Bool_t first = kTRUE;
1120 const char *comment = "Files to be saved";
1121 while ((os=(TObjString*)next())) {
1122 // Ignore outputs in jdl that are also in outputarchive
1123 TString sout = os->GetString();
1124 sout.ReplaceAll("*", "");
1125 sout.ReplaceAll(".root", "");
1126 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1127 if (fOutputArchive.Contains(sout)) continue;
1128 if (!first) comment = NULL;
1129 if (!os->GetString().Contains("@") && fCloseSE.Length())
1130 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1132 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1134 if (fMergeExcludes.Contains(sout)) continue;
1135 if (!os->GetString().Contains("@") && fCloseSE.Length())
1136 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1138 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1141 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1142 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1143 TString validationScript = fValidationScript;
1144 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1145 validationScript.ReplaceAll(".sh", "_merge.sh");
1146 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1147 if (fMasterResubmitThreshold) {
1148 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1149 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1151 // Write a jdl with 2 input parameters: collection name and output dir name.
1154 // Copy jdl to grid workspace
1156 // Check if an output directory was defined and valid
1157 if (!fGridOutputDir.Length()) {
1158 Error("CreateJDL", "You must define AliEn output directory");
1161 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1162 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1163 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1164 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1166 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1172 if (TestBit(AliAnalysisGrid::kSubmit)) {
1173 TString mergeJDLName = fExecutable;
1174 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1175 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1176 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1177 if (fProductionMode) {
1178 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1179 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1181 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1182 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1183 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1184 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1186 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1187 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1190 if (fAdditionalLibs.Length()) {
1191 arr = fAdditionalLibs.Tokenize(" ");
1194 while ((os=(TObjString*)next())) {
1195 if (os->GetString().Contains(".so")) continue;
1196 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1197 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1198 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1203 TIter next(fPackages);
1205 while ((obj=next())) {
1206 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1207 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1208 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1215 //______________________________________________________________________________
1216 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1218 // Generates the text of the job JDL (fGridJDL) and of the merging JDL
1219 // (fMergingJDL), writes them to the local files fJDLName and <fExecutable with
1220 // ".sh" replaced by "_merge.jdl"> and copies them to AliEn with TFile::Cp.
// Returns kFALSE right away when fInputFiles is not set.
// NOTE(review): <copy> presumably selects between the "review the jdl" message
// and the upload to the grid - confirm against the branch that tests it.
1221 if (!fInputFiles) return kFALSE;
// The working directory is taken relative to the AliEn home directory unless
// production mode is on or fGridWorkingDir already starts with "/alice".
1224 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1225 workdir += fGridWorkingDir;
// Production mode: the input collections are the entries of fInputFiles and
// the output goes to a per-subjob counter directory under fGridOutputDir.
1227 if (fProductionMode) {
1228 TIter next(fInputFiles);
1230 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1231 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1232 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1234 if (!fRunNumbers.Length() && !fRunRange[0]) {
1235 // One jdl with no parameters in case input data is specified by name.
1236 TIter next(fInputFiles);
1238 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1239 if (!fOutputSingle.IsNull())
1240 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1242 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1243 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1246 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1247 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1248 if (!fOutputSingle.IsNull()) {
1249 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1250 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1252 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1253 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1258 // Generate the JDL as a string
1259 TString sjdl = fGridJDL->Generate();
1260 TString sjdl1 = fMergingJDL->Generate();
// Cosmetic post-processing, applied identically to both texts: list items and
// "(member" clauses are moved to their own lines, duplicated line breaks are
// removed and the field "OutputDirectory" is renamed to "OutputDir".
1262 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1263 sjdl.ReplaceAll("(member", "\n (member");
1264 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1265 sjdl.ReplaceAll("{", "{\n ");
1266 sjdl.ReplaceAll("};", "\n};");
1267 sjdl.ReplaceAll("{\n \n", "{\n");
1268 sjdl.ReplaceAll("\n\n", "\n");
1269 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1270 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1271 sjdl1.ReplaceAll("(member", "\n (member");
1272 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1273 sjdl1.ReplaceAll("{", "{\n ");
1274 sjdl1.ReplaceAll("};", "\n};");
1275 sjdl1.ReplaceAll("{\n \n", "{\n");
1276 sjdl1.ReplaceAll("\n\n", "\n");
1277 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Fields that the JDL generator does not produce are added by hand: the
// JDLVariables list, the Jobtag (prepended) and the work directory size.
1278 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1279 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1280 index = sjdl.Index("JDLVariables");
1281 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1282 sjdl += "Workdirectorysize = {\"5000MB\"};";
1283 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
// Tag of the merging job: "_Merging" is inserted in front of the first ':' of
// fJobTag, or at its end when there is no ':'.
// NOTE(review): the format string below appends "_Merging" once more after the
// already modified tag - confirm that the duplicated suffix is intended.
1284 index = fJobTag.Index(":");
1285 if (index < 0) index = fJobTag.Length();
1286 TString jobTag = fJobTag;
1287 jobTag.Insert(index, "_Merging");
1288 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1289 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1290 index = sjdl1.Index("JDLVariables");
1291 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1292 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1293 // Write jdl to file
1295 out.open(fJDLName.Data(), ios::out);
1297 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1300 out << sjdl << endl;
// The merging JDL is named after the executable: "<name>.sh" -> "<name>_merge.jdl".
1301 TString mergeJDLName = fExecutable;
1302 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1305 out1.open(mergeJDLName.Data(), ios::out);
1307 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1310 out1 << sjdl1 << endl;
1313 // Copy jdl to grid workspace
1315 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination on the grid: the output directory, or the working directory in
// production mode. Files already present at the destination are removed first.
1317 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1318 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1319 if (fProductionMode) {
1320 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1321 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1323 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1324 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1325 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1326 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1328 Info("WriteJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1329 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1335 //______________________________________________________________________________
1336 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1338 // Returns true if file exists.
1339 if (!gGrid) return kFALSE;
1340 TGridResult *res = gGrid->Ls(lfn);
1341 if (!res) return kFALSE;
1342 TMap *map = dynamic_cast<TMap*>(res->At(0));
1347 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1348 if (!objs || !objs->GetString().Length()) {
1356 //______________________________________________________________________________
1357 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1359 // Returns true if directory exists. Can be also a path.
// The name is passed through TString::Strip() and has its trailing '/'
// characters removed, then it is split into its last component (BaseName) and
// its parent path (DirName). The parent is listed with gGrid->Ls(path, "-F")
// and the "name" value of each listed entry is compared with the last component.
// Returns false when there is no grid connection or the listing gives no result.
1360 if (!gGrid) return kFALSE;
1361 // Check if dirname is a path
1362 TString dirstripped = dirname;
1363 dirstripped = dirstripped.Strip();
1364 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1365 TString dir = gSystem->BaseName(dirstripped);
1367 TString path = gSystem->DirName(dirstripped);
1368 TGridResult *res = gGrid->Ls(path, "-F");
1369 if (!res) return kFALSE;
// Scan the listing; iteration stops at the first element that is not a TMap.
1373 while ((map=dynamic_cast<TMap*>(next()))) {
1374 obj = map->GetValue("name");
1376 if (dir == obj->GetName()) {
1385 //______________________________________________________________________________
1386 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1388 // Check input data type.
// Outputs: <isCollection> is taken from IsCollection(lfn), <isXml> is set from
// a search for ".xml" in the name string slfn, and <useTags> tells whether the
// first file of the collection (or the name itself for a plain file) contains
// ".tag". The outcome is reported in one Info message; "using_tags: No
// (unknown)" is reported when the content of a collection cannot be inspected.
1389 isCollection = kFALSE;
1393 Error("CheckDataType", "No connection to grid");
1396 isCollection = IsCollection(lfn);
1397 TString msg = "\n##### file: ";
1400 msg += " type: raw_collection;";
1401 // special treatment for collections
1403 // check for tag files in the collection
// NOTE(review): no release of <res> is visible in this branch - confirm who
// owns the TGridResult returned by TGrid::Command.
1404 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1406 msg += " using_tags: No (unknown)";
1407 Info("CheckDataType", "%s", msg.Data());
// The original LFN of the first listed file decides whether tags are used.
1410 const char* typeStr = res->GetKey(0, "origLFN");
1411 if (!typeStr || !strlen(typeStr)) {
1412 msg += " using_tags: No (unknown)";
1413 Info("CheckDataType", "%s", msg.Data());
1416 TString file = typeStr;
1417 useTags = file.Contains(".tag");
1418 if (useTags) msg += " using_tags: Yes";
1419 else msg += " using_tags: No";
1420 Info("CheckDataType", "%s", msg.Data());
1425 isXml = slfn.Contains(".xml");
1427 // Open xml collection and check if there are tag files inside
1428 msg += " type: xml_collection;";
// The collection is opened through the interpreter (TAlienCollection::Open).
1429 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1431 msg += " using_tags: No (unknown)";
1432 Info("CheckDataType", "%s", msg.Data());
1435 TMap *map = coll->Next();
1437 msg += " using_tags: No (unknown)";
1438 Info("CheckDataType", "%s", msg.Data());
// The map stored under the empty key of the first entry provides the "name"
// that is tested for ".tag".
1441 map = (TMap*)map->GetValue("");
1443 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1444 useTags = file.Contains(".tag");
1446 if (useTags) msg += " using_tags: Yes";
1447 else msg += " using_tags: No";
1448 Info("CheckDataType", "%s", msg.Data());
// Plain file: classify by the ".tag" and ".root" substrings of the name.
1451 useTags = slfn.Contains(".tag");
1452 if (slfn.Contains(".root")) msg += " type: root file;";
1453 else msg += " type: unknown file;";
1454 if (useTags) msg += " using_tags: Yes";
1455 else msg += " using_tags: No";
1456 Info("CheckDataType", "%s", msg.Data());
1459 //______________________________________________________________________________
1460 void AliAnalysisAlien::EnablePackage(const char *package)
1462 // Enables a par file supposed to exist in the current directory.
// Every ".par" substring is removed from <package> to build the name <pkg>,
// which is then checked with gSystem->AccessPathName(); Fatal "Package ... not
// found" is issued on that path. The kUsePars bit is set - with an Info
// message if it was not set before - and <pkg> is appended to fPackages as a
// TObjString.
1463 TString pkg(package);
1464 pkg.ReplaceAll(".par", "");
1466 if (gSystem->AccessPathName(pkg)) {
1467 Fatal("EnablePackage", "Package %s not found", pkg.Data());
1470 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1471 Info("EnablePackage", "AliEn plugin will use .par packages");
1472 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// The array created here is made owner of the names it stores.
1474 fPackages = new TObjArray();
1475 fPackages->SetOwner();
1477 fPackages->Add(new TObjString(pkg));
1480 //______________________________________________________________________________
1481 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1483 // Make a tree from files having the location specified in fFileForTestMode.
1484 // Inspired from JF's CreateESDChain.
// fFileForTestMode names a local text file listing data file locations. A new
// TChain called <treeName> is created and the listed files that TFile::Open
// returns as non-zombie are added to it; reading stops once the running count
// of non-empty entries equals fNtestFiles. Errors are reported when
// fFileForTestMode is empty or not accessible, for each file that cannot be
// opened, and when no file at all ended up in the chain.
// NOTE(review): the chain is allocated here with new; presumably the caller
// takes ownership of the returned pointer - confirm.
1485 if (fFileForTestMode.IsNull()) {
1486 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1489 if (gSystem->AccessPathName(fFileForTestMode)) {
1490 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1495 in.open(fFileForTestMode);
1497 // Read the input list of files and add them to the chain
1499 TChain *chain = new TChain(treeName);
1503 if (line.IsNull()) continue;
1504 if (count++ == fNtestFiles) break;
// Each candidate is opened once to make sure it is readable before it is
// added to the chain.
1505 TString esdFile(line);
1506 TFile *file = TFile::Open(esdFile);
1508 if (!file->IsZombie()) chain->Add(esdFile);
1511 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1515 if (!chain->GetListOfFiles()->GetEntries()) {
1516 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1524 //______________________________________________________________________________
1525 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1527 // Get job status for all jobs with jobid>=jobidstart.
// Walks the job list returned by gGrid->Ps("") and skips every job whose queue
// id is below <jobidstart>. The status text of the job whose id equals <lastid>
// is written into the buffer that is returned.
// The returned pointer refers to a function-local static buffer of 20
// characters, so its content is replaced by the next call.
// NOTE(review): <nrunning>, <nwaiting>, <nerror> and <ndone> are presumably the
// per-state job counters filled by the switch below - confirm.
1528 static char mstatus[20];
1534 TGridJobStatusList *list = gGrid->Ps("");
1535 if (!list) return mstatus;
1536 Int_t nentries = list->GetSize();
1537 TGridJobStatus *status;
1539 for (Int_t ijob=0; ijob<nentries; ijob++) {
1540 status = (TGridJobStatus *)list->At(ijob);
// The queue id is obtained through the interpreter, which casts the object
// address to TAlienJobStatus and converts its "queueId" key with atoi.
1541 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1542 if (pid<jobidstart) continue;
1543 if (pid == lastid) {
// NOTE(review): the status text is used as the sprintf format and is written
// without a length limit into the 20-character buffer - confirm it always fits.
1544 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
// kABORTED, kFAIL and kUNKNOWN share one case group.
1546 switch (status->GetStatus()) {
1547 case TGridJobStatus::kWAITING:
1549 case TGridJobStatus::kRUNNING:
1551 case TGridJobStatus::kABORTED:
1552 case TGridJobStatus::kFAIL:
1553 case TGridJobStatus::kUNKNOWN:
1555 case TGridJobStatus::kDONE:
1564 //______________________________________________________________________________
1565 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1567 // Returns true if file is a collection. Functionality duplicated from
1568 // TAlien::Type() because we don't want to directly depend on TAlien.
1570 Error("IsCollection", "No connection to grid");
1573 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1574 if (!res) return kFALSE;
1575 const char* typeStr = res->GetKey(0, "type");
1576 if (!typeStr || !strlen(typeStr)) return kFALSE;
1577 if (!strcmp(typeStr, "collection")) return kTRUE;
1582 //______________________________________________________________________________
1583 Bool_t AliAnalysisAlien::IsSingleOutput() const
1585 // Check if single-ouput option is on.
1586 return (!fOutputSingle.IsNull());
1589 //______________________________________________________________________________
1590 void AliAnalysisAlien::Print(Option_t *) const
1592 // Print current plugin settings.
// Everything is written with printf to standard output; the Option_t argument
// is unnamed and not used. The first block lists the PROOF-related settings
// and is entered when an analysis manager exists and is in PROOF mode; the
// lines after it list the AliEn (grid) settings.
1593 printf("### AliEn analysis plugin current settings ###\n");
1594 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1595 if (mgr && mgr->IsProofMode()) {
// With the kTest bit set the banner announces PROOF lite instead of PROOF.
1596 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1597 if (TestBit(AliAnalysisGrid::kTest))
1598 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1599 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1600 if (!fProofDataSet.IsNull())
1601 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1603 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1605 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1606 if (!fRootVersionForProof.IsNull())
1607 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1609 printf("= ROOT version requested________________________ default\n");
1610 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1611 if (!fAliRootMode.IsNull())
1612 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1614 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1615 if (fNproofWorkersPerSlave)
1616 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1617 if (TestSpecialBit(kClearPackages))
1618 printf("= ClearPackages requested...\n");
// NOTE(review): TString::Data() presumably never returns a null pointer, so
// this test looks always true - confirm whether an emptiness test was meant.
1619 if (fIncludePath.Data())
1620 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1621 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
// The package names are concatenated without any separator between them.
1622 if (fPackages && fPackages->GetEntries()) {
1623 TIter next(fPackages);
1626 while ((obj=next())) list += obj->GetName();
1627 printf("= Par files to be used: ________________________ %s\n", list.Data());
1629 if (TestSpecialBit(kProofConnectGrid))
1630 printf("= Requested PROOF connection to grid\n");
// AliEn (grid) settings.
1633 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1634 if (fOverwriteMode) {
1635 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1636 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1638 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1639 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1640 printf("= Production mode:______________________________ %d\n", fProductionMode);
1641 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1642 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1643 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1645 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1646 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1647 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1648 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1649 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1650 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or - when neither is
// set - the names stored in fInputFiles.
1651 if (fRunNumbers.Length())
1652 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1654 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
1655 if (!fRunRange[0] && !fRunNumbers.Length()) {
1656 TIter next(fInputFiles);
1659 while ((obj=next())) list += obj->GetName();
1660 printf("= Input files to be processed: _________________ %s\n", list.Data());
1662 if (TestBit(AliAnalysisGrid::kTest))
1663 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1664 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1665 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1666 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1667 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1668 printf("=====================================================================\n");
// Job parameters; the optional ones are printed only when they are positive.
1669 printf("= Job price: ___________________________________ %d\n", fPrice);
1670 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1671 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1672 if (fMaxInitFailed>0)
1673 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1674 if (fMasterResubmitThreshold>0)
1675 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1676 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1677 if (fNrunsPerMaster>0)
1678 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1679 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1680 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1681 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1682 if (fArguments.Length())
1683 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1684 if (fExecutableArgs.Length())
1685 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1686 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1687 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1688 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1689 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1691 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1692 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same remark as in the PROOF block - this test looks always true.
1693 if (fIncludePath.Data())
1694 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1695 if (fCloseSE.Length())
1696 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1697 if (fFriendChainName.Length())
1698 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1699 if (fPackages && fPackages->GetEntries()) {
1700 TIter next(fPackages);
1703 while ((obj=next())) list += obj->GetName();
1704 printf("= Par files to be used: ________________________ %s\n", list.Data());
1708 //______________________________________________________________________________
1709 void AliAnalysisAlien::SetDefaults()
1711 // Set default values for everything. What cannot be filled will be left empty.
// Both JDL objects are re-created as TAlienJDL through the interpreter
// (gROOT->ProcessLine), then the data members are reset to their defaults.
// NOTE(review): the previous fGridJDL is deleted before being replaced, but no
// matching delete of the previous fMergingJDL is visible - confirm that
// repeated calls do not leak it.
1712 if (fGridJDL) delete fGridJDL;
1713 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1714 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting and merging parameters.
1717 fSplitMaxInputFileNumber = 100;
1719 fMasterResubmitThreshold = 0;
1724 fNrunsPerMaster = 1;
1725 fMaxMergeFiles = 100;
// Names of the generated script and macro, and how the script is run.
1727 fExecutable = "analysis.sh";
1728 fExecutableCommand = "root -b -q";
1730 fExecutableArgs = "";
1731 fAnalysisMacro = "myAnalysis.C";
1732 fAnalysisSource = "";
1733 fAdditionalLibs = "";
1737 fAliROOTVersion = "";
// Grid locations, data selection and output description.
1738 fUser = ""; // Your alien user name
1739 fGridWorkingDir = "";
1740 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1741 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1742 fFriendChainName = "";
1743 fGridOutputDir = "output";
1744 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1745 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1746 fInputFormat = "xml-single";
1747 fJDLName = "analysis.jdl";
1748 fJobTag = "Automatically generated analysis JDL";
1749 fMergeExcludes = "";
// Option flags switched on by default.
1752 SetCheckCopy(kTRUE);
1753 SetDefaultOutputs(kTRUE);
1757 //______________________________________________________________________________
1758 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1760 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1761 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1762 // true if the jobs were successfully submitted.
// <filename> is the name of the output file to look for, <aliendir> the AliEn
// directory that is searched, <nperchunk> the number of files merged per job
// and <jdl> the merging jdl used in the "submit" command.
// NOTE(review): the two error messages below are tagged "GetNregisteredFiles"
// although they are issued from CheckMergedFiles - confirm.
1763 Int_t countOrig = 0;
1764 Int_t countStage = 0;
1767 Bool_t doneFinal = kFALSE;
// Normalise the directory: "//" is collapsed and trailing '/' is removed, so
// that it can later be cut out of the returned URLs.
1769 TString saliendir(aliendir);
1770 TString sfilename, stmp;
1771 saliendir.ReplaceAll("//","/");
1772 saliendir = saliendir.Strip(TString::kTrailing, '/');
1774 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
// Find every file matching "*<name>*.root" below the directory.
1777 sfilename = filename;
1778 sfilename.ReplaceAll(".root", "*.root");
1779 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1780 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1781 TGridResult *res = gGrid->Command(command);
1783 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1788 while ((map=(TMap*)nextmap())) {
// NOTE(review): the "turl" value is dereferenced without a null check.
1789 TString turl = map->GetValue("turl")->GetName();
1790 if (!turl.Length()) {
// Reduce the URL to the path relative to the directory; a file sitting
// directly in the directory then has a relative path equal to its base name.
1795 turl.ReplaceAll("alien://", "");
1796 turl.ReplaceAll(saliendir, "");
1797 sfilename = gSystem->BaseName(turl);
1798 turl = turl.Strip(TString::kLeading, '/');
1799 // Now check to what the file corresponds to:
1800 // original output - aliendir/%03d/filename
1801 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1802 // final merged file - aliendir/filename
1803 if (sfilename == turl) {
1804 if (sfilename == filename) {
// Decode the stage (2 digits right after "Stage") and the chunk number
// (4 digits starting 3 characters further).
1808 Int_t index = sfilename.Index("Stage");
1809 if (index<0) continue;
1810 stmp = sfilename(index+5,2);
1811 Int_t istage = atoi(stmp);
1812 stmp = sfilename(index+8,4);
1813 Int_t ijob = atoi(stmp);
1814 if (istage<stage) continue; // Ignore lower stages
1817 chunksDone.ResetAllBits();
1821 chunksDone.SetBitNumber(ijob);
1828 printf("=> Removing files from previous stages...\n");
1829 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1830 for (i=1; i<stage; i++)
1831 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1836 // Compute number of jobs that were submitted for the current stage
// Each stage groups the files of the previous one in chunks of <nperchunk>,
// rounding up.
1837 Int_t ntotstage = countOrig;
1838 for (i=1; i<=stage; i++) {
1839 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1840 else ntotstage = (ntotstage/nperchunk);
1842 // Now compare with the number of set bits in the chunksDone array
1843 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1845 printf("*** Found %d original files\n", countOrig);
1846 if (stage==0) printf("*** No merging completed so far.\n");
1847 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1848 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
1849 if (!submit) return doneFinal;
1850 // Submit merging jobs for all missing chunks for the current stage.
1851 TString query = Form("submit %s %s", jdl, aliendir);
1854 for (i=0; i<nmissing; i++) {
// Next chunk index whose bit is not set in chunksDone.
1855 ichunk = chunksDone.FirstNullBit(ichunk+1);
1856 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1857 if (!jobId) return kFALSE;
1861 // Submit next stage of merging
// With no merged stage yet, the next stage takes the original files as input.
1862 if (stage==0) countStage = countOrig;
1863 Int_t nchunks = (countStage/nperchunk);
1864 if (countStage%nperchunk) nchunks += 1;
1865 for (i=0; i<nchunks; i++) {
1866 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1867 if (!jobId) return kFALSE;
1872 //______________________________________________________________________________
1873 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1875 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// <query> is passed verbatim to gGrid->Command() and the job id is read from
// the "jobId" key of the first result entry. An empty id is reported as a
// failed submission. Progress is echoed with printf. Returns 0 when there is
// no grid connection.
1876 if (!gGrid) return 0;
1877 printf("=> %s ------> ",query);
1878 TGridResult *res = gGrid->Command(query);
// The id is copied into a TString, so it does not depend on <res> afterwards.
1880 TString jobId = res->GetKey(0,"jobId");
1882 if (jobId.IsNull()) {
1883 printf("submission failed. Reason:\n");
1886 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1889 printf(" Job id: %s\n", jobId.Data());
1893 //______________________________________________________________________________
1894 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1896 // Merge the files matching <output> found under <basedir> by a grid 'find' command.
1897 // The file merger handles at most nmaxmerge files per group. Merging can be done in stages:
1898 // stage=0 : merges all existing files in a single stage, going through them in groups of nmaxmerge
1899 // stage=1 : runs the find command for the original <output> files (names that do NOT contain "Stage").
1900 // If their number is bigger than nmaxmerge, only the files with indices from
1901 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 get merged, as <output>-Stage<stage>_<ichunk>.root
1902 // stage=n : runs the find command for files named <output>-Stage<stage-1>_*.root. If their number is bigger than
1903 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
1905 TString outputFile = output;
1907 TString outputChunk;
1908 TString previousChunk = "";
1909 Int_t countChunk = 0;
1910 Int_t countZero = nmaxmerge; // Counts down the files left in the current chunk
1911 Bool_t merged = kTRUE;
1912 Int_t index = outputFile.Index("@");
1913 if (index > 0) outputFile.Remove(index); // Drop everything from '@' on
1914 TString inputFile = outputFile;
1915 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1)); // Inputs are the outputs of the previous stage
1916 command = Form("find %s/ *%s", basedir, inputFile.Data());
1917 printf("command: %s\n", command.Data());
1918 TGridResult *res = gGrid->Command(command);
1920 ::Error("MergeOutput","No result for the find command\n");
1924 TFileMerger *fm = 0;
1927 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1928 outputChunk = outputFile;
1929 outputChunk.ReplaceAll(".root", "_*.root");
1930 // Check for existing temporary merge files
1931 // Check overwrite mode and remove previous partial results if needed
1932 // Preserve old merging functionality for stage 0.
1934 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) { // Exit status 0: 'ls' found temporary chunk files
1936 // Skip as many input files as in a chunk
1937 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1939 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1943 outputChunk = outputFile;
1944 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1946 if (gSystem->AccessPathName(outputChunk)) continue; // AccessPathName() returns true when the file is NOT accessible
1947 // Merged file with chunks up to <countChunk> found
1948 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1949 previousChunk = outputChunk;
1953 countZero = nmaxmerge;
1955 while ((map=(TMap*)nextmap())) {
1956 // Loop over the 'find' results and get the next LFN
1957 if (countZero == nmaxmerge) {
1958 // First file in chunk - create file merger and add previous chunk if any.
1959 fm = new TFileMerger(kFALSE);
1960 fm->SetFastMethod(kTRUE);
1961 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1962 outputChunk = outputFile;
1963 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1965 // If last file found, put merged results in the output file
1966 if (map == res->Last()) outputChunk = outputFile;
1967 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1968 if (!objs || !objs->GetString().Length()) {
1969 // No usable "turl" value in this find result - skip it
1974 // Add file to be merged and decrement chunk counter.
1975 fm->AddFile(objs->GetString());
1977 if (countZero==0 || map == res->Last()) { // Chunk is full or this was the last find result
1978 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1979 // Merge list is empty - nothing to merge for this output
1980 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1985 fm->OutputFile(outputChunk);
1986 // Merge the outputs, then go to next chunk
1988 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1993 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
1994 gSystem->Unlink(previousChunk); // The previous chunk was added to this merger, so its file is no longer needed
1996 if (map == res->Last()) {
2002 countZero = nmaxmerge;
2003 previousChunk = outputChunk;
2008 // Merging stage different than 0.
2009 // Move to the beginning of the requested chunk.
2010 outputChunk = outputFile;
2011 if (nmaxmerge < res->GetSize()) { // More inputs than fit in one group: merge only chunk <ichunk>
2012 if (ichunk*nmaxmerge >= res->GetSize()) {
2013 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
2017 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) nextmap(); // Skip the entries belonging to previous chunks
2018 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
2020 countZero = nmaxmerge;
2021 fm = new TFileMerger(kFALSE);
2022 fm->SetFastMethod(kTRUE);
2023 while ((map=(TMap*)nextmap())) {
2024 // Loop over the 'find' results and get the next LFN
2025 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2026 if (!objs || !objs->GetString().Length()) {
2027 // No usable "turl" value in this find result - skip it
2032 // Add file to be merged and decrement chunk counter.
2033 fm->AddFile(objs->GetString());
2035 if (countZero==0) break; // Chunk complete
2038 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2039 // Merge list is empty - nothing to merge for this output
2040 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2044 fm->OutputFile(outputChunk);
2045 // Merge the outputs
2047 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2051 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2057 //______________________________________________________________________________
2058 Bool_t AliAnalysisAlien::MergeOutputs()
2060 // Merge analysis outputs existing in the AliEn space. Returns kTRUE right away in test mode and kFALSE in offline mode. Merging jobs can be submitted via SubmitMerging(); otherwise each file listed in fOutputFiles is merged locally with MergeOutput().
2061 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2062 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2064 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2068 if (!TestBit(AliAnalysisGrid::kMerge)) {
2069 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2072 if (fProductionMode) {
2073 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2076 Info("MergeOutputs", "Submitting merging JDL");
2077 if (!SubmitMerging()) return kFALSE;
2078 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2079 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2082 // Get the output path: a name without any "/" is taken relative to <home>/<fGridWorkingDir>
2083 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2084 if (!DirectoryExists(fGridOutputDir)) {
2085 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2088 if (!fOutputFiles.Length()) {
2089 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2092 // Announce local merging, then check if fast read option was requested
2093 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2094 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2095 if (fFastReadOption) {
2096 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2097 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2098 gEnv->SetValue("XNet.ConnectTimeout",10);
2099 gEnv->SetValue("XNet.RequestTimeout",10);
2100 gEnv->SetValue("XNet.MaxRedirectCount",2);
2101 gEnv->SetValue("XNet.ReconnectTimeout",10);
2102 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2104 // Make sure the temporary directory (TMPDIR) is the current working directory
2105 gSystem->Setenv("TMPDIR", gSystem->pwd());
2106 TObjArray *list = fOutputFiles.Tokenize(","); // fOutputFiles is a comma-separated list
2110 Bool_t merged = kTRUE;
2111 while((str=(TObjString*)next())) {
2112 outputFile = str->GetString();
2113 Int_t index = outputFile.Index("@");
2114 if (index > 0) outputFile.Remove(index); // Drop everything from '@' on
2115 TString outputChunk = outputFile;
2116 outputChunk.ReplaceAll(".root", "_*.root"); // Wildcard pattern for the partial merge files of this output
2117 // Skip already merged outputs, or delete them first in overwrite mode
2118 if (!gSystem->AccessPathName(outputFile)) { // AccessPathName() returns false when the file exists
2119 if (fOverwriteMode) {
2120 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2121 gSystem->Unlink(outputFile);
2122 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2123 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2124 outputChunk.Data());
2125 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2128 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2132 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2133 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2134 outputChunk.Data());
2135 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2138 if (fMergeExcludes.Length() &&
2139 fMergeExcludes.Contains(outputFile.Data())) continue;
2140 // Perform a 'find' command in the output directory, looking for registered outputs
2141 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2143 Error("MergeOutputs", "Terminate() will NOT be executed");
2146 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2147 if (fileOpened) fileOpened->Close(); // Close the merged file if ROOT still lists it as open
2152 //______________________________________________________________________________
2153 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2155 // Use the output files connected to output containers from the analysis manager
2156 // rather than the files defined by SetOutputFiles. The info message is printed only when the option gets switched on.
2157 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2158 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2159 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2162 //______________________________________________________________________________
2163 void AliAnalysisAlien::SetOutputFiles(const char *list)
2165 // Manually set the output files list. <list> is blank-separated; the names are appended to fOutputFiles as a comma-separated list.
2166 // Removes duplicates. Not allowed if default outputs are not disabled.
2167 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2168 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2171 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2173 TString slist = list;
2174 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2175 TObjArray *arr = slist.Tokenize(" ");
2179 while ((os=(TObjString*)next())) {
2180 sout = os->GetString();
2181 if (sout.Index("@")>0) sout.Remove(sout.Index("@")); // Drop everything from '@' on
2182 if (fOutputFiles.Contains(sout)) continue; // NOTE(review): substring match - a name contained in an already listed, longer name is also treated as a duplicate
2183 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2184 fOutputFiles += sout;
2189 //______________________________________________________________________________
2190 void AliAnalysisAlien::SetOutputArchive(const char *list)
2192 // Manually set the output archive list. Free text, stored in fOutputArchive as given - you are on your own...
2193 // Not allowed if default outputs are not disabled.
2194 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2195 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2198 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2199 fOutputArchive = list;
2202 //______________________________________________________________________________
2203 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2205 // Setting a preferred output SE is not allowed anymore. The argument is ignored and only a warning is issued.
2206 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2209 //______________________________________________________________________________
2210 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2212 // Start the analysis through the plugin. In PROOF mode the PROOF session is set up; otherwise the needed files (analysis file, macro, validation script, JDL) are written and, depending on the run mode, the analysis is tested locally or the AliEn job(s) are submitted. Both arguments are unused. Requires an initialized analysis manager.
2213 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2214 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2215 if (!mgr || !mgr->IsInitialized()) {
2216 Error("StartAnalysis", "You need an initialized analysis manager for this");
2219 // Are we in PROOF mode ?
2220 if (mgr->IsProofMode()) {
2221 Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2222 if (fProofCluster.IsNull()) {
2223 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2226 if (fProofDataSet.IsNull() && !testMode) {
2227 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2230 // Set the needed environment
2231 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2232 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2233 if (fProofReset && !testMode) {
2234 if (fProofReset==1) {
2235 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2236 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2238 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2239 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2241 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2244 // Do we need to change the ROOT version ? The success of this cannot be checked.
2245 if (!fRootVersionForProof.IsNull() && !testMode) {
2246 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2247 fProofCluster.Data(), fRootVersionForProof.Data()));
2249 // Connect to PROOF and check the status
2252 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2253 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2255 if (!sworkers.IsNull())
2256 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2258 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2260 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2262 Error("StartAnalysis", "Could not start PROOF in test mode");
2267 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2270 if (fNproofWorkersPerSlave*fNproofWorkers > 0) // Both worker settings given: also limit the total number of workers
2271 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2272 // Does the dataset exist ? (the existence check itself is commented out below)
2274 TString dataset = fProofDataSet;
2275 Int_t index = dataset.Index("#");
2276 if (index>=0) dataset.Remove(index); // Drop everything from '#' on
2277 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2278 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2281 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2283 // Is ClearPackages() needed ?
2284 if (TestSpecialBit(kClearPackages)) {
2285 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2286 gROOT->ProcessLine("gProof->ClearPackages();");
2288 // Is a given aliroot mode requested ?
2290 if (!fAliRootMode.IsNull()) {
2291 TString alirootMode = fAliRootMode;
2292 if (alirootMode == "default") alirootMode = ""; // "default" is passed on as an empty mode string
2293 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2294 optionsList.SetOwner();
2295 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2296 // Check the additional libs to be loaded
2298 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2299 // Parse the extra libs for .so: strip the "lib" prefix and the ".so" suffix, join with ':'
2300 if (fAdditionalLibs.Length()) {
2301 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2304 while((str=(TObjString*)next()) && str->GetString().Contains(".so")) { // NOTE(review): the loop ends at the first token without ".so", so later tokens are not parsed - confirm intended
2305 TString stmp = str->GetName();
2306 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2307 stmp.ReplaceAll(".so","");
2308 if (!extraLibs.IsNull()) extraLibs += ":";
2311 if (list) delete list;
2313 if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2314 // Check extra includes
2315 if (!fIncludePath.IsNull()) {
2316 TString includePath = fIncludePath;
2317 includePath.ReplaceAll(" ",":");
2318 includePath.Strip(TString::kTrailing, ':'); // NOTE(review): TString::Strip() returns the stripped substring without modifying the string; the result is unused here - confirm
2319 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2320 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2322 // Check if connection to grid is requested
2323 if (TestSpecialBit(kProofConnectGrid))
2324 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2325 // Enable AliRoot par
2327 // Enable proof lite package
2328 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2329 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2330 TNamed *obj = (TNamed*)optionsList.At(i);
2331 printf("%s %s\n", obj->GetName(), obj->GetTitle());
2333 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2334 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)0x%lx);",alirootLite.Data(),(ULong_t)&optionsList))) {
2335 Info("StartAnalysis", "AliRootProofLite enabled");
2337 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2341 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);",
2342 fAliROOTVersion.Data(), (ULong_t)&optionsList))) {
2343 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2348 if (fAdditionalLibs.Contains(".so") && !testMode) {
2349 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2350 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2354 // Enable par files if requested
2355 if (fPackages && fPackages->GetEntries()) {
2356 TIter next(fPackages);
2358 while ((package=next())) {
2359 if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2360 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2361 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2365 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2370 // Do we need to load analysis source files ?
2371 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2372 if (fAnalysisSource.Length()) {
2373 TObjArray *list = fAnalysisSource.Tokenize(" ");
2376 while((str=(TObjString*)next())) {
2377 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2379 if (list) delete list;
2382 // Register dataset to proof lite, taking the file locations from fFileForTestMode.
2383 if (fFileForTestMode.IsNull()) {
2384 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2387 if (gSystem->AccessPathName(fFileForTestMode)) { // AccessPathName() returns true when the file is NOT accessible
2388 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
2391 TFileCollection *coll = new TFileCollection();
2392 coll->AddFromFile(fFileForTestMode);
2393 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
2394 gROOT->ProcessLine("gProof->ShowDataSets()");
2399 // Check if output files have to be taken from the analysis manager
2400 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2401 // Add output files and AOD files
2402 fOutputFiles = GetListOfFiles("outaod");
2403 // Add extra files registered to the analysis manager
2404 TString extra = GetListOfFiles("ext");
2405 if (!extra.IsNull()) {
2406 extra.ReplaceAll(".root", "*.root");
2407 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2408 fOutputFiles += extra;
2410 // Compose the output archive: a log archive plus a root archive holding all output files.
2411 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2412 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2414 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2415 if (TestBit(AliAnalysisGrid::kOffline)) {
2416 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2417 \n there nor any job run. You can revise the JDL and analysis \
2418 \n macro then run the same in \"submit\" mode.");
2419 } else if (TestBit(AliAnalysisGrid::kTest)) {
2420 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2422 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2423 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2424 \n space and job submitted.");
2425 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2426 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2427 if (fMergeViaJDL) CheckInputData();
2430 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2435 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2438 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2439 if (!CheckInputData()) {
2440 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2443 if (!CreateDataset(fDataPattern)) {
2445 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2446 if (fRunNumbers.Length()) serror = "run numbers";
2447 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2448 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2449 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2452 WriteAnalysisFile();
2453 WriteAnalysisMacro();
2455 WriteValidationScript();
2457 WriteMergingMacro();
2458 WriteMergeExecutable();
2459 WriteValidationScript(kTRUE);
2461 if (!CreateJDL()) return kFALSE;
2462 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE; // Offline mode: the files were produced, nothing gets run or submitted
2464 // Locally testing the analysis
2465 Info("StartAnalysis", "\n_______________________________________________________________________ \
2466 \n Running analysis script in a daughter shell as on a worker node \
2467 \n_______________________________________________________________________");
2468 TObjArray *list = fOutputFiles.Tokenize(",");
2472 while((str=(TObjString*)next())) {
2473 outputFile = str->GetString();
2474 Int_t index = outputFile.Index("@");
2475 if (index > 0) outputFile.Remove(index);
2476 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data())); // Remove an existing local copy of this output before the test run
2479 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2480 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2481 // gSystem->Exec("cat stdout");
2484 // Check if submitting is managed by LPM manager
2485 if (fProductionMode) {
2486 TString prodfile = fJDLName;
2487 prodfile.ReplaceAll(".jdl", ".prod");
2488 WriteProductionFile(prodfile);
2489 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2492 // Submit AliEn job(s)
2493 gGrid->Cd(fGridOutputDir);
2496 if (!fRunNumbers.Length() && !fRunRange[0]) {
2497 // No run numbers and no run range defined: submit the JDL once
2498 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2499 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2501 const char *cjobId = res->GetKey(0,"jobId");
2505 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2508 Info("StartAnalysis", "\n_______________________________________________________________________ \
2509 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2510 \n_______________________________________________________________________",
2511 fJDLName.Data(), cjobId);
2516 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2520 // Submit for a range or an enumeration of runs.
2521 if (!Submit()) return kFALSE;
2524 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2525 \n You may exit at any time and terminate the job later using the option <terminate> \
2526 \n ##################################################################################", jobID.Data());
2527 gSystem->Exec("aliensh");
2531 //______________________________________________________________________________
2532 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2534 // Get a comma-separated list of output files of the requested type.
2535 // Type can be (case insensitive), and the keywords can be combined in one string (e.g. "outaod"):
2536 // aod - list of aod files (std, extensions and filters)
2537 // out - list of output files connected to containers (but not aod's or extras)
2538 // ext - list of extra files registered to the manager
2539 // ter - list of files produced in terminate
2540 static TString files; // The returned pointer refers to this static buffer, so the next call overwrites the result: copy it if needed
2542 TString stype = type;
2544 TString aodfiles, extra;
2545 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2547 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2548 return files.Data();
2550 if (mgr->GetOutputEventHandler()) {
2551 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2552 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2553 if (!extraaod.IsNull()) {
2555 aodfiles += extraaod;
2558 if (stype.Contains("aod")) {
2560 if (stype == "aod") return files.Data();
2562 // Add output files that are not in the list of AOD files
2563 TString outputfiles = "";
2564 TIter next(mgr->GetOutputs());
2565 AliAnalysisDataContainer *output;
2566 const char *filename = 0;
2567 while ((output=(AliAnalysisDataContainer*)next())) {
2568 filename = output->GetFileName();
2569 if (!(strcmp(filename, "default"))) continue; // Skip containers whose file name is "default"
2570 if (outputfiles.Contains(filename)) continue; // Already listed
2571 if (aodfiles.Contains(filename)) continue; // Already counted among the AOD files
2572 if (!outputfiles.IsNull()) outputfiles += ",";
2573 outputfiles += filename;
2575 if (stype.Contains("out")) {
2576 if (!files.IsNull()) files += ",";
2577 files += outputfiles;
2578 if (stype == "out") return files.Data();
2580 // Add extra files registered to the analysis manager
2582 extra = mgr->GetExtraFiles();
2583 if (!extra.IsNull()) {
2585 extra.ReplaceAll(" ", ",");
2586 TObjArray *fextra = extra.Tokenize(",");
2587 TIter nextx(fextra);
2589 while ((obj=nextx())) {
2590 if (aodfiles.Contains(obj->GetName())) continue;
2591 if (outputfiles.Contains(obj->GetName())) continue;
2592 if (sextra.Contains(obj->GetName())) continue;
2593 if (!sextra.IsNull()) sextra += ",";
2594 sextra += obj->GetName();
2597 if (stype.Contains("ext")) {
2598 if (!files.IsNull()) files += ",";
2602 if (stype == "ext") return files.Data();
2604 if (!fTerminateFiles.IsNull()) {
2605 fTerminateFiles.Strip(); // NOTE(review): TString::Strip() returns the stripped substring without modifying the string; the result is unused here - confirm
2606 fTerminateFiles.ReplaceAll(" ",",");
2607 TObjArray *fextra = fTerminateFiles.Tokenize(",");
2608 TIter nextx(fextra);
2610 while ((obj=nextx())) {
2611 if (aodfiles.Contains(obj->GetName())) continue;
2612 if (outputfiles.Contains(obj->GetName())) continue;
2613 if (termfiles.Contains(obj->GetName())) continue;
2614 if (sextra.Contains(obj->GetName())) continue;
2615 if (!termfiles.IsNull()) termfiles += ",";
2616 termfiles += obj->GetName();
2620 if (stype.Contains("ter")) {
2621 if (!files.IsNull() && !termfiles.IsNull()) {
2626 return files.Data();
2629 //______________________________________________________________________________
2630 Bool_t AliAnalysisAlien::Submit()
2632 // Submit all master jobs, one per entry in fInputFiles. Calls SubmitNext() repeatedly until fNsubmitted reaches that number; returns kFALSE as soon as SubmitNext() fails.
2633 Int_t nmasterjobs = fInputFiles->GetEntries();
2634 Long_t tshoot = gSystem->Now();
2635 if (!fNsubmitted && !SubmitNext()) return kFALSE; // Nothing submitted yet: submit the first bunch right away
2636 while (fNsubmitted < nmasterjobs) {
2637 Long_t now = gSystem->Now();
2638 if ((now-tshoot)>30000) { // Retry only after more than 30000 time units (presumably ms, as returned by gSystem->Now() - TODO confirm)
2640 if (!SubmitNext()) return kFALSE;
2646 //______________________________________________________________________________
2647 Bool_t AliAnalysisAlien::SubmitMerging()
2649 // Submit all merging jobs. For each entry in fInputFiles the output directory is worked out and CheckMergedFiles() is called with the merging JDL; afterwards an alien shell is started.
2650 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2651 gGrid->Cd(fGridOutputDir);
2652 TString mergeJDLName = fExecutable;
2653 mergeJDLName.ReplaceAll(".sh", "_merge.jdl"); // The merging JDL name is derived from the executable name
2654 Int_t ntosubmit = fInputFiles->GetEntries();
2655 for (Int_t i=0; i<ntosubmit; i++) {
2656 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2657 runOutDir.ReplaceAll(".xml", "");
2658 if (fOutputToRunNo) {
2659 // The output directory is the run number
2660 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2661 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2663 // The output directory is the master number in 3 digits format
2664 printf("### Submitting merging job for master <%03d>\n", i);
2665 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2667 // Check now the number of merging stages.
2668 TObjArray *list = fOutputFiles.Tokenize(",");
2672 while((str=(TObjString*)next())) {
2673 outputFile = str->GetString();
2674 Int_t index = outputFile.Index("@");
2675 if (index > 0) outputFile.Remove(index);
2676 if (!fMergeExcludes.Contains(outputFile)) break; // Use the first output file that is not excluded from merging
2679 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2680 if (!done) return kFALSE;
2682 if (!ntosubmit) return kTRUE;
2683 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2684 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2685 \n ##################################################################################");
2686 gSystem->Exec("aliensh");
2690 //______________________________________________________________________________
2691 Bool_t AliAnalysisAlien::SubmitNext()
2693 // Submit next bunch of master jobs if the queue is free. The first master job is
2694 // submitted right away, while the next will not be unless the previous was split.
2695 // The plugin will not submit new master jobs if there are more than 500 jobs in
2697 static Bool_t iscalled = kFALSE; // presumably guards against nested calls - it is reset to kFALSE on the return paths below
2698 static Int_t firstmaster = 0; // Job id of the first submitted master, kept between calls
2699 static Int_t lastmaster = 0; // Job id of the most recently submitted master, kept between calls
2700 static Int_t npermaster = 0;
2701 if (iscalled) return kTRUE;
2703 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2704 Int_t ntosubmit = 0;
2707 Int_t nmasterjobs = fInputFiles->GetEntries();
2710 if (!IsUseSubmitPolicy()) {
2712 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
2713 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
2714 ntosubmit = nmasterjobs;
2717 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2718 printf("=== master %d: %s\n", lastmaster, status.Data());
2719 // If last master not split, just return
2720 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2721 // No more than 500 waiting jobs
2722 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
2723 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted; // Average number of subjobs per submitted master
2724 if (npermaster) ntosubmit = (500-nwaiting)/npermaster; // Number of masters that fit below the limit of 500 waiting jobs
2725 if (!ntosubmit) ntosubmit = 1; // Always submit at least one master
2726 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2727 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2729 for (Int_t i=0; i<ntosubmit; i++) {
2730 // Submit the next master job; stop when all masters were submitted.
2731 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
2733 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2734 runOutDir.ReplaceAll(".xml", "");
2736 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2738 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2739 printf("********* %s\n",query.Data());
2740 res = gGrid->Command(query);
2742 TString cjobId1 = res->GetKey(0,"jobId");
2743 if (!cjobId1.Length()) { // Empty job id: the submission failed
2747 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2750 Info("StartAnalysis", "\n_______________________________________________________________________ \
2751 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2752 \n_______________________________________________________________________",
2753 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
2756 lastmaster = cjobId1.Atoi();
2757 if (!firstmaster) firstmaster = lastmaster;
2762 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2770 //______________________________________________________________________________
// Prepares the ROOT file holding the current analysis manager and, when allowed,
// copies it to the AliEn workspace.
//  - The file name is fExecutable with ".sh" replaced by ".root".
//  - Unless the kSubmit bit is set, an initialized AliAnalysisManager is required;
//    the kUseMC / kUseESD / kUseAOD bits of this object are set from the manager's
//    event handlers and the file is opened in RECREATE mode.
//  - The copy flag is cleared in production mode or when kOffline / kTest is set.
// NOTE(review): this listing omits some short lines (braces, early returns, the
// statement that actually writes the manager and the test of `copy`); the comments
// below describe only what is visible.
2771 void AliAnalysisAlien::WriteAnalysisFile()
2773 // Write current analysis manager into the file <analysisFile>
2774 TString analysisFile = fExecutable;
2775 analysisFile.ReplaceAll(".sh", ".root");
// The local file is only (re)generated when we are not in pure submit mode.
2776 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2777 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2778 if (!mgr || !mgr->IsInitialized()) {
2779 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2782 // Check analysis type
// An MC truth handler marks the analysis as MC; the class of the input handler
// decides between ESD and AOD.
2784 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2785 handler = (TObject*)mgr->GetInputEventHandler();
2787 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2788 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory: TFile::Open changes gDirectory, it is restored below.
2790 TDirectory *cdir = gDirectory;
2791 TFile *file = TFile::Open(analysisFile, "RECREATE");
2793 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2794 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2795 // Unless merging makes no sense
2796 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2799 // Enable termination for local jobs
2800 mgr->SetSkipTerminate(kFALSE);
2802 if (cdir) cdir->cd();
2803 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No upload to AliEn in production mode, offline mode or test mode.
2805 Bool_t copy = kTRUE;
2806 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is <alien home directory><fGridWorkingDir>; an existing remote copy
// is removed first. NOTE(review): gGrid is dereferenced without a visible null
// check - presumably a grid connection is guaranteed on this path; confirm.
2809 TString workdir = gGrid->GetHomeDirectory();
2810 workdir += fGridWorkingDir;
2811 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
2812 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2813 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2817 //______________________________________________________________________________
// Generates the ROOT macro <fAnalysisMacro> that steers the analysis inside grid
// sub-jobs and, when allowed, copies it (plus ConfigureCuts.C when tags are used)
// to the AliEn workspace.
// The generated macro, in order:
//  - starts a TStopwatch and points TMPDIR to the current directory,
//  - loads base ROOT libraries, fAdditionalRootLibs, the analysis framework
//    libraries (or sets them up from PAR packages), fAdditionalLibs and compiles
//    the sources listed in fAnalysisSource,
//  - optionally tightens the XNet timeouts (fFastReadOption),
//  - connects to AliEn, reads the AliAnalysisManager from <executable>.root,
//  - builds a TChain from "wn.xml" (via tags or directly) and runs
//    mgr->StartAnalysis("localfile", chain).
// Helper functions CreateChainFromTags / CreateChain / SetupPar are appended to the
// generated macro when needed.
// NOTE(review): this listing omits some short lines (braces, `else`, small
// declarations, early returns); comments describe only what is visible.
2818 void AliAnalysisAlien::WriteAnalysisMacro()
2820 // Write the analysis macro that will steer the analysis in grid mode.
// The macro is only (re)generated when we are not in pure submit mode.
2821 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2823 out.open(fAnalysisMacro.Data(), ios::out);
2825 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags recording which framework libraries are provided as PAR packages.
2828 Bool_t hasSTEERBase = kFALSE;
2829 Bool_t hasESD = kFALSE;
2830 Bool_t hasAOD = kFALSE;
2831 Bool_t hasANALYSIS = kFALSE;
2832 Bool_t hasANALYSISalice = kFALSE;
2833 Bool_t hasCORRFW = kFALSE;
// `func` becomes the name of the generated entry function (macro name without ".C").
2834 TString func = fAnalysisMacro;
2835 TString type = "ESD";
2836 TString comment = "// Analysis using ";
2837 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2838 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
// A friend chain is only supported for AOD input.
2842 if (type!="AOD" && fFriendChainName!="") {
2843 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2846 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2847 else comment += " data";
2848 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2849 func.ReplaceAll(".C", "");
2850 out << "void " << func.Data() << "()" << endl;
2852 out << comment.Data() << endl;
2853 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2854 out << " TStopwatch timer;" << endl;
2855 out << " timer.Start();" << endl << endl;
2856 // Change temp directory to current one
2857 out << "// Set temporary merging directory to current one" << endl;
2858 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
// Base ROOT libraries are loaded explicitly only when the executable command
// does not contain "aliroot".
2859 if (!fExecutableCommand.Contains("aliroot")) {
2860 out << "// load base root libraries" << endl;
2861 out << " gSystem->Load(\"libTree\");" << endl;
2862 out << " gSystem->Load(\"libGeom\");" << endl;
2863 out << " gSystem->Load(\"libVMC\");" << endl;
2864 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2865 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Extra ROOT libraries: space-separated list, only entries containing ".so" are loaded.
2867 if (fAdditionalRootLibs.Length()) {
2868 // in principle libtree /lib geom libvmc etc. can go into this list, too
2869 out << "// Add aditional libraries" << endl;
2870 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2873 while((str=(TObjString*)next())) {
2874 if (str->GetString().Contains(".so"))
2875 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2877 if (list) delete list;
2879 out << "// include path" << endl;
2880 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2881 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2882 out << "// Load analysis framework libraries" << endl;
// Name of the function the generated macro calls to set up a PAR package;
// switched to a macro-local "SetupPar" further down when ANALYSISalice is a package.
2883 TString setupPar = "AliAnalysisAlien::SetupPar";
2885 if (!fExecutableCommand.Contains("aliroot")) {
2886 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2887 out << " gSystem->Load(\"libESD\");" << endl;
2888 out << " gSystem->Load(\"libAOD\");" << endl;
2890 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2891 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2892 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: record which framework libraries come as PAR packages.
2894 TIter next(fPackages);
2897 while ((obj=next())) {
2898 pkgname = obj->GetName();
2899 if (pkgname == "STEERBase" ||
2900 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2901 if (pkgname == "ESD" ||
2902 pkgname == "ESD.par") hasESD = kTRUE;
2903 if (pkgname == "AOD" ||
2904 pkgname == "AOD.par") hasAOD = kTRUE;
2905 if (pkgname == "ANALYSIS" ||
2906 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2907 if (pkgname == "ANALYSISalice" ||
2908 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2909 if (pkgname == "CORRFW" ||
2910 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// For each framework library: load the installed library, or set it up from its
// PAR package when one was provided. The order of the emitted lines is fixed.
2912 if (hasANALYSISalice) setupPar = "SetupPar";
2913 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2914 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2915 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2916 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2917 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2918 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2919 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2920 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2921 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2922 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2923 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2924 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2925 out << "// Compile other par packages" << endl;
// Second pass: every package that is not one of the framework libraries handled above.
2927 while ((obj=next())) {
2928 pkgname = obj->GetName();
2929 if (pkgname == "STEERBase" ||
2930 pkgname == "STEERBase.par" ||
2932 pkgname == "ESD.par" ||
2934 pkgname == "AOD.par" ||
2935 pkgname == "ANALYSIS" ||
2936 pkgname == "ANALYSIS.par" ||
2937 pkgname == "ANALYSISalice" ||
2938 pkgname == "ANALYSISalice.par" ||
2939 pkgname == "CORRFW" ||
2940 pkgname == "CORRFW.par") continue;
2941 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// Additional AliRoot libraries: ".so" entries are loaded, ".par" entries are set up.
2944 if (fAdditionalLibs.Length()) {
2945 out << "// Add aditional AliRoot libraries" << endl;
2946 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2949 while((str=(TObjString*)next())) {
2950 if (str->GetString().Contains(".so"))
2951 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2952 if (str->GetString().Contains(".par"))
2953 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2955 if (list) delete list;
2958 out << "// analysis source to be compiled at runtime (if any)" << endl;
// Each source file is compiled in the generated macro with ".L <file>+g".
2959 if (fAnalysisSource.Length()) {
2960 TObjArray *list = fAnalysisSource.Tokenize(" ");
2963 while((str=(TObjString*)next())) {
2964 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2966 if (list) delete list;
// Fast-read mode: warn here and make the generated macro lower the XNet timeouts.
2969 if (fFastReadOption) {
2970 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2971 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2972 out << "// fast xrootd reading enabled" << endl;
2973 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2974 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2975 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2976 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2977 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2978 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2980 out << "// connect to AliEn and make the chain" << endl;
2981 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2982 out << "// read the analysis manager from file" << endl;
// Same file name derivation as used when the manager file is written:
// executable name with ".sh" replaced by ".root".
2983 TString analysisFile = fExecutable;
2984 analysisFile.ReplaceAll(".sh", ".root");
2985 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2986 out << " if (!file) return;" << endl;
2987 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2988 out << " AliAnalysisManager *mgr = 0;" << endl;
2989 out << " TKey *key;" << endl;
2990 out << " while ((key=(TKey*)nextkey())) {" << endl;
2991 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2992 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2993 out << " };" << endl;
2994 out << " if (!mgr) {" << endl;
2995 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2996 out << " return;" << endl;
2997 out << " }" << endl << endl;
2998 out << " mgr->PrintStatus();" << endl;
// Logging configuration of the generated macro depends on the debug level of the
// manager that exists now, at generation time.
2999 if (AliAnalysisManager::GetAnalysisManager()) {
3000 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3001 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3003 if (TestBit(AliAnalysisGrid::kTest))
3004 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3006 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// The chain is built from "wn.xml", either through the tag system or directly.
3009 if (IsUsingTags()) {
3010 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3012 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3014 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3015 out << " timer.Stop();" << endl;
3016 out << " timer.Print();" << endl;
3017 out << "}" << endl << endl;
// Helper emitted into the macro: builds the chain by querying event tags.
3018 if (IsUsingTags()) {
3019 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3021 out << "// Create a chain using tags from the xml file." << endl;
3022 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3023 out << " if (!coll) {" << endl;
3024 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3025 out << " return NULL;" << endl;
3026 out << " }" << endl;
3027 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3028 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3029 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3030 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3031 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3032 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3033 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3034 out << " // Check if the cuts configuration file was provided" << endl;
3035 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3036 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3037 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3038 out << " }" << endl;
// Without a friend chain the tag query returns the chain directly; with one, a
// temporary XML collection is produced and handed to CreateChain.
3039 if (fFriendChainName=="") {
3040 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3042 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3043 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3044 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3046 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3047 out << " chain->ls();" << endl;
3048 out << " return chain;" << endl;
3049 out << "}" << endl << endl;
// AccessPathName returns true when the path is NOT accessible: remind the user
// that a local ConfigureCuts.C can be provided.
3050 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3051 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3052 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3053 msg += " AliLHCTagCuts *lhcCuts,\n";
3054 msg += " AliDetectorTagCuts *detCuts,\n";
3055 msg += " AliEventTagCuts *evCuts)";
3056 Info("WriteAnalysisMacro", "%s", msg.Data());
// Helper emitted into the macro: builds the chain from the URLs of the XML collection.
3059 if (!IsUsingTags() || fFriendChainName!="") {
3060 out <<"//________________________________________________________________________________" << endl;
3061 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3063 out << "// Create a chain using url's from xml file" << endl;
3064 out << " TString filename;" << endl;
3065 out << " Int_t run = 0;" << endl;
3066 out << " TString treename = type;" << endl;
3067 out << " treename.ToLower();" << endl;
3068 out << " treename += \"Tree\";" << endl;
3069 out << " printf(\"***************************************\\n\");" << endl;
3070 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3071 out << " printf(\"***************************************\\n\");" << endl;
3072 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3073 out << " if (!coll) {" << endl;
3074 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3075 out << " return NULL;" << endl;
3076 out << " }" << endl;
3077 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3078 out << " TChain *chain = new TChain(treename);" << endl;
3079 if(fFriendChainName!="") {
3080 out << " TChain *chainFriend = new TChain(treename);" << endl;
3082 out << " coll->Reset();" << endl;
3083 out << " while (coll->Next()) {" << endl;
// NOTE(review): the inner "" on the next line is not escaped, so the two adjacent
// string literals are concatenated and the generated macro contains
// `coll->GetTURL();` rather than `coll->GetTURL("")` (compare the escaped form
// used for fileFriend below). Presumably harmless if GetTURL defaults its
// argument to "" - confirm, and escape the quotes for consistency.
3084 out << " filename = coll->GetTURL("");" << endl;
3085 out << " if (mgr) {" << endl;
3086 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3087 out << " if (nrun && nrun != run) {" << endl;
3088 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3089 out << " mgr->SetRunFromPath(nrun);" << endl;
3090 out << " run = nrun;" << endl;
3091 out << " }" << endl;
3092 out << " }" << endl;
3093 out << " chain->Add(filename);" << endl;
// The friend file name is obtained by substituting the AOD file name in the URL.
3094 if(fFriendChainName!="") {
3095 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3096 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3097 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3098 out << " chainFriend->Add(fileFriend.Data());" << endl;
3100 out << " }" << endl;
3101 out << " if (!chain->GetNtrees()) {" << endl;
3102 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3103 out << " return NULL;" << endl;
3104 out << " }" << endl;
3105 if(fFriendChainName!="") {
3106 out << " chain->AddFriend(chainFriend);" << endl;
3108 out << " return chain;" << endl;
3109 out << "}" << endl << endl;
// When ANALYSISalice itself is a PAR package the generated macro cannot rely on
// AliAnalysisAlien::SetupPar, so a local SetupPar function is emitted instead.
3111 if (hasANALYSISalice) {
3112 out <<"//________________________________________________________________________________" << endl;
3113 out << "Bool_t SetupPar(const char *package) {" << endl;
3114 out << "// Compile the package and set it up." << endl;
3115 out << " TString pkgdir = package;" << endl;
3116 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3117 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3118 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3119 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3120 out << " // Check for BUILD.sh and execute" << endl;
3121 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3122 out << " printf(\"*******************************\\n\");" << endl;
3123 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3124 out << " printf(\"*******************************\\n\");" << endl;
3125 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3126 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3127 out << " gSystem->ChangeDirectory(cdir);" << endl;
3128 out << " return kFALSE;" << endl;
3129 out << " }" << endl;
3130 out << " } else {" << endl;
3131 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3132 out << " gSystem->ChangeDirectory(cdir);" << endl;
3133 out << " return kFALSE;" << endl;
3134 out << " }" << endl;
3135 out << " // Check for SETUP.C and execute" << endl;
3136 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3137 out << " printf(\"*******************************\\n\");" << endl;
3138 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3139 out << " printf(\"*******************************\\n\");" << endl;
3140 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3141 out << " } else {" << endl;
3142 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3143 out << " gSystem->ChangeDirectory(cdir);" << endl;
3144 out << " return kFALSE;" << endl;
3145 out << " }" << endl;
3146 out << " // Restore original workdir" << endl;
3147 out << " gSystem->ChangeDirectory(cdir);" << endl;
3148 out << " return kTRUE;" << endl;
3151 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// No upload to AliEn in production mode, offline mode or test mode.
3153 Bool_t copy = kTRUE;
3154 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is <alien home directory><fGridWorkingDir>; existing remote copies
// are removed before the new files are copied. NOTE(review): the test of `copy`
// is not visible in this listing and gGrid is dereferenced without a visible
// null check - confirm both.
3157 TString workdir = gGrid->GetHomeDirectory();
3158 workdir += fGridWorkingDir;
3159 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3160 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3161 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3162 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3163 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3165 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3166 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3170 //______________________________________________________________________________
// Generates the ROOT macro <executable>_merge.C that merges the outputs of a master
// job on the grid and, when allowed, copies it to the AliEn workspace.
// Nothing is done unless fMergeViaJDL is set; an empty fOutputFiles is an error.
// The generated macro has the signature
//    void <name>(const char *dir, Int_t stage=0, Int_t ichunk=0)
// and, in order:
//  - loads the same libraries / PAR packages / sources as the analysis macro,
//  - connects to AliEn and calls AliAnalysisAlien::MergeOutput for every output
//    file (skipping wildcard names, files already present locally and files
//    listed in fMergeExcludes),
//  - creates an empty "outputs_valid" file,
//  - if an output file exists locally after merging (last stage), reads the
//    AliAnalysisManager from <executable>.root and runs
//    mgr->StartAnalysis("gridterminate", tree) with a null tree.
// NOTE(review): this listing omits some short lines (braces, `else`, small
// declarations, early returns); comments describe only what is visible.
3171 void AliAnalysisAlien::WriteMergingMacro()
3173 // Write a macro to merge the outputs per master job.
3174 if (!fMergeViaJDL) return;
3175 if (!fOutputFiles.Length()) {
3176 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
// Macro file name: executable name with ".sh" replaced by "_merge.C".
3179 TString mergingMacro = fExecutable;
3180 mergingMacro.ReplaceAll(".sh","_merge.C");
// Side effect: an fGridOutputDir without any "/" is rewritten in place relative to
// the AliEn home and working directories. NOTE(review): gGrid is dereferenced
// without a visible null check - presumably a grid connection exists here; confirm.
3181 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
// The macro is only (re)generated when we are not in pure submit mode.
3182 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3184 out.open(mergingMacro.Data(), ios::out);
// NOTE(review): the message below reports fAnalysisMacro, but the file that was
// opened above is mergingMacro - the reported name looks wrong.
3186 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags recording which framework libraries are provided as PAR packages.
3189 Bool_t hasSTEERBase = kFALSE;
3190 Bool_t hasESD = kFALSE;
3191 Bool_t hasAOD = kFALSE;
3192 Bool_t hasANALYSIS = kFALSE;
3193 Bool_t hasANALYSISalice = kFALSE;
3194 Bool_t hasCORRFW = kFALSE;
// `func` becomes the name of the generated entry function (macro name without ".C").
3195 TString func = mergingMacro;
3197 func.ReplaceAll(".C", "");
3198 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
3200 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3201 out << " TStopwatch timer;" << endl;
3202 out << " timer.Start();" << endl << endl;
// Base ROOT libraries are loaded explicitly only when the executable command
// does not contain "aliroot".
3203 if (!fExecutableCommand.Contains("aliroot")) {
3204 out << "// load base root libraries" << endl;
3205 out << " gSystem->Load(\"libTree\");" << endl;
3206 out << " gSystem->Load(\"libGeom\");" << endl;
3207 out << " gSystem->Load(\"libVMC\");" << endl;
3208 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3209 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Extra ROOT libraries: space-separated list, only entries containing ".so" are loaded.
3211 if (fAdditionalRootLibs.Length()) {
3212 // in principle libtree /lib geom libvmc etc. can go into this list, too
3213 out << "// Add aditional libraries" << endl;
3214 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3217 while((str=(TObjString*)next())) {
3218 if (str->GetString().Contains(".so"))
3219 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3221 if (list) delete list;
3223 out << "// include path" << endl;
3224 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3225 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
3226 out << "// Load analysis framework libraries" << endl;
3228 if (!fExecutableCommand.Contains("aliroot")) {
3229 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3230 out << " gSystem->Load(\"libESD\");" << endl;
3231 out << " gSystem->Load(\"libAOD\");" << endl;
3233 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3234 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3235 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: record which framework libraries come as PAR packages.
3237 TIter next(fPackages);
// Name of the function the generated macro calls to set up a PAR package;
// switched to a macro-local "SetupPar" below when ANALYSISalice is a package.
3240 TString setupPar = "AliAnalysisAlien::SetupPar";
3241 while ((obj=next())) {
3242 pkgname = obj->GetName();
3243 if (pkgname == "STEERBase" ||
3244 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3245 if (pkgname == "ESD" ||
3246 pkgname == "ESD.par") hasESD = kTRUE;
3247 if (pkgname == "AOD" ||
3248 pkgname == "AOD.par") hasAOD = kTRUE;
3249 if (pkgname == "ANALYSIS" ||
3250 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3251 if (pkgname == "ANALYSISalice" ||
3252 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3253 if (pkgname == "CORRFW" ||
3254 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// For each framework library: load the installed library, or set it up from its
// PAR package when one was provided. The order of the emitted lines is fixed.
3256 if (hasANALYSISalice) setupPar = "SetupPar";
3257 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3258 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3259 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3260 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3261 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3262 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3263 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3264 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3265 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3266 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3267 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3268 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3269 out << "// Compile other par packages" << endl;
// Second pass: every package that is not one of the framework libraries handled above.
3271 while ((obj=next())) {
3272 pkgname = obj->GetName();
3273 if (pkgname == "STEERBase" ||
3274 pkgname == "STEERBase.par" ||
3276 pkgname == "ESD.par" ||
3278 pkgname == "AOD.par" ||
3279 pkgname == "ANALYSIS" ||
3280 pkgname == "ANALYSIS.par" ||
3281 pkgname == "ANALYSISalice" ||
3282 pkgname == "ANALYSISalice.par" ||
3283 pkgname == "CORRFW" ||
3284 pkgname == "CORRFW.par") continue;
3285 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// Additional AliRoot libraries: only ".so" entries are handled in the merging macro.
3288 if (fAdditionalLibs.Length()) {
3289 out << "// Add aditional AliRoot libraries" << endl;
3290 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3293 while((str=(TObjString*)next())) {
3294 if (str->GetString().Contains(".so"))
3295 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3297 if (list) delete list;
3300 out << "// Analysis source to be compiled at runtime (if any)" << endl;
// Each source file is compiled in the generated macro with ".L <file>+g".
3301 if (fAnalysisSource.Length()) {
3302 TObjArray *list = fAnalysisSource.Tokenize(" ");
3305 while((str=(TObjString*)next())) {
3306 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3308 if (list) delete list;
// Fast-read mode: warn here and make the generated macro lower the XNet timeouts.
3312 if (fFastReadOption) {
3313 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3314 out << "// fast xrootd reading enabled" << endl;
3315 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3316 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3317 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3318 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3319 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3320 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3322 // Change temp directory to current one
3323 out << "// Set temporary merging directory to current one" << endl;
3324 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3325 out << "// Connect to AliEn" << endl;
3326 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
// Merging loop of the generated macro. The output file list, the exclusion list
// and fMaxMergeFiles are baked into the macro text at generation time.
3327 out << " Bool_t laststage = kFALSE;" << endl;
3328 out << " TString outputDir = dir;" << endl;
3329 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3330 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3331 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3332 out << " TIter *iter = new TIter(list);" << endl;
3333 out << " TObjString *str;" << endl;
3334 out << " TString outputFile;" << endl;
3335 out << " Bool_t merged = kTRUE;" << endl;
3336 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3337 out << " outputFile = str->GetString();" << endl;
3338 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3339 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3340 out << " if (index > 0) outputFile.Remove(index);" << endl;
3341 out << " // Skip already merged outputs" << endl;
3342 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3343 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3344 out << " continue;" << endl;
3345 out << " }" << endl;
3346 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3347 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
3348 out << " if (!merged) {" << endl;
3349 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3350 out << " return;" << endl;
3351 out << " }" << endl;
3352 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
3353 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
3354 out << " }" << endl;
// The generated macro creates an empty "outputs_valid" file once the loop finished.
3355 out << " // all outputs merged, validate" << endl;
3356 out << " ofstream out;" << endl;
3357 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3358 out << " out.close();" << endl;
3359 out << " // read the analysis manager from file" << endl;
// Same file name derivation as used when the manager file is written:
// executable name with ".sh" replaced by ".root".
3360 TString analysisFile = fExecutable;
3361 analysisFile.ReplaceAll(".sh", ".root");
3362 out << " if (!laststage) return;" << endl;
3363 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3364 out << " if (!file) return;" << endl;
3365 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3366 out << " AliAnalysisManager *mgr = 0;" << endl;
3367 out << " TKey *key;" << endl;
3368 out << " while ((key=(TKey*)nextkey())) {" << endl;
3369 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3370 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3371 out << " };" << endl;
3372 out << " if (!mgr) {" << endl;
// NOTE(review): there is no space after "in file" in the literal below, so the
// emitted message glues the word "file" to the file name.
3373 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
3374 out << " return;" << endl;
3375 out << " }" << endl << endl;
3376 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3377 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3378 out << " mgr->PrintStatus();" << endl;
// Logging configuration of the generated macro depends on the debug level of the
// manager that exists now, at generation time.
3379 if (AliAnalysisManager::GetAnalysisManager()) {
3380 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3381 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3383 if (TestBit(AliAnalysisGrid::kTest))
3384 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3386 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3389 out << " TTree *tree = NULL;" << endl;
3390 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3391 out << "}" << endl << endl;
// When ANALYSISalice itself is a PAR package the generated macro cannot rely on
// AliAnalysisAlien::SetupPar, so a local SetupPar function is emitted instead.
3392 if (hasANALYSISalice) {
3393 out <<"//________________________________________________________________________________" << endl;
3394 out << "Bool_t SetupPar(const char *package) {" << endl;
3395 out << "// Compile the package and set it up." << endl;
3396 out << " TString pkgdir = package;" << endl;
3397 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3398 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3399 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3400 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3401 out << " // Check for BUILD.sh and execute" << endl;
3402 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3403 out << " printf(\"*******************************\\n\");" << endl;
3404 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3405 out << " printf(\"*******************************\\n\");" << endl;
3406 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3407 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3408 out << " gSystem->ChangeDirectory(cdir);" << endl;
3409 out << " return kFALSE;" << endl;
3410 out << " }" << endl;
3411 out << " } else {" << endl;
3412 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3413 out << " gSystem->ChangeDirectory(cdir);" << endl;
3414 out << " return kFALSE;" << endl;
3415 out << " }" << endl;
3416 out << " // Check for SETUP.C and execute" << endl;
3417 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3418 out << " printf(\"*******************************\\n\");" << endl;
3419 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3420 out << " printf(\"*******************************\\n\");" << endl;
3421 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3422 out << " } else {" << endl;
3423 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3424 out << " gSystem->ChangeDirectory(cdir);" << endl;
3425 out << " return kFALSE;" << endl;
3426 out << " }" << endl;
3427 out << " // Restore original workdir" << endl;
3428 out << " gSystem->ChangeDirectory(cdir);" << endl;
3429 out << " return kTRUE;" << endl;
// No upload to AliEn in production mode, offline mode or test mode.
3433 Bool_t copy = kTRUE;
3434 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Destination is <alien home directory><fGridWorkingDir>; an existing remote copy
// is removed first. NOTE(review): the test of `copy` is not visible in this listing.
3437 TString workdir = gGrid->GetHomeDirectory();
3438 workdir += fGridWorkingDir;
3439 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3440 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3441 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3445 //______________________________________________________________________________
3446 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3448 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3449 // Note that for loading the compiled library. The current directory should have precedence in
3451 TString pkgdir = package;
3452 pkgdir.ReplaceAll(".par","");
3453 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
3454 TString cdir = gSystem->WorkingDirectory();
3455 gSystem->ChangeDirectory(pkgdir);
3456 // Check for BUILD.sh and execute
3457 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3458 printf("**************************************************\n");
3459 printf("*** Building PAR archive %s\n", package);
3460 printf("**************************************************\n");
3461 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3462 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3463 gSystem->ChangeDirectory(cdir);
3467 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3468 gSystem->ChangeDirectory(cdir);
3471 // Check for SETUP.C and execute
3472 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3473 printf("**************************************************\n");
3474 printf("*** Setup PAR archive %s\n", package);
3475 printf("**************************************************\n");
3476 gROOT->Macro("PROOF-INF/SETUP.C");
3477 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3479 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3480 gSystem->ChangeDirectory(cdir);
3483 // Restore original workdir
3484 gSystem->ChangeDirectory(cdir);
3488 //______________________________________________________________________________
3489 void AliAnalysisAlien::WriteExecutable()
3491 // Generate the alien executable script.
3492 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3494 out.open(fExecutable.Data(), ios::out);
3496 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3499 out << "#!/bin/bash" << endl;
3500 out << "echo \"=========================================\"" << endl;
3501 out << "echo \"############## PATH : ##############\"" << endl;
3502 out << "echo $PATH" << endl;
3503 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3504 out << "echo $LD_LIBRARY_PATH" << endl;
3505 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3506 out << "echo $ROOTSYS" << endl;
3507 out << "echo \"############## which root : ##############\"" << endl;
3508 out << "which root" << endl;
3509 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3510 out << "echo $ALICE_ROOT" << endl;
3511 out << "echo \"############## which aliroot : ##############\"" << endl;
3512 out << "which aliroot" << endl;
3513 out << "echo \"############## system limits : ##############\"" << endl;
3514 out << "ulimit -a" << endl;
3515 out << "echo \"############## memory : ##############\"" << endl;
3516 out << "free -m" << endl;
3517 out << "echo \"=========================================\"" << endl << endl;
3518 // Make sure we can properly compile par files
3519 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3520 out << fExecutableCommand << " ";
3521 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3522 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3523 out << "echo \"############## memory after: ##############\"" << endl;
3524 out << "free -m" << endl;
3526 Bool_t copy = kTRUE;
3527 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3530 TString workdir = gGrid->GetHomeDirectory();
3531 TString bindir = Form("%s/bin", workdir.Data());
3532 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3533 workdir += fGridWorkingDir;
3534 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3535 if (FileExists(executable)) gGrid->Rm(executable);
3536 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3537 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3541 //______________________________________________________________________________
3542 void AliAnalysisAlien::WriteMergeExecutable()
3544 // Generate the alien executable script for the merging job.
3545 if (!fMergeViaJDL) return;
3546 TString mergeExec = fExecutable;
3547 mergeExec.ReplaceAll(".sh", "_merge.sh");
3548 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3550 out.open(mergeExec.Data(), ios::out);
3552 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
3555 out << "#!/bin/bash" << endl;
3556 out << "echo \"=========================================\"" << endl;
3557 out << "echo \"############## PATH : ##############\"" << endl;
3558 out << "echo $PATH" << endl;
3559 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3560 out << "echo $LD_LIBRARY_PATH" << endl;
3561 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3562 out << "echo $ROOTSYS" << endl;
3563 out << "echo \"############## which root : ##############\"" << endl;
3564 out << "which root" << endl;
3565 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3566 out << "echo $ALICE_ROOT" << endl;
3567 out << "echo \"############## which aliroot : ##############\"" << endl;
3568 out << "which aliroot" << endl;
3569 out << "echo \"############## system limits : ##############\"" << endl;
3570 out << "ulimit -a" << endl;
3571 out << "echo \"############## memory : ##############\"" << endl;
3572 out << "free -m" << endl;
3573 out << "echo \"=========================================\"" << endl << endl;
3574 // Make sure we can properly compile par files
3575 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3576 TString mergeMacro = fExecutable;
3577 mergeMacro.ReplaceAll(".sh", "_merge.C");
3578 if (IsOneStageMerging())
3579 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
3581 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3582 out << fExecutableCommand << " " << "$ARG" << endl;
3583 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3584 out << "echo \"############## memory after: ##############\"" << endl;
3585 out << "free -m" << endl;
3587 Bool_t copy = kTRUE;
3588 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3591 TString workdir = gGrid->GetHomeDirectory();
3592 TString bindir = Form("%s/bin", workdir.Data());
3593 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3594 workdir += fGridWorkingDir;
3595 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3596 if (FileExists(executable)) gGrid->Rm(executable);
3597 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3598 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3602 //______________________________________________________________________________
3603 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3605 // Write the production file to be submitted by LPM manager. The format is:
3606 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3607 // Next lines: full_path_to_dataset XXX (XXX is a string)
3608 // To submit, one has to: submit jdl XXX for all lines
3610 out.open(filename, ios::out);
3612 Error("WriteProductionFile", "Bad file name: %s", filename);
3616 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
3617 workdir = gGrid->GetHomeDirectory();
3618 workdir += fGridWorkingDir;
3619 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3620 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3621 out << locjdl << " " << njobspermaster << endl;
3622 Int_t nmasterjobs = fInputFiles->GetEntries();
3623 for (Int_t i=0; i<nmasterjobs; i++) {
3624 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3625 runOutDir.ReplaceAll(".xml", "");
3627 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3629 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3632 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3633 if (FileExists(filename)) gGrid->Rm(filename);
3634 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3638 //______________________________________________________________________________
3639 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3641 // Generate the alien validation script.
3642 // Generate the validation script
3644 if (fValidationScript.IsNull()) {
3645 fValidationScript = fExecutable;
3646 fValidationScript.ReplaceAll(".sh", "_validation.sh");
3648 TString validationScript = fValidationScript;
3649 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
3651 Error("WriteValidationScript", "Alien connection required");
3654 if (!fTerminateFiles.IsNull()) {
3655 fTerminateFiles.Strip();
3656 fTerminateFiles.ReplaceAll(" ",",");
3658 TString outStream = "";
3659 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3660 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3662 out.open(validationScript, ios::out);
3663 out << "#!/bin/bash" << endl;
3664 out << "##################################################" << endl;
3665 out << "validateout=`dirname $0`" << endl;
3666 out << "validatetime=`date`" << endl;
3667 out << "validated=\"0\";" << endl;
3668 out << "error=0" << endl;
3669 out << "if [ -z $validateout ]" << endl;
3670 out << "then" << endl;
3671 out << " validateout=\".\"" << endl;
3672 out << "fi" << endl << endl;
3673 out << "cd $validateout;" << endl;
3674 out << "validateworkdir=`pwd`;" << endl << endl;
3675 out << "echo \"*******************************************************\"" << outStream << endl;
3676 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3678 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3679 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3680 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3681 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3682 out << "ls -la ./" << outStream << endl;
3683 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3684 out << "##################################################" << endl;
3687 out << "if [ ! -f stderr ] ; then" << endl;
3688 out << " error=1" << endl;
3689 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3690 out << " echo \"Error = $error\" " << outStream << endl;
3691 out << "fi" << endl;
3693 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3694 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3695 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3696 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3699 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3700 out << " error=1" << endl;
3701 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3702 out << " echo \"$parArch\" " << outStream << endl;
3703 out << " echo \"Error = $error\" " << outStream << endl;
3704 out << "fi" << endl;
3706 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3707 out << " error=1" << endl;
3708 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3709 out << " echo \"$segViol\" " << outStream << endl;
3710 out << " echo \"Error = $error\" " << outStream << endl;
3711 out << "fi" << endl;
3713 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3714 out << " error=1" << endl;
3715 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3716 out << " echo \"$segFault\" " << outStream << endl;
3717 out << " echo \"Error = $error\" " << outStream << endl;
3718 out << "fi" << endl;
3720 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3721 out << " error=1" << endl;
3722 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3723 out << " echo \"$glibcErr\" " << outStream << endl;
3724 out << " echo \"Error = $error\" " << outStream << endl;
3725 out << "fi" << endl;
3727 // Part dedicated to the specific analyses running into the train
3729 TString outputFiles = fOutputFiles;
3730 if (merge && !fTerminateFiles.IsNull()) {
3732 outputFiles += fTerminateFiles;
3734 TObjArray *arr = outputFiles.Tokenize(",");
3737 while (!merge && (os=(TObjString*)next1())) {
3738 // No need to validate outputs produced by merging since the merging macro does this
3739 outputFile = os->GetString();
3740 Int_t index = outputFile.Index("@");
3741 if (index > 0) outputFile.Remove(index);
3742 if (fTerminateFiles.Contains(outputFile)) continue;
3743 if (outputFile.Contains("*")) continue;
3744 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3745 out << " error=1" << endl;
3746 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3747 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3748 out << "fi" << endl;
3751 out << "if ! [ -f outputs_valid ] ; then" << endl;
3752 out << " error=1" << endl;
3753 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3754 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3755 out << "fi" << endl;
3757 out << "if [ $error = 0 ] ; then" << endl;
3758 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3759 if (!IsKeepLogs()) {
3760 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3762 out << " rm -f std*" << endl;
3764 out << "fi" << endl;
3766 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3767 out << "echo \"*******************************************************\"" << outStream << endl;
3768 out << "cd -" << endl;
3769 out << "exit $error" << endl;
3771 Bool_t copy = kTRUE;
3772 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3775 TString workdir = gGrid->GetHomeDirectory();
3776 workdir += fGridWorkingDir;
3777 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3778 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3779 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));