1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TFileCollection.h"
32 #include "TObjString.h"
33 #include "TObjArray.h"
35 #include "TGridResult.h"
36 #include "TGridCollection.h"
38 #include "TGridJobStatusList.h"
39 #include "TGridJobStatus.h"
40 #include "TFileMerger.h"
41 #include "AliAnalysisManager.h"
42 #include "AliVEventHandler.h"
43 #include "AliAnalysisDataContainer.h"
44 #include "AliAnalysisAlien.h"
46 ClassImp(AliAnalysisAlien)
48 //______________________________________________________________________________
// Default constructor.
// The initializers visible here set fSplitMaxInputFileNumber,
// fMasterResubmitThreshold and fNproofWorkersPerSlave to 0 and default-construct
// fAdditionalRootLibs and fRootVersionForProof.
// NOTE(review): the rest of the initializer list and the constructor body are
// not visible in this excerpt.
49 AliAnalysisAlien::AliAnalysisAlien()
55 fSplitMaxInputFileNumber(0),
57 fMasterResubmitThreshold(0),
69 fNproofWorkersPerSlave(0),
79 fAdditionalRootLibs(),
107 fRootVersionForProof(),
116 //______________________________________________________________________________
// Named constructor: forwards 'name' to the AliAnalysisGrid base class.
// The initializers visible here set fSplitMaxInputFileNumber,
// fMasterResubmitThreshold and fNproofWorkersPerSlave to 0 and default-construct
// fExecutableCommand, fAdditionalRootLibs and fRootVersionForProof.
// NOTE(review): the rest of the initializer list and the constructor body are
// not visible in this excerpt.
117 AliAnalysisAlien::AliAnalysisAlien(const char *name)
118 :AliAnalysisGrid(name),
123 fSplitMaxInputFileNumber(0),
125 fMasterResubmitThreshold(0),
137 fNproofWorkersPerSlave(0),
141 fExecutableCommand(),
147 fAdditionalRootLibs(),
175 fRootVersionForProof(),
184 //______________________________________________________________________________
// Copy constructor.
// Scalar and string settings are copied member by member from 'other'.
// The two JDL objects are NOT copied: new TAlienJDL objects are created instead.
// The input-file and package lists are rebuilt entry by entry (see below).
// NOTE(review): some initializers and the declaration of 'obj' are not visible
// in this excerpt.
185 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
186 :AliAnalysisGrid(other),
189 fPrice(other.fPrice),
191 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
192 fMaxInitFailed(other.fMaxInitFailed),
193 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
194 fNtestFiles(other.fNtestFiles),
195 fNrunsPerMaster(other.fNrunsPerMaster),
196 fMaxMergeFiles(other.fMaxMergeFiles),
197 fNsubmitted(other.fNsubmitted),
198 fProductionMode(other.fProductionMode),
199 fOutputToRunNo(other.fOutputToRunNo),
200 fMergeViaJDL(other.fMergeViaJDL),
201 fFastReadOption(other.fFastReadOption),
202 fOverwriteMode(other.fOverwriteMode),
203 fNreplicas(other.fNreplicas),
204 fNproofWorkers(other.fNproofWorkers),
205 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
206 fProofReset(other.fProofReset),
207 fRunNumbers(other.fRunNumbers),
208 fExecutable(other.fExecutable),
209 fExecutableCommand(other.fExecutableCommand),
210 fArguments(other.fArguments),
211 fExecutableArgs(other.fExecutableArgs),
212 fAnalysisMacro(other.fAnalysisMacro),
213 fAnalysisSource(other.fAnalysisSource),
214 fValidationScript(other.fValidationScript),
215 fAdditionalRootLibs(other.fAdditionalRootLibs),
216 fAdditionalLibs(other.fAdditionalLibs),
217 fSplitMode(other.fSplitMode),
218 fAPIVersion(other.fAPIVersion),
219 fROOTVersion(other.fROOTVersion),
220 fAliROOTVersion(other.fAliROOTVersion),
221 fExternalPackages(other.fExternalPackages),
223 fGridWorkingDir(other.fGridWorkingDir),
224 fGridDataDir(other.fGridDataDir),
225 fDataPattern(other.fDataPattern),
226 fGridOutputDir(other.fGridOutputDir),
227 fOutputArchive(other.fOutputArchive),
228 fOutputFiles(other.fOutputFiles),
229 fInputFormat(other.fInputFormat),
230 fDatasetName(other.fDatasetName),
231 fJDLName(other.fJDLName),
232 fTerminateFiles(other.fTerminateFiles),
233 fMergeExcludes(other.fMergeExcludes),
234 fIncludePath(other.fIncludePath),
235 fCloseSE(other.fCloseSE),
236 fFriendChainName(other.fFriendChainName),
237 fJobTag(other.fJobTag),
238 fOutputSingle(other.fOutputSingle),
239 fRunPrefix(other.fRunPrefix),
240 fProofCluster(other.fProofCluster),
241 fProofDataSet(other.fProofDataSet),
242 fFileForTestMode(other.fFileForTestMode),
243 fRootVersionForProof(other.fRootVersionForProof),
244 fAliRootMode(other.fAliRootMode),
// Fresh JDL objects are instantiated through gROOT->ProcessLine(), so the
// content of other's JDL objects is not carried over.
249 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
250 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
251 fRunRange[0] = other.fRunRange[0];
252 fRunRange[1] = other.fRunRange[1];
// Rebuild the input-file list: every entry becomes a new TObjString holding the
// name of the source entry, and the new array is made owner of its entries.
253 if (other.fInputFiles) {
254 fInputFiles = new TObjArray();
255 TIter next(other.fInputFiles);
257 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
258 fInputFiles->SetOwner();
// Same treatment for the package list.
260 if (other.fPackages) {
261 fPackages = new TObjArray();
262 TIter next(other.fPackages);
264 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
265 fPackages->SetOwner();
269 //______________________________________________________________________________
270 AliAnalysisAlien::~AliAnalysisAlien()
273 if (fGridJDL) delete fGridJDL;
274 if (fMergingJDL) delete fMergingJDL;
275 if (fInputFiles) delete fInputFiles;
276 if (fPackages) delete fPackages;
279 //______________________________________________________________________________
// Assignment operator.
// Self-assignment is a no-op. Otherwise the base-class part is assigned, new
// TAlienJDL objects are created, the scalar and string settings are copied
// member by member and the input-file and package lists are rebuilt entry by
// entry.
// NOTE(review): the closing lines of this function (including its return
// statement) and the declaration of 'obj' are not visible in this excerpt.
280 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
283 if (this != &other) {
284 AliAnalysisGrid::operator=(other);
// NOTE(review): the objects previously held by fGridJDL and fMergingJDL are not
// deleted before the pointers are overwritten, although the destructor deletes
// these members - this looks like a leak on assignment; confirm.
285 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
286 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
287 fPrice = other.fPrice;
289 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
290 fMaxInitFailed = other.fMaxInitFailed;
291 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
292 fNtestFiles = other.fNtestFiles;
293 fNrunsPerMaster = other.fNrunsPerMaster;
294 fMaxMergeFiles = other.fMaxMergeFiles;
295 fNsubmitted = other.fNsubmitted;
296 fProductionMode = other.fProductionMode;
297 fOutputToRunNo = other.fOutputToRunNo;
298 fMergeViaJDL = other.fMergeViaJDL;
299 fFastReadOption = other.fFastReadOption;
300 fOverwriteMode = other.fOverwriteMode;
301 fNreplicas = other.fNreplicas;
302 fNproofWorkers = other.fNproofWorkers;
303 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
304 fProofReset = other.fProofReset;
305 fRunNumbers = other.fRunNumbers;
306 fExecutable = other.fExecutable;
307 fExecutableCommand = other.fExecutableCommand;
308 fArguments = other.fArguments;
309 fExecutableArgs = other.fExecutableArgs;
310 fAnalysisMacro = other.fAnalysisMacro;
311 fAnalysisSource = other.fAnalysisSource;
312 fValidationScript = other.fValidationScript;
313 fAdditionalRootLibs = other.fAdditionalRootLibs;
314 fAdditionalLibs = other.fAdditionalLibs;
315 fSplitMode = other.fSplitMode;
316 fAPIVersion = other.fAPIVersion;
317 fROOTVersion = other.fROOTVersion;
318 fAliROOTVersion = other.fAliROOTVersion;
319 fExternalPackages = other.fExternalPackages;
321 fGridWorkingDir = other.fGridWorkingDir;
322 fGridDataDir = other.fGridDataDir;
323 fDataPattern = other.fDataPattern;
324 fGridOutputDir = other.fGridOutputDir;
325 fOutputArchive = other.fOutputArchive;
326 fOutputFiles = other.fOutputFiles;
327 fInputFormat = other.fInputFormat;
328 fDatasetName = other.fDatasetName;
329 fJDLName = other.fJDLName;
330 fTerminateFiles = other.fTerminateFiles;
331 fMergeExcludes = other.fMergeExcludes;
332 fIncludePath = other.fIncludePath;
333 fCloseSE = other.fCloseSE;
334 fFriendChainName = other.fFriendChainName;
335 fJobTag = other.fJobTag;
336 fOutputSingle = other.fOutputSingle;
337 fRunPrefix = other.fRunPrefix;
338 fProofCluster = other.fProofCluster;
339 fProofDataSet = other.fProofDataSet;
340 fFileForTestMode = other.fFileForTestMode;
341 fRootVersionForProof = other.fRootVersionForProof;
342 fAliRootMode = other.fAliRootMode;
// Rebuild the input-file list as new TObjString entries owned by a new array.
// NOTE(review): an array already held by fInputFiles is replaced without being
// deleted; the same applies to fPackages below - confirm.
343 if (other.fInputFiles) {
344 fInputFiles = new TObjArray();
345 TIter next(other.fInputFiles);
347 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
348 fInputFiles->SetOwner();
350 if (other.fPackages) {
351 fPackages = new TObjArray();
352 TIter next(other.fPackages);
354 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
355 fPackages->SetOwner();
361 //______________________________________________________________________________
362 void AliAnalysisAlien::AddIncludePath(const char *path)
364 // Add include path in the remote analysis macro.
// The text is appended to fIncludePath followed by a blank. When it already
// contains "-I" it is appended unchanged, otherwise "-I" is put in front of it.
// NOTE(review): the declaration of 'p' is not visible in this excerpt;
// presumably it is a TString built from 'path' - confirm.
// NOTE(review): Contains("-I") matches anywhere in the text, not only at its
// start, so a directory name that merely contains "-I" gets no prefix.
366 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
367 else fIncludePath += Form("-I%s ", path);
370 //______________________________________________________________________________
371 void AliAnalysisAlien::AddRunNumber(Int_t run)
373 // Add a run number to the list of runs to be processed.
374 if (fRunNumbers.Length()) fRunNumbers += " ";
375 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
378 //______________________________________________________________________________
379 void AliAnalysisAlien::AddRunNumber(const char* run)
381 // Add a run number to the list of runs to be processed.
// A single blank is inserted first when fRunNumbers already has content.
// NOTE(review): the statement that appends 'run' itself is not visible in this
// excerpt.
382 if (fRunNumbers.Length()) fRunNumbers += " ";
386 //______________________________________________________________________________
387 void AliAnalysisAlien::AddDataFile(const char *lfn)
389 // Adds a data file to the input to be analysed. The file should be a valid LFN
390 // or point to an existing file in the alien workdir.
391 if (!fInputFiles) fInputFiles = new TObjArray();
392 fInputFiles->Add(new TObjString(lfn));
395 //______________________________________________________________________________
396 void AliAnalysisAlien::AddExternalPackage(const char *package)
398 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
399 if (fExternalPackages) fExternalPackages += " ";
400 fExternalPackages += package;
403 //______________________________________________________________________________
404 Bool_t AliAnalysisAlien::Connect()
406 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Returns kTRUE immediately when a grid connection is already open or when
// fProductionMode is set; otherwise a connection to "alien://" is attempted.
// NOTE(review): the return statements after the connection attempt are not
// visible in this excerpt.
407 if (gGrid && gGrid->IsConnected()) return kTRUE;
408 if (fProductionMode) return kTRUE;
410 Info("Connect", "Trying to connect to AliEn ...");
// The value returned by TGrid::Connect() is not used on this line; success is
// judged from the global gGrid afterwards.
411 TGrid::Connect("alien://");
413 if (!gGrid || !gGrid->IsConnected()) {
414 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// The user name reported by the grid connection becomes the analysis user.
417 fUser = gGrid->GetUser();
418 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
422 //______________________________________________________________________________
423 void AliAnalysisAlien::CdWork()
425 // Check validity of alien workspace. Create directory if possible.
// NOTE(review): the condition guarding the error below and several closing
// lines are not visible in this excerpt.
427 Error("CdWork", "Alien connection required");
// The working directory is the grid home directory with fGridWorkingDir appended.
430 TString homedir = gGrid->GetHomeDirectory();
431 TString workdir = homedir + fGridWorkingDir;
432 if (DirectoryExists(workdir)) {
436 // Work directory not existing - create it
// "-p" is passed to Mkdir; presumably it creates missing parent directories,
// as with the shell command - confirm.
438 if (gGrid->Mkdir(workdir, "-p")) {
439 gGrid->Cd(fGridWorkingDir);
440 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
// Creation failed: a warning names the home directory as the replacement and
// fGridWorkingDir is cleared.
442 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
443 workdir.Data(), homedir.Data());
444 fGridWorkingDir = "";
448 //______________________________________________________________________________
449 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
451 // Check if file copying is possible.
// A local file named "plugin_test_copy" is created and copied to 'alienpath'
// on the grid; the local file is unlinked afterwards. In production mode the
// test is skipped and kTRUE is returned.
// NOTE(review): several conditions and return statements of this function are
// not visible in this excerpt.
452 if (fProductionMode) return kTRUE;
454 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
457 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
458 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
459 // Check if alien_CLOSE_SE is defined
460 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
461 if (!closeSE.IsNull()) {
462 Info("CheckFileCopy", "Your current close storage is pointing to: \
463 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
// An unset or empty alien_CLOSE_SE only triggers a warning; the check goes on.
465 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
467 // Check if grid directory exists.
468 if (!DirectoryExists(alienpath)) {
469 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
// Local test file, created in the current working directory.
472 TFile f("plugin_test_copy", "RECREATE");
473 // User may not have write permissions to current directory
475 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
476 gSystem->WorkingDirectory());
// Remove a remote file of the same name left behind earlier, then try the copy.
480 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
481 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
482 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
483 \n# 1. Make sure you have write permissions there. If this is the case: \
484 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
485 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
486 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
487 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
// The local test file is unlinked on the failure path as well.
488 gSystem->Unlink(f.GetName());
// Success path: remove the local file and the remote copy.
491 gSystem->Unlink(f.GetName());
// NOTE(review): unlike the copy destination above, this path has neither the
// "alien://" prefix nor a '/' between directory and file name - confirm that
// the remote test file is really removed.
492 gGrid->Rm(Form("%s%s",alienpath,f.GetName()));
493 Info("CheckFileCopy", "### ...SUCCESS ###");
497 //______________________________________________________________________________
498 Bool_t AliAnalysisAlien::CheckInputData()
500 // Check validity of input data. If necessary, create xml files.
// Overview of the visible logic:
//  1. production mode: nothing is checked, kTRUE is returned;
//  2. no input files, run numbers or run range: a base data directory is required;
//  3. declared input files: each must exist and all must be of the same kind;
//  4. run numbers / run range: one "<run>.xml" name per run is registered through
//     AddDataFile() when fNrunsPerMaster<2, otherwise a chunk name of the form
//     "<first run>_<last run>.xml" is composed.
// NOTE(review): many lines of this function (declarations, return statements,
// closing braces) are not visible in this excerpt.
501 if (fProductionMode) return kTRUE;
502 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
503 if (!fGridDataDir.Length()) {
// NOTE(review): the message tag is spelled "CkeckInputData" here and once more
// further down.
504 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
507 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
508 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
509 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
512 // Process declared files
513 Bool_t isCollection = kFALSE;
514 Bool_t isXml = kFALSE;
515 Bool_t useTags = kFALSE;
516 Bool_t checked = kFALSE;
517 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// workdir = grid home directory + fGridWorkingDir
519 TString workdir = gGrid->GetHomeDirectory();
520 workdir += fGridWorkingDir;
523 TIter next(fInputFiles);
524 while ((objstr=(TObjString*)next())) {
527 file += objstr->GetString();
528 // Store full lfn path
529 if (FileExists(file)) objstr->SetString(file);
531 file = objstr->GetName();
532 if (!FileExists(objstr->GetName())) {
533 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
534 objstr->GetName(), workdir.Data());
// Classify the current file and compare its kind with the kind recorded so far;
// a difference is reported as a conflict.
538 Bool_t iscoll, isxml, usetags;
539 CheckDataType(file, iscoll, isxml, usetags);
542 isCollection = iscoll;
545 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
547 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
548 Error("CheckInputData", "Some conflict was found in the types of inputs");
554 // Process requested run numbers
555 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
556 // Check validity of alien data directory
557 if (!fGridDataDir.Length()) {
558 Error("CkeckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
561 if (!DirectoryExists(fGridDataDir)) {
562 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
566 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
570 if (checked && !isXml) {
571 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
574 // Check validity of run number(s)
578 TString schunk, schunk2;
// Tag usage is derived from the data pattern: a pattern containing "tag"
// switches the kUseTags bit on.
582 useTags = fDataPattern.Contains("tag");
583 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
585 if (useTags != fDataPattern.Contains("tag")) {
586 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over the run range.
589 if (fRunNumbers.Length()) {
// NOTE(review): this and the following Info() calls use the tag "CheckDataType"
// although they are issued from CheckInputData.
590 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
// NOTE(review): the array returned by Tokenize() is assigned to 'arr'; whether
// it is deleted afterwards is not visible in this excerpt.
591 arr = fRunNumbers.Tokenize(" ");
593 while ((os=(TObjString*)next())) {
// NOTE(review): the format string ends with a blank, so the path handed to
// DirectoryExists() carries a trailing space - confirm that this is intended.
594 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
595 if (!DirectoryExists(path)) {
596 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
599 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
600 TString msg = "\n##### file: ";
602 msg += " type: xml_collection;";
603 if (useTags) msg += " using_tags: Yes";
604 else msg += " using_tags: No";
605 Info("CheckDataType", "%s", msg.Data());
// One xml collection per run unless several runs are grouped per master job.
606 if (fNrunsPerMaster<2) {
607 AddDataFile(Form("%s.xml", os->GetString().Data()));
// Grouping: the first run of a group starts the chunk name; the name is
// completed when the group is full or the last run is reached.
610 if (((nruns-1)%fNrunsPerMaster) == 0) {
611 schunk = os->GetString();
613 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
614 schunk += Form("_%s.xml", os->GetString().Data());
// Run range: same procedure for every run number in [fRunRange[0], fRunRange[1]].
620 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
621 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
622 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
623 if (!DirectoryExists(path)) {
624 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
627 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
628 TString msg = "\n##### file: ";
630 msg += " type: xml_collection;";
631 if (useTags) msg += " using_tags: Yes";
632 else msg += " using_tags: No";
633 Info("CheckDataType", "%s", msg.Data());
634 if (fNrunsPerMaster<2) {
635 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
638 if (((nruns-1)%fNrunsPerMaster) == 0) {
639 schunk = Form("%s%d", fRunPrefix.Data(),irun);
641 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
642 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
655 //______________________________________________________________________________
656 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
658 // Create dataset for the grid data directory + run number.
// Three cases are handled in turn: a single collection for the whole data
// directory, one collection per supplied run number, and one collection per run
// of the run range. With fNrunsPerMaster>=2 the per-run collections are merged
// into chunk collections. Production and offline modes return kTRUE at once.
// NOTE(review): many lines of this function (declarations, the composition of
// the 'find' command, return statements, closing braces) are not visible in
// this excerpt.
659 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
661 Error("CreateDataset", "Cannot create dataset with no grid connection");
666 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
// workdir = grid home directory + fGridWorkingDir
667 TString workdir = gGrid->GetHomeDirectory();
668 workdir += fGridWorkingDir;
670 // Compose the 'find' command arguments
// In test mode "-l <fNtestFiles>" is added to the options (presumably a limit
// on the number of files found - confirm).
672 TString options = "-x collection ";
673 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
674 TString conditions = "";
679 TString schunk, schunk2;
680 TGridCollection *cbase=0, *cadd=0;
// Case 1: neither run numbers nor a run range were given.
681 if (!fRunNumbers.Length() && !fRunRange[0]) {
// Nothing to create when input files were declared explicitly.
682 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
683 // Make a single data collection from data directory.
685 if (!DirectoryExists(path)) {
686 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
// The local collection file is "wn.xml" in test mode, otherwise it is named
// after the last component of the data path.
690 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
691 else file = Form("%s.xml", gSystem->BaseName(path));
// (Re)create the local file when it is missing, in test mode or in overwrite mode.
692 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
698 command += conditions;
699 printf("command: %s\n", command.Data());
700 TGridResult *res = gGrid->Command(command);
702 // Write standard output to file
703 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
// Validation relies on the external 'grep' program: first test that it can be
// run at all, then search the produced file for "/event".
704 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
705 Bool_t nullFile = kFALSE;
707 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// A file in which grep finds no "/event" is considered empty.
709 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
711 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
716 Bool_t fileExists = FileExists(file);
717 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
718 // Copy xml file to alien space
719 if (fileExists) gGrid->Rm(file);
720 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
// The copy is verified by checking that the file now exists on the grid.
721 if (!FileExists(file)) {
722 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
725 // Update list of files to be processed.
727 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
731 Bool_t nullResult = kTRUE;
// Case 2: explicit run numbers, one token per run.
732 if (fRunNumbers.Length()) {
// NOTE(review): whether the array returned by Tokenize() is deleted afterwards
// is not visible in this excerpt.
733 TObjArray *arr = fRunNumbers.Tokenize(" ");
736 while ((os=(TObjString*)next())) {
// NOTE(review): the format string ends with a blank, so 'path' carries a
// trailing space - confirm that this is intended.
737 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
// Runs without a data directory are skipped silently.
738 if (!DirectoryExists(path)) continue;
740 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
741 else file = Form("%s.xml", os->GetString().Data());
742 // If local collection file does not exist, create it via 'find' command.
743 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
748 command += conditions;
749 TGridResult *res = gGrid->Command(command);
751 // Write standard output to file
752 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
753 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
754 Bool_t nullFile = kFALSE;
756 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
758 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
// An empty result only produces a warning here, and the run is removed from
// fRunNumbers.
760 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
761 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
// In test mode processing stops after the first usable run.
767 if (TestBit(AliAnalysisGrid::kTest)) break;
768 // Check if there is one run per master job.
769 if (fNrunsPerMaster<2) {
770 if (FileExists(file)) {
771 if (fOverwriteMode) gGrid->Rm(file);
773 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
777 // Copy xml file to alien space
778 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
779 if (!FileExists(file)) {
780 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a group provides the base
// collection (cbase); a further collection object (cadd) is created from the
// current file for merging.
786 if (((nruns-1)%fNrunsPerMaster) == 0) {
787 schunk = os->GetString();
788 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
790 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
791 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
795 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
// The chunk is complete: its name is "<first run>_<last run>.xml".
798 schunk += Form("_%s.xml", os->GetString().Data());
799 if (FileExists(schunk)) {
// NOTE(review): the existence test is on 'schunk' but the removal is applied to
// 'file'; the run-range branch further down removes 'schunk' at the equivalent
// place - likely a copy/paste slip, confirm.
800 if (fOverwriteMode) gGrid->Rm(file);
802 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
806 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
807 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
808 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
809 if (!FileExists(schunk)) {
810 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
818 Error("CreateDataset", "No valid dataset corresponding to the query!");
822 // Process a full run range.
// Case 3: every run number in [fRunRange[0], fRunRange[1]].
823 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
824 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
825 if (!DirectoryExists(path)) continue;
827 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
828 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
829 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
830 if (fOverwriteMode) gGrid->Rm(file);
832 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
836 // If local collection file does not exist, create it via 'find' command.
837 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
842 command += conditions;
843 TGridResult *res = gGrid->Command(command);
845 // Write standard output to file
846 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
847 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
848 Bool_t nullFile = kFALSE;
850 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
852 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
854 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
860 if (TestBit(AliAnalysisGrid::kTest)) break;
861 // Check if there is one run per master job.
862 if (fNrunsPerMaster<2) {
863 if (FileExists(file)) {
864 if (fOverwriteMode) gGrid->Rm(file);
866 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
870 // Copy xml file to alien space
871 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
872 if (!FileExists(file)) {
873 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
878 // Check if the collection for the chunk exist locally.
// The chunk index is derived from the running run counter.
// NOTE(review): fInputFiles is dereferenced here without a visible null check.
879 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
880 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
881 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
884 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
885 if (((nruns-1)%fNrunsPerMaster) == 0) {
886 schunk = Form("%s%d", fRunPrefix.Data(), irun);
887 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
889 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
// Candidate chunk name "<first run>_<current run>.xml".
893 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
894 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
898 if (FileExists(schunk)) {
899 if (fOverwriteMode) gGrid->Rm(schunk);
901 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
905 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
906 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
907 if (FileExists(schunk)) {
908 if (fOverwriteMode) gGrid->Rm(schunk);
910 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
914 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
915 if (!FileExists(schunk)) {
916 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
922 Error("CreateDataset", "No valid dataset corresponding to the query!");
929 //______________________________________________________________________________
930 Bool_t AliAnalysisAlien::CreateJDL()
932 // Generate a JDL file according to current settings. The name of the file is
933 // specified by fJDLName.
934 Bool_t error = kFALSE;
937 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
938 Bool_t generate = kTRUE;
939 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
941 Error("CreateJDL", "Alien connection required");
944 // Check validity of alien workspace
946 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
947 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
948 workdir += fGridWorkingDir;
952 Error("CreateJDL()", "Define some input files for your analysis.");
955 // Compose list of input files
956 // Check if output files were defined
957 if (!fOutputFiles.Length()) {
958 Error("CreateJDL", "You must define at least one output file");
961 // Check if an output directory was defined and valid
962 if (!fGridOutputDir.Length()) {
963 Error("CreateJDL", "You must define AliEn output directory");
966 if (!fProductionMode) {
967 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
968 if (!DirectoryExists(fGridOutputDir)) {
969 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
970 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
972 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
979 // Exit if any error up to now
980 if (error) return kFALSE;
982 if (!fUser.IsNull()) {
983 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
984 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
986 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
987 TString mergeExec = fExecutable;
988 mergeExec.ReplaceAll(".sh", "_merge.sh");
989 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
990 mergeExec.ReplaceAll(".sh", ".C");
991 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
992 if (!fArguments.IsNull())
993 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
994 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
995 else fMergingJDL->SetArguments("$1 $2 $3");
996 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
997 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
998 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
999 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1001 if (fMaxInitFailed > 0) {
1002 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1003 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1005 if (fSplitMaxInputFileNumber > 0) {
1006 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1007 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1009 if (fSplitMode.Length()) {
1010 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1011 fGridJDL->SetDescription("Split", "We split per SE or file");
1013 if (!fAliROOTVersion.IsNull()) {
1014 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1015 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1017 if (!fROOTVersion.IsNull()) {
1018 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1019 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1021 if (!fAPIVersion.IsNull()) {
1022 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1023 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1025 if (!fExternalPackages.IsNull()) {
1026 arr = fExternalPackages.Tokenize(" ");
1028 while ((os=(TObjString*)next())) {
1029 TString pkgname = os->GetString();
1030 Int_t index = pkgname.Index("::");
1031 TString pkgversion = pkgname(index+2, pkgname.Length());
1032 pkgname.Remove(index);
1033 fGridJDL->AddToPackages(pkgname, pkgversion);
1034 fMergingJDL->AddToPackages(pkgname, pkgversion);
1038 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1039 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1040 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1041 TString analysisFile = fExecutable;
1042 analysisFile.ReplaceAll(".sh", ".root");
1043 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1044 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1045 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1046 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
1047 if (fAdditionalLibs.Length()) {
1048 arr = fAdditionalLibs.Tokenize(" ");
1050 while ((os=(TObjString*)next())) {
1051 if (os->GetString().Contains(".so")) continue;
1052 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1053 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1058 TIter next(fPackages);
1060 while ((obj=next())) {
1061 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1062 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1065 if (fOutputArchive.Length()) {
1066 arr = fOutputArchive.Tokenize(" ");
1068 Bool_t first = kTRUE;
1069 const char *comment = "Files to be archived";
1070 const char *comment1 = comment;
1071 while ((os=(TObjString*)next())) {
1072 if (!first) comment = NULL;
1073 if (!os->GetString().Contains("@") && fCloseSE.Length())
1074 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1076 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1080 TString outputArchive = fOutputArchive;
1081 if (!fMergeExcludes.IsNull()) {
1082 arr = fMergeExcludes.Tokenize(" ");
1084 while ((os=(TObjString*)next1())) {
1085 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1086 outputArchive.ReplaceAll(os->GetString(),"");
1090 if (!fTerminateFiles.IsNull()) {
1091 fTerminateFiles.Strip();
1092 fTerminateFiles.ReplaceAll(" ", ",");
1093 outputArchive.ReplaceAll("root_archive.zip:", Form("root_archive.zip:%s,", fTerminateFiles.Data()));
1095 arr = outputArchive.Tokenize(" ");
1099 while ((os=(TObjString*)next2())) {
1100 if (!first) comment = NULL;
1101 TString currentfile = os->GetString();
1102 currentfile.ReplaceAll(".root", "*.root");
1103 if (!IsOneStageMerging()) currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1104 if (!currentfile.Contains("@") && fCloseSE.Length())
1105 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1107 fMergingJDL->AddToOutputArchive(currentfile, comment);
1112 arr = fOutputFiles.Tokenize(",");
1114 Bool_t first = kTRUE;
1115 const char *comment = "Files to be archived";
1116 const char *comment1 = comment;
1117 while ((os=(TObjString*)next())) {
1118 // Ignore outputs in jdl that are also in outputarchive
1119 TString sout = os->GetString();
1120 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1121 if (fOutputArchive.Contains(sout)) continue;
1122 if (!first) comment = NULL;
1123 if (!os->GetString().Contains("@") && fCloseSE.Length())
1124 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1126 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1130 if (fOutputFiles.Length()) {
1131 TString outputFiles = fOutputFiles;
1132 if (!fMergeExcludes.IsNull()) {
1133 arr = fMergeExcludes.Tokenize(" ");
1135 while ((os=(TObjString*)next1())) {
1136 outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1137 outputFiles.ReplaceAll(os->GetString(),"");
1141 arr = outputFiles.Tokenize(" ");
1145 while ((os=(TObjString*)next2())) {
1146 // Ignore outputs in jdl that are also in outputarchive
1147 TString sout = os->GetString();
1148 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1149 if (fOutputArchive.Contains(sout)) continue;
1150 if (!first) comment = NULL;
1151 if (!os->GetString().Contains("@") && fCloseSE.Length())
1152 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1154 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1159 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1160 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1161 TString validationScript = fValidationScript;
1162 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1163 validationScript.ReplaceAll(".sh", "_merge.sh");
1164 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1165 if (fMasterResubmitThreshold) {
1166 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1167 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1169 // Write a jdl with 2 input parameters: collection name and output dir name.
1172 // Copy jdl to grid workspace
1174 // Check if an output directory was defined and valid
1175 if (!fGridOutputDir.Length()) {
1176 Error("CreateJDL", "You must define AliEn output directory");
1179 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1180 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1181 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1182 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1184 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1190 if (TestBit(AliAnalysisGrid::kSubmit)) {
1191 TString mergeJDLName = fExecutable;
1192 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1193 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1194 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1195 if (fProductionMode) {
1196 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1197 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1199 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1200 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1201 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1202 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1204 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1205 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1208 if (fAdditionalLibs.Length()) {
1209 arr = fAdditionalLibs.Tokenize(" ");
1212 while ((os=(TObjString*)next())) {
1213 if (os->GetString().Contains(".so")) continue;
1214 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1215 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1216 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1221 TIter next(fPackages);
1223 while ((obj=next())) {
1224 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1225 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1226 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1233 //______________________________________________________________________________
1234 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1236 // Writes one or more JDL's corresponding to findex. If findex is negative,
1237 // all run numbers are considered in one go (jdl). For non-negative indices
1238 // they correspond to the indices in the array fInputFiles.
// NOTE(review): the description above refers to "findex", but the signature
// only takes <copy>; the text looks stale - confirm and update.
// Summary: completes fGridJDL and fMergingJDL (input collections and output
// directories), generates both JDL texts, reformats them and writes them to
// the local files fJDLName and <fExecutable with ".sh" -> "_merge.jdl">.
// Returns kFALSE right away when no input files are defined.
// <copy> presumably controls the upload of both files to AliEn at the end
// of this method - TODO confirm.
1239 if (!fInputFiles) return kFALSE;
// Unless in production mode or the working directory starts with "/alice",
// the working directory is taken relative to the AliEn home directory.
1242 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1243 workdir += fGridWorkingDir;
// Production mode: input collections come from fInputFiles and the analysis
// output directory uses the "#alien_counter_04i#" placeholder.
1245 if (fProductionMode) {
1246 TIter next(fInputFiles);
1248 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1249 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1250 fMergingJDL->SetOutputDirectory(fGridOutputDir);
// Neither a run list nor a run range was given: input is specified by name.
1252 if (!fRunNumbers.Length() && !fRunRange[0]) {
1253 // One jdl with no parameters in case input data is specified by name.
1254 TIter next(fInputFiles);
1256 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
// A single-output name (fOutputSingle) overrides the counter-based directory.
1257 if (!fOutputSingle.IsNull())
1258 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1260 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1261 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1264 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1265 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1266 if (!fOutputSingle.IsNull()) {
1267 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1268 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1270 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
// The merging JDL receives its output directory as first submit argument
// (see the "# $1 = ..." header prepended to sjdl1 below).
1271 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1276 // Generate the JDL as a string
1277 TString sjdl = fGridJDL->Generate();
1278 TString sjdl1 = fMergingJDL->Generate();
// Textual clean-up of the analysis JDL: put list items on their own lines,
// collapse empty lines and rename the "OutputDirectory" key to "OutputDir".
1280 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1281 sjdl.ReplaceAll("(member", "\n (member");
1282 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1283 sjdl.ReplaceAll("{", "{\n ");
1284 sjdl.ReplaceAll("};", "\n};");
1285 sjdl.ReplaceAll("{\n \n", "{\n");
1286 sjdl.ReplaceAll("\n\n", "\n");
1287 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
// Same clean-up applied to the merging JDL.
1288 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1289 sjdl1.ReplaceAll("(member", "\n (member");
1290 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1291 sjdl1.ReplaceAll("{", "{\n ");
1292 sjdl1.ReplaceAll("};", "\n};");
1293 sjdl1.ReplaceAll("{\n \n", "{\n");
1294 sjdl1.ReplaceAll("\n\n", "\n");
1295 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Analysis JDL: append the JDLVariables section, prepend the job tag, label
// the JDLVariables section with a comment line and append the workdir size.
1296 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1297 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1298 index = sjdl.Index("JDLVariables");
1299 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1300 sjdl += "Workdirectorysize = {\"5000MB\"};";
// Merging JDL: same trailer, with a job tag derived from fJobTag by inserting
// "_Merging" before the first ':' (or at the end when there is no ':').
1301 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1302 index = fJobTag.Index(":");
1303 if (index < 0) index = fJobTag.Length();
1304 TString jobTag = fJobTag;
1305 jobTag.Insert(index, "_Merging");
// NOTE(review): jobTag already contains "_Merging" (inserted just above) and
// the format string below appends "_Merging" once more, so the suffix ends up
// twice in the merging job tag - confirm whether this is intended.
1306 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1307 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1308 index = sjdl1.Index("JDLVariables");
1309 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1310 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1311 // Write jdl to file
1313 out.open(fJDLName.Data(), ios::out);
1315 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1318 out << sjdl << endl;
// The merging JDL file name is derived from the executable script name.
1319 TString mergeJDLName = fExecutable;
1320 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1323 out1.open(mergeJDLName.Data(), ios::out);
1325 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1328 out1 << sjdl1 << endl;
1331 // Copy jdl to grid workspace
1333 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination of both JDL files: the grid output directory, or the working
// directory in production mode.
1335 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1336 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1337 if (fProductionMode) {
1338 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1339 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove stale copies on the grid before uploading the fresh files.
1341 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1342 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1343 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1344 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1346 Info("WriteJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1347 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1353 //______________________________________________________________________________
1354 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1356 // Returns true if file exists.
// <lfn> is the logical file name passed to the grid "ls" command.
// Returns kFALSE when there is no grid connection (gGrid is null) or when
// the listing returns no result object.
1357 if (!gGrid) return kFALSE;
1358 TGridResult *res = gGrid->Ls(lfn);
1359 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
1360 TMap *map = dynamic_cast<TMap*>(res->At(0));
// The entry is examined through its "name" value; a missing or empty name
// is handled by the branch below.
1365 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1366 if (!objs || !objs->GetString().Length()) {
1374 //______________________________________________________________________________
1375 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1377 // Returns true if directory exists. Can be also a path.
// The check lists the parent directory on the grid (ls -F) and looks for an
// entry whose "name" equals the last path component.
// Returns kFALSE when there is no grid connection or the listing fails.
1378 if (!gGrid) return kFALSE;
1379 // Check if dirname is a path
1380 TString dirstripped = dirname;
// Drop surrounding blanks, then any trailing '/' so that BaseName/DirName
// split the path into <parent>/<last component>.
1381 dirstripped = dirstripped.Strip();
1382 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1383 TString dir = gSystem->BaseName(dirstripped);
1385 TString path = gSystem->DirName(dirstripped);
1386 TGridResult *res = gGrid->Ls(path, "-F");
1387 if (!res) return kFALSE;
// Scan the listing entries and compare each name with the wanted component.
1391 while ((map=dynamic_cast<TMap*>(next()))) {
1392 obj = map->GetValue("name");
1394 if (dir == obj->GetName()) {
1403 //______________________________________________________________________________
1404 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1406 // Check input data type.
// Inputs:  lfn          - grid logical file name to classify.
// Outputs: isCollection - result of IsCollection(lfn);
//          isXml        - kTRUE when the name contains ".xml";
//          useTags      - kTRUE when the first file referenced (or the name
//                         itself for plain files) contains ".tag".
// A one-line summary of the findings is reported through Info().
1407 isCollection = kFALSE;
1411 Error("CheckDataType", "No connection to grid");
1414 isCollection = IsCollection(lfn);
1415 TString msg = "\n##### file: ";
1418 msg += " type: raw_collection;";
1419 // special treatment for collections
1421 // check for tag files in the collection
1422 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1424 msg += " using_tags: No (unknown)";
1425 Info("CheckDataType", "%s", msg.Data());
// The original LFN of the first collection entry decides whether tags are used.
1428 const char* typeStr = res->GetKey(0, "origLFN");
1429 if (!typeStr || !strlen(typeStr)) {
1430 msg += " using_tags: No (unknown)";
1431 Info("CheckDataType", "%s", msg.Data());
1434 TString file = typeStr;
1435 useTags = file.Contains(".tag");
1436 if (useTags) msg += " using_tags: Yes";
1437 else msg += " using_tags: No";
1438 Info("CheckDataType", "%s", msg.Data());
1443 isXml = slfn.Contains(".xml");
1445 // Open xml collection and check if there are tag files inside
1446 msg += " type: xml_collection;";
// The collection is opened through the interpreter, which avoids a direct
// call to TAlienCollection from this code.
1447 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1449 msg += " using_tags: No (unknown)";
1450 Info("CheckDataType", "%s", msg.Data());
// Only the first entry of the xml collection is examined.
1453 TMap *map = coll->Next();
1455 msg += " using_tags: No (unknown)";
1456 Info("CheckDataType", "%s", msg.Data());
1459 map = (TMap*)map->GetValue("");
1461 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1462 useTags = file.Contains(".tag");
1464 if (useTags) msg += " using_tags: Yes";
1465 else msg += " using_tags: No";
1466 Info("CheckDataType", "%s", msg.Data());
// Plain file: classify by the file name only.
1469 useTags = slfn.Contains(".tag");
1470 if (slfn.Contains(".root")) msg += " type: root file;";
1471 else msg += " type: unknown file;";
1472 if (useTags) msg += " using_tags: Yes";
1473 else msg += " using_tags: No";
1474 Info("CheckDataType", "%s", msg.Data());
1477 //______________________________________________________________________________
1478 void AliAnalysisAlien::EnablePackage(const char *package)
1480 // Enables a par file supposed to exist in the current directory.
// <package> may be given with or without ".par"; every ".par" occurrence is
// removed from the name first.
// Aborts via Fatal() when the package path is reported as not accessible.
1481 TString pkg(package);
1482 pkg.ReplaceAll(".par", "");
1484 if (gSystem->AccessPathName(pkg)) {
1485 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the switch to .par packages only the first time the bit is set.
1488 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1489 Info("EnablePackage", "AliEn plugin will use .par packages");
1490 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns its TObjString entries (SetOwner).
1492 fPackages = new TObjArray();
1493 fPackages->SetOwner();
1495 fPackages->Add(new TObjString(pkg));
1498 //______________________________________________________________________________
1499 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1501 // Make a tree from files having the location specified in fFileForTestMode.
1502 // Inspired from JF's CreateESDChain.
// <treeName> is the name given to the created TChain. The text file
// fFileForTestMode is read entry by entry; each non-empty entry is treated
// as a data file location and added to the chain if it can be opened.
// Errors are reported when fFileForTestMode is unset, not accessible, or
// when no listed file could be added.
1503 if (fFileForTestMode.IsNull()) {
1504 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1507 if (gSystem->AccessPathName(fFileForTestMode)) {
1508 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1513 in.open(fFileForTestMode);
1515 // Read the input list of files and add them to the chain
1517 TChain *chain = new TChain(treeName);
// Empty entries are skipped and do not count towards the fNtestFiles limit.
1521 if (line.IsNull()) continue;
1522 if (count++ == fNtestFiles) break;
1523 TString esdFile(line);
// Each file is opened once to verify that it is readable before adding it.
1524 TFile *file = TFile::Open(esdFile);
1526 if (!file->IsZombie()) chain->Add(esdFile);
// NOTE(review): the location string below is spelled "GetChainforTestMode"
// (lower-case 'f'), unlike the other messages of this method - confirm.
1529 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1533 if (!chain->GetListOfFiles()->GetEntries()) {
1534 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1542 //______________________________________________________________________________
1543 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1545 // Get job status for all jobs with jobid>jobidstart.
// NOTE(review): the loop below only skips jobs with pid<jobidstart, so the
// job with id == jobidstart is included as well - confirm the wording above.
// Inputs:  jobidstart - jobs with a smaller queue id are ignored;
//          lastid     - id of the job whose status text is returned.
// Outputs: nrunning, nwaiting, nerror, ndone - presumably the per-state job
//          counts filled by the switch below - TODO confirm.
// Returns a pointer to a function-local static buffer, so the text is
// overwritten by the next call.
1546 static char mstatus[20];
1552 TGridJobStatusList *list = gGrid->Ps("");
1553 if (!list) return mstatus;
1554 Int_t nentries = list->GetSize();
1555 TGridJobStatus *status;
1557 for (Int_t ijob=0; ijob<nentries; ijob++) {
1558 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter (TAlienJobStatus::GetKey),
// which avoids a direct call to TAlienJobStatus from this code.
1559 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1560 if (pid<jobidstart) continue;
1561 if (pid == lastid) {
// NOTE(review): the status text is used directly as the sprintf format and is
// written into the 20-byte mstatus buffer; a longer value, or one containing
// '%', would be unsafe - confirm the possible values.
1562 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
// Classify the job; aborted, failed and unknown states share one case group.
1564 switch (status->GetStatus()) {
1565 case TGridJobStatus::kWAITING:
1567 case TGridJobStatus::kRUNNING:
1569 case TGridJobStatus::kABORTED:
1570 case TGridJobStatus::kFAIL:
1571 case TGridJobStatus::kUNKNOWN:
1573 case TGridJobStatus::kDONE:
1582 //______________________________________________________________________________
1583 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1585 // Returns true if file is a collection. Functionality duplicated from
1586 // TAlien::Type() because we don't want to directly depend on TAlien.
// <lfn> is the grid logical file name to query.
// Returns kFALSE when the "type" query gives no result or an empty type.
1588 Error("IsCollection", "No connection to grid");
// Ask the grid for the entry type and read the "type" key of the first row.
1591 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1592 if (!res) return kFALSE;
1593 const char* typeStr = res->GetKey(0, "type");
1594 if (!typeStr || !strlen(typeStr)) return kFALSE;
1595 if (!strcmp(typeStr, "collection")) return kTRUE;
1600 //______________________________________________________________________________
1601 Bool_t AliAnalysisAlien::IsSingleOutput() const
1603 // Check if single-output option is on.
// The option counts as on whenever fOutputSingle is a non-empty string.
1604 return (!fOutputSingle.IsNull());
1607 //______________________________________________________________________________
1608 void AliAnalysisAlien::Print(Option_t *) const
1610 // Print current plugin settings.
// The Option_t argument is unnamed and unused. Output goes to stdout via
// printf. When the analysis manager reports PROOF mode, the PROOF-related
// settings are printed first; the grid-related settings follow further below.
1611 printf("### AliEn analysis plugin current settings ###\n");
1612 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1613 if (mgr && mgr->IsProofMode()) {
// --- PROOF settings ---
1614 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1615 if (TestBit(AliAnalysisGrid::kTest))
1616 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1617 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1618 if (!fProofDataSet.IsNull())
1619 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1621 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1623 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1624 if (!fRootVersionForProof.IsNull())
1625 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1627 printf("= ROOT version requested________________________ default\n");
1628 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1629 if (!fAliRootMode.IsNull())
1630 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1632 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1633 if (fNproofWorkersPerSlave)
1634 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1635 if (TestSpecialBit(kClearPackages))
1636 printf("= ClearPackages requested...\n");
// NOTE(review): TString::Data() presumably never returns a null pointer, so
// this condition looks always true; fIncludePath.Length() may have been
// intended - confirm.
1637 if (fIncludePath.Data())
1638 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1639 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
// Names of the enabled .par packages are concatenated into one line.
1640 if (fPackages && fPackages->GetEntries()) {
1641 TIter next(fPackages);
1644 while ((obj=next())) list += obj->GetName();
1645 printf("= Par files to be used: ________________________ %s\n", list.Data());
1647 if (TestSpecialBit(kProofConnectGrid))
1648 printf("= Requested PROOF connection to grid\n");
// --- Grid settings ---
1651 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1652 if (fOverwriteMode) {
1653 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1654 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1656 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1657 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1658 printf("= Production mode:______________________________ %d\n", fProductionMode);
1659 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1660 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1661 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1663 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1664 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1665 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1666 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1667 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1668 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Input selection: explicit run numbers, a run range, or named input files.
1669 if (fRunNumbers.Length())
1670 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1672 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
1673 if (!fRunRange[0] && !fRunNumbers.Length()) {
1674 TIter next(fInputFiles);
1677 while ((obj=next())) list += obj->GetName();
1678 printf("= Input files to be processed: _________________ %s\n", list.Data());
1680 if (TestBit(AliAnalysisGrid::kTest))
1681 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1682 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1683 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1684 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1685 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1686 printf("=====================================================================\n");
// Job parameters; optional values are printed only when set (> 0 / non-empty).
1687 printf("= Job price: ___________________________________ %d\n", fPrice);
1688 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1689 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1690 if (fMaxInitFailed>0)
1691 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1692 if (fMasterResubmitThreshold>0)
1693 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1694 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1695 if (fNrunsPerMaster>0)
1696 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1697 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1698 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1699 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1700 if (fArguments.Length())
1701 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1702 if (fExecutableArgs.Length())
1703 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1704 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1705 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1706 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1707 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1709 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1710 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): same probably-always-true test on Data() as in the PROOF
// part above - confirm.
1711 if (fIncludePath.Data())
1712 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1713 if (fCloseSE.Length())
1714 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1715 if (fFriendChainName.Length())
1716 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1717 if (fPackages && fPackages->GetEntries()) {
1718 TIter next(fPackages);
1721 while ((obj=next())) list += obj->GetName();
1722 printf("= Par files to be used: ________________________ %s\n", list.Data());
1726 //______________________________________________________________________________
1727 void AliAnalysisAlien::SetDefaults()
1729 // Set default values for everything. What cannot be filled will be left empty.
// Both JDL objects are created through the interpreter ("new TAlienJDL()"),
// which avoids naming the TAlienJDL type directly in this code.
// NOTE(review): the old fGridJDL is deleted before being replaced, while
// fMergingJDL is overwritten with no delete next to it - possible leak when
// SetDefaults() runs on an already initialized object; confirm.
1730 if (fGridJDL) delete fGridJDL;
1731 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1732 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting / merging defaults.
1735 fSplitMaxInputFileNumber = 100;
1737 fMasterResubmitThreshold = 0;
1742 fNrunsPerMaster = 1;
1743 fMaxMergeFiles = 100;
// Names of the generated script and macro, and the command used to run it.
1745 fExecutable = "analysis.sh";
1746 fExecutableCommand = "root -b -q";
1748 fExecutableArgs = "";
1749 fAnalysisMacro = "myAnalysis.C";
1750 fAnalysisSource = "";
1751 fAdditionalLibs = "";
1755 fAliROOTVersion = "";
1756 fUser = ""; // Your alien user name
1757 fGridWorkingDir = "";
1758 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1759 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1760 fFriendChainName = "";
1761 fGridOutputDir = "output";
// Default archives: one for std* logs and one for all *.root files.
1762 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1763 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1764 fInputFormat = "xml-single";
1765 fJDLName = "analysis.jdl";
1766 fJobTag = "Automatically generated analysis JDL";
1767 fMergeExcludes = "";
1770 SetCheckCopy(kTRUE);
1771 SetDefaultOutputs(kTRUE);
1775 //______________________________________________________________________________
1776 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1778 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1779 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1780 // true if the jobs were successfully submitted.
// Inputs: filename  - name of the merged output file to look for;
//         aliendir  - grid directory that is searched (find) for the outputs;
//         nperchunk - number of files merged into one chunk per stage;
//         submit    - when true, merging jobs are submitted;
//         jdl       - merging JDL used in the "submit" query.
1781 Int_t countOrig = 0;
1782 Int_t countStage = 0;
1785 Bool_t doneFinal = kFALSE;
// Normalize the directory: collapse "//" and drop a trailing '/'.
1787 TString saliendir(aliendir);
1788 TString sfilename, stmp;
1789 saliendir.ReplaceAll("//","/");
1790 saliendir = saliendir.Strip(TString::kTrailing, '/');
// NOTE(review): the two ::Error calls below report "GetNregisteredFiles" as
// location although this method is CheckMergedFiles - confirm.
1792 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
1795 sfilename = filename;
1796 sfilename.ReplaceAll(".root", "*.root");
1797 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1798 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1799 TGridResult *res = gGrid->Command(command);
1801 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1806 while ((map=(TMap*)nextmap())) {
// NOTE(review): GetValue("turl") is dereferenced without a null check -
// confirm every result entry carries a "turl" key.
1807 TString turl = map->GetValue("turl")->GetName();
1808 if (!turl.Length()) {
// Reduce the URL to the path relative to the searched directory.
1813 turl.ReplaceAll("alien://", "");
1814 turl.ReplaceAll(saliendir, "");
1815 sfilename = gSystem->BaseName(turl);
1816 turl = turl.Strip(TString::kLeading, '/');
1817 // Now check to what the file corresponds to:
1818 // original output - aliendir/%03d/filename
1819 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1820 // final merged file - aliendir/filename
// Base name equal to the relative path: the file sits directly in aliendir.
1821 if (sfilename == turl) {
1822 if (sfilename == filename) {
// Decode the stage (2 digits after "Stage") and the chunk number (4 digits
// after the separator) from the merged file name.
1826 Int_t index = sfilename.Index("Stage");
1827 if (index<0) continue;
1828 stmp = sfilename(index+5,2);
1829 Int_t istage = atoi(stmp);
1830 stmp = sfilename(index+8,4);
1831 Int_t ijob = atoi(stmp);
1832 if (istage<stage) continue; // Ignore lower stages
1835 chunksDone.ResetAllBits();
// One bit per chunk found for the stage being tracked.
1839 chunksDone.SetBitNumber(ijob);
1846 printf("=> Removing files from previous stages...\n");
1847 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1848 for (i=1; i<stage; i++)
1849 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1854 // Compute number of jobs that were submitted for the current stage
// Each stage reduces the number of files by nperchunk, rounding up.
1855 Int_t ntotstage = countOrig;
1856 for (i=1; i<=stage; i++) {
1857 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1858 else ntotstage = (ntotstage/nperchunk);
1860 // Now compare with the number of set bits in the chunksDone array
1861 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1863 printf("*** Found %d original files\n", countOrig);
1864 if (stage==0) printf("*** No merging completed so far.\n");
1865 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1866 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
// Check-only mode stops here and reports whether the final file was found.
1867 if (!submit) return doneFinal;
1868 // Submit merging jobs for all missing chunks for the current stage.
// The query gets the stage and chunk numbers appended as extra arguments.
1869 TString query = Form("submit %s %s", jdl, aliendir);
1872 for (i=0; i<nmissing; i++) {
// Next chunk index whose bit is not set in chunksDone.
1873 ichunk = chunksDone.FirstNullBit(ichunk+1);
1874 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1875 if (!jobId) return kFALSE;
1879 // Submit next stage of merging
1880 if (stage==0) countStage = countOrig;
// Number of chunks for the next stage: countStage / nperchunk, rounded up.
1881 Int_t nchunks = (countStage/nperchunk);
1882 if (countStage%nperchunk) nchunks += 1;
1883 for (i=0; i<nchunks; i++) {
1884 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1885 if (!jobId) return kFALSE;
1890 //______________________________________________________________________________
1891 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1893 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// <query> is the complete grid command that is executed. The query is echoed
// to stdout, followed by either the job id or a failure message.
// Returns 0 right away when there is no grid connection.
1894 if (!gGrid) return 0;
1895 printf("=> %s ------> ",query);
1896 TGridResult *res = gGrid->Command(query);
// The job id is read from the "jobId" key of the first result row; an empty
// value is treated as a failed submission.
1898 TString jobId = res->GetKey(0,"jobId");
1900 if (jobId.IsNull()) {
1901 printf("submission failed. Reason:\n");
1904 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1907 printf(" Job id: %s\n", jobId.Data());
1911 //______________________________________________________________________________
1912 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1914 // Merge given output files from basedir. The file merger will merge nmaxmerge
1915 // files in a group. Merging can be done in stages:
1916 // stage=0 : will merge all existing files in a single stage
1917 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1918 // If their number is bigger than nmaxmerge, only the files from
1919 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged as <output>-Stage<stage>_<ichunk>
1920 // stage=n : does a find command for files named <output>-Stage<stage-1>_*. If their number is bigger than
1921 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
1923 TString outputFile = output;
1925 TString outputChunk;
1926 TString previousChunk = "";
1927 Int_t countChunk = 0;
// countZero counts down the files still to be added to the current chunk.
1928 Int_t countZero = nmaxmerge;
1929 Bool_t merged = kTRUE;
// Drop anything from '@' onwards in the output name before using it as a file name.
1930 Int_t index = outputFile.Index("@");
1931 if (index > 0) outputFile.Remove(index);
1932 TString inputFile = outputFile;
// For stages above 1 the inputs are the files produced by the previous stage.
1933 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1934 command = Form("find %s/ *%s", basedir, inputFile.Data());
1935 printf("command: %s\n", command.Data());
1936 TGridResult *res = gGrid->Command(command);
1938 ::Error("MergeOutput","No result for the find command\n");
1942 TFileMerger *fm = 0;
1945 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1946 outputChunk = outputFile;
1947 outputChunk.ReplaceAll(".root", "_*.root");
1948 // Check for existent temporary merge files
1949 // Check overwrite mode and remove previous partial results if needed
1950 // Preserve old merging functionality for stage 0.
// 'ls' exits with status 0 when at least one local <output>_*.root chunk file exists.
1952 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1954 // Skip as many input files as in a chunk
1955 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1957 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1961 outputChunk = outputFile;
1962 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
// AccessPathName() returns true when the path is NOT accessible: keep looking.
1964 if (gSystem->AccessPathName(outputChunk)) continue;
1965 // Merged file with chunks up to <countChunk> found
1966 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1967 previousChunk = outputChunk;
1971 countZero = nmaxmerge;
1973 while ((map=(TMap*)nextmap())) {
1974 // Loop 'find' results and get next LFN
1975 if (countZero == nmaxmerge) {
1976 // First file in chunk - create file merger and add previous chunk if any.
1977 fm = new TFileMerger(kFALSE);
1978 fm->SetFastMethod(kTRUE);
1979 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1980 outputChunk = outputFile;
1981 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1983 // If last file found, put merged results in the output file
1984 if (map == res->Last()) outputChunk = outputFile;
// The file location is taken from the "turl" entry of the find result.
1985 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1986 if (!objs || !objs->GetString().Length()) {
1987 // Nothing found - skip this output
1992 // Add file to be merged and decrement chunk counter.
1993 fm->AddFile(objs->GetString());
// Chunk complete (or last find result reached): write the accumulated merge.
1995 if (countZero==0 || map == res->Last()) {
1996 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1997 // Nothing found - skip this output
1998 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2003 fm->OutputFile(outputChunk);
2004 // Merge the outputs, then go to next chunk
2006 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2011 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial file has been folded into the new chunk; remove it locally.
2012 gSystem->Unlink(previousChunk);
2014 if (map == res->Last()) {
2020 countZero = nmaxmerge;
2021 previousChunk = outputChunk;
2026 // Merging stage different than 0.
2027 // Move to the beginning of the requested chunk.
2028 outputChunk = outputFile;
// Chunking is applied only when there are more find results than nmaxmerge.
2029 if (nmaxmerge < res->GetSize()) {
2030 if (ichunk*nmaxmerge >= res->GetSize()) {
2031 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
// Skip the ichunk*nmaxmerge find results that belong to earlier chunks.
2035 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
2036 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
2038 countZero = nmaxmerge;
2039 fm = new TFileMerger(kFALSE);
2040 fm->SetFastMethod(kTRUE);
2041 while ((map=(TMap*)nextmap())) {
2042 // Loop 'find' results and get next LFN
2043 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2044 if (!objs || !objs->GetString().Length()) {
2045 // Nothing found - skip this output
2050 // Add file to be merged and decrement chunk counter.
2051 fm->AddFile(objs->GetString());
2053 if (countZero==0) break;
2056 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2057 // Nothing found - skip this output
2058 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2062 fm->OutputFile(outputChunk);
2063 // Merge the outputs
2065 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2069 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2075 //______________________________________________________________________________
2076 Bool_t AliAnalysisAlien::MergeOutputs()
2078 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE immediately in test mode and kFALSE in offline mode. For local merging,
// each comma-separated entry of fOutputFiles (with any "@..." suffix removed, and skipping
// names found in fMergeExcludes) is merged from fGridOutputDir through MergeOutput(),
// at most fMaxMergeFiles files per group.
2079 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2080 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2082 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2086 if (!TestBit(AliAnalysisGrid::kMerge)) {
2087 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
// In production mode the merging jobs are not submitted from here.
2090 if (fProductionMode) {
2091 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
// Merging on the grid: submit the merging jobs via SubmitMerging().
2094 Info("MergeOutputs", "Submitting merging JDL");
2095 if (!SubmitMerging()) return kFALSE;
2096 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2097 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2100 // Get the output path
// An output directory without any '/' is taken relative to <home>/<fGridWorkingDir>.
2101 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2102 if (!DirectoryExists(fGridOutputDir)) {
2103 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2106 if (!fOutputFiles.Length()) {
2107 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2110 // Check if fast read option was requested
2111 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2112 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2113 if (fFastReadOption) {
2114 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2115 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Short xrootd timeouts / few retries, set through the ROOT environment.
2116 gEnv->SetValue("XNet.ConnectTimeout",10);
2117 gEnv->SetValue("XNet.RequestTimeout",10);
2118 gEnv->SetValue("XNet.MaxRedirectCount",2);
2119 gEnv->SetValue("XNet.ReconnectTimeout",10);
2120 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2122 // Make sure we change the temporary directory
2123 gSystem->Setenv("TMPDIR", gSystem->pwd());
2124 TObjArray *list = fOutputFiles.Tokenize(",");
2128 Bool_t merged = kTRUE;
2129 while((str=(TObjString*)next())) {
2130 outputFile = str->GetString();
// Drop anything from '@' onwards in the output name.
2131 Int_t index = outputFile.Index("@");
2132 if (index > 0) outputFile.Remove(index);
// Wildcard pattern matching the partial (chunk) merge files of this output.
2133 TString outputChunk = outputFile;
2134 outputChunk.ReplaceAll(".root", "_*.root");
2135 // Skip already merged outputs
// AccessPathName() returns kFALSE when the path is accessible, i.e. the file exists locally.
2136 if (!gSystem->AccessPathName(outputFile)) {
2137 if (fOverwriteMode) {
2138 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2139 gSystem->Unlink(outputFile);
2140 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2141 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2142 outputChunk.Data());
2143 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2146 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
2150 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2151 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2152 outputChunk.Data());
2153 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// Outputs named in fMergeExcludes are not merged (substring match on the exclude list).
2156 if (fMergeExcludes.Length() &&
2157 fMergeExcludes.Contains(outputFile.Data())) continue;
2158 // Perform a 'find' command in the output directory, looking for registered outputs
2159 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2161 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in ROOT's list of open files.
2164 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2165 if (fileOpened) fileOpened->Close();
2170 //______________________________________________________________________________
2171 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2173 // Use the output files connected to output containers from the analysis manager
2174 // rather than the files defined by SetOutputFiles
// The choice is stored in the kDefaultOutputs bit; an Info message is printed only
// when the flag is being switched on and was not already set.
2175 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2176 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2177 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2180 //______________________________________________________________________________
2181 void AliAnalysisAlien::SetOutputFiles(const char *list)
2183 // Manually set the output files list.
2184 // Removes duplicates. Not allowed if default outputs are not disabled.
// 'list' is split on blanks; the resulting names are appended to fOutputFiles
// separated by commas, with anything from '@' onwards removed from each name.
2185 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2186 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2189 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2191 TString slist = list;
2192 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2193 TObjArray *arr = slist.Tokenize(" ");
2197 while ((os=(TObjString*)next())) {
2198 sout = os->GetString();
2199 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): duplicate detection is a substring test on the accumulated list, so a
// name that is contained in an already registered name is skipped as well.
2200 if (fOutputFiles.Contains(sout)) continue;
2201 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2202 fOutputFiles += sout;
2207 //______________________________________________________________________________
2208 void AliAnalysisAlien::SetOutputArchive(const char *list)
2210 // Manually set the output archive list. Free text - you are on your own...
2211 // Not allowed if default outputs are not disabled.
// With the kDefaultOutputs bit set, Fatal() is raised; otherwise 'list' is stored
// as given in fOutputArchive.
2212 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2213 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2216 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2217 fOutputArchive = list;
2220 //______________________________________________________________________________
2221 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2223 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; only a warning pointing to the replacement setters is issued.
2224 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2227 //______________________________________________________________________________
2228 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2230 // Start remote grid analysis.
// Both arguments are unused. An initialized analysis manager is required.
// In PROOF mode the method configures and opens the PROOF session (reset, ROOT version,
// workers, AliRoot package options, par files, analysis sources, test data set).
// Otherwise it assembles the output file list and archive, writes the analysis file,
// macro, validation scripts and JDL, then either runs the executable locally (test mode)
// or submits the job(s) to AliEn.
2231 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2232 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2233 if (!mgr || !mgr->IsInitialized()) {
2234 Error("StartAnalysis", "You need an initialized analysis manager for this");
2237 // Are we in PROOF mode ?
2238 if (mgr->IsProofMode()) {
2239 Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2240 if (fProofCluster.IsNull()) {
2241 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
// A data set name is mandatory except in test mode.
2244 if (fProofDataSet.IsNull() && !testMode) {
2245 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2248 // Set the needed environment
2249 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2250 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2251 if (fProofReset && !testMode) {
// fProofReset==1 requests a soft reset (second argument kFALSE).
2252 if (fProofReset==1) {
2253 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2254 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2256 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2257 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2259 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2262 // Do we need to change the ROOT version ? The success of this cannot be checked.
2263 if (!fRootVersionForProof.IsNull() && !testMode) {
2264 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2265 fProofCluster.Data(), fRootVersionForProof.Data()));
2267 // Connect to PROOF and check the status
// Workers option: "workers=<n>x" (per slave) takes precedence over "workers=<n>".
2270 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2271 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2273 if (!sworkers.IsNull())
2274 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2276 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2278 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2280 Error("StartAnalysis", "Could not start PROOF in test mode");
2285 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// When both worker settings are positive, additionally cap the total via SetParallel().
2288 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2289 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2290 // Is dataset existing ?
// Data set name without the part starting at '#'.
2292 TString dataset = fProofDataSet;
2293 Int_t index = dataset.Index("#");
2294 if (index>=0) dataset.Remove(index);
2295 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2296 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2299 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2301 // Is ClearPackages() needed ?
2302 if (TestSpecialBit(kClearPackages)) {
2303 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2304 gROOT->ProcessLine("gProof->ClearPackages();");
2306 // Is a given aliroot mode requested ?
2308 if (!fAliRootMode.IsNull()) {
// The mode name "default" is passed to the package as an empty string.
2309 TString alirootMode = fAliRootMode;
2310 if (alirootMode == "default") alirootMode = "";
2311 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2312 optionsList.SetOwner();
2313 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2314 // Check the additional libs to be loaded
2316 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2317 // Parse the extra libs for .so
2318 if (fAdditionalLibs.Length()) {
2319 TObjArray *list = fAdditionalLibs.Tokenize(" ");
// NOTE(review): the loop ends at the first token that does not contain ".so";
// later ".so" tokens are then not visited - confirm this is intended.
2322 while((str=(TObjString*)next()) && str->GetString().Contains(".so")) {
// Convert "libXXX.so" into "XXX".
2323 TString stmp = str->GetName();
2324 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2325 stmp.ReplaceAll(".so","");
2326 if (!extraLibs.IsNull()) extraLibs += ":";
2329 if (list) delete list;
2331 if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2332 // Check extra includes
2333 if (!fIncludePath.IsNull()) {
2334 TString includePath = fIncludePath;
2335 includePath.ReplaceAll(" ",":");
// NOTE(review): TString::Strip() returns the stripped substring and leaves the
// object unchanged; the result is discarded here, so includePath is not modified.
2336 includePath.Strip(TString::kTrailing, ':');
2337 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2338 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2340 // Check if connection to grid is requested
2341 if (TestSpecialBit(kProofConnectGrid))
2342 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2343 // Enable AliRoot par
2345 // Enable proof lite package
2346 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Print the options handed to the package.
2347 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2348 TNamed *obj = (TNamed*)optionsList.At(i);
2349 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// The address of the local optionsList is embedded in the interpreted command string.
2351 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2352 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)0x%lx);",alirootLite.Data(),(ULong_t)&optionsList))) {
2353 Info("StartAnalysis", "AliRootProofLite enabled");
2355 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
// A non-zero result from EnablePackage is treated as an error.
2359 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);",
2360 fAliROOTVersion.Data(), (ULong_t)&optionsList))) {
2361 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2366 if (fAdditionalLibs.Contains(".so") && !testMode) {
2367 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2368 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2372 // Enable par files if requested
2373 if (fPackages && fPackages->GetEntries()) {
2374 TIter next(fPackages);
2376 while ((package=next())) {
2377 if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2378 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2379 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2383 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2388 // Do we need to load analysis source files ?
2389 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2390 if (fAnalysisSource.Length()) {
2391 TObjArray *list = fAnalysisSource.Tokenize(" ");
2394 while((str=(TObjString*)next())) {
2395 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2397 if (list) delete list;
2400 // Register dataset to proof lite.
2401 if (fFileForTestMode.IsNull()) {
2402 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2405 if (gSystem->AccessPathName(fFileForTestMode)) {
2406 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// Build a file collection from the listed locations and register it as "test_collection".
2409 TFileCollection *coll = new TFileCollection();
2410 coll->AddFromFile(fFileForTestMode);
2411 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
2412 gROOT->ProcessLine("gProof->ShowDataSets()");
2417 // Check if output files have to be taken from the analysis manager
2418 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2420 TIter next(mgr->GetOutputs());
2421 AliAnalysisDataContainer *output;
2422 while ((output=(AliAnalysisDataContainer*)next())) {
2423 const char *filename = output->GetFileName();
// A container file name "default" is replaced by the output event handler's file name.
2424 if (!(strcmp(filename, "default"))) {
2425 if (!mgr->GetOutputEventHandler()) continue;
2426 filename = mgr->GetOutputEventHandler()->GetOutputFileName();
2428 if (fOutputFiles.Contains(filename)) continue;
2429 if (fOutputFiles.Length()) fOutputFiles += ",";
2430 fOutputFiles += filename;
2432 // Add extra files registered to the analysis manager
2433 if (mgr->GetExtraFiles().Length()) {
2434 if (fOutputFiles.Length()) fOutputFiles += ",";
2435 TString extra = mgr->GetExtraFiles();
2436 extra.ReplaceAll(" ", ",");
2437 // Protection in case extra files do not exist (will it work?)
2438 extra.ReplaceAll(".root", "*.root");
2439 fOutputFiles += extra;
2441 // Compose the output archive.
2442 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2443 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2445 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
// Announce the selected running mode.
2446 if (TestBit(AliAnalysisGrid::kOffline)) {
2447 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2448 \n there nor any job run. You can revise the JDL and analysis \
2449 \n macro then run the same in \"submit\" mode.");
2450 } else if (TestBit(AliAnalysisGrid::kTest)) {
2451 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2453 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2454 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2455 \n space and job submitted.");
2456 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2457 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2458 if (fMergeViaJDL) CheckInputData();
2461 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2466 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2469 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2470 if (!CheckInputData()) {
2471 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2474 if (!CreateDataset(fDataPattern)) {
// Build a message naming the configuration item that selected the (empty) input.
2476 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2477 if (fRunNumbers.Length()) serror = "run numbers";
2478 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2479 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2480 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
// Produce the files needed by the job.
2483 WriteAnalysisFile();
2484 WriteAnalysisMacro();
2486 WriteValidationScript();
2488 WriteMergingMacro();
2489 WriteMergeExecutable();
2490 WriteValidationScript(kTRUE);
2492 if (!CreateJDL()) return kFALSE;
// Offline mode ends here with kFALSE.
2493 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2495 // Locally testing the analysis
2496 Info("StartAnalysis", "\n_______________________________________________________________________ \
2497 \n Running analysis script in a daughter shell as on a worker node \
2498 \n_______________________________________________________________________");
2499 TObjArray *list = fOutputFiles.Tokenize(",");
// Remove existing local copies of the outputs before the local run.
2503 while((str=(TObjString*)next())) {
2504 outputFile = str->GetString();
2505 Int_t index = outputFile.Index("@");
2506 if (index > 0) outputFile.Remove(index);
2507 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
// Run the executable with stderr redirected to the file "stderr", then the validation script.
2510 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2511 TString validationScript = fExecutable;
2512 validationScript.ReplaceAll(".sh", "_validation.sh");
2513 gSystem->Exec(Form("bash %s",validationScript.Data()));
2514 // gSystem->Exec("cat stdout");
2517 // Check if submitting is managed by LPM manager
2518 if (fProductionMode) {
2519 TString prodfile = fJDLName;
2520 prodfile.ReplaceAll(".jdl", ".prod");
2521 WriteProductionFile(prodfile);
2522 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2525 // Submit AliEn job(s)
2526 gGrid->Cd(fGridOutputDir);
// No run numbers and no run range configured: a single submission of the JDL.
2529 if (!fRunNumbers.Length() && !fRunRange[0]) {
2530 // Submit a given xml or a set of runs
2531 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2532 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2534 const char *cjobId = res->GetKey(0,"jobId");
2538 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2541 Info("StartAnalysis", "\n_______________________________________________________________________ \
2542 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2543 \n_______________________________________________________________________",
2544 fJDLName.Data(), cjobId);
2549 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2553 // Submit for a range of enumeration of runs.
2554 if (!Submit()) return kFALSE;
2557 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2558 \n You may exit at any time and terminate the job later using the option <terminate> \
2559 \n ##################################################################################", jobID.Data());
2560 gSystem->Exec("aliensh");
2564 //______________________________________________________________________________
2565 Bool_t AliAnalysisAlien::Submit()
2567 // Submit all master jobs.
// One master job per entry of fInputFiles. The first call to SubmitNext() is made
// immediately if nothing was submitted yet; afterwards SubmitNext() is retried
// while fNsubmitted is below the number of master jobs. Returns kFALSE as soon as
// SubmitNext() fails.
2568 Int_t nmasterjobs = fInputFiles->GetEntries();
2569 Long_t tshoot = gSystem->Now();
2570 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2571 while (fNsubmitted < nmasterjobs) {
2572 Long_t now = gSystem->Now();
// Retry only after more than 30000 time units have elapsed - presumably
// milliseconds, i.e. 30 s (TSystem::Now() units; confirm).
2573 if ((now-tshoot)>30000) {
2575 if (!SubmitNext()) return kFALSE;
2581 //______________________________________________________________________________
2582 Bool_t AliAnalysisAlien::SubmitMerging()
2584 // Submit all merging jobs.
// For every entry of fInputFiles the per-master output directory is derived and
// CheckMergedFiles() is called with submit=kTRUE and the merging JDL, using the first
// output file that is not listed in fMergeExcludes. Returns kFALSE if that call
// reports failure. If there was something to submit, an AliEn shell is started at the end.
// An output directory without any '/' is taken relative to <home>/<fGridWorkingDir>.
2585 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2586 gGrid->Cd(fGridOutputDir);
// Merging JDL name: executable name with ".sh" replaced by "_merge.jdl".
2587 TString mergeJDLName = fExecutable;
2588 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2589 Int_t ntosubmit = fInputFiles->GetEntries();
2590 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input file without the ".xml" extension.
2591 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2592 runOutDir.ReplaceAll(".xml", "");
2593 if (fOutputToRunNo) {
2594 // The output directory is the run number
2595 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2596 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2598 // The output directory is the master number in 3 digits format
2599 printf("### Submitting merging job for master <%03d>\n", i);
2600 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2602 // Check now the number of merging stages.
2603 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file (without "@..." suffix) that is not excluded from merging.
2607 while((str=(TObjString*)next())) {
2608 outputFile = str->GetString();
2609 Int_t index = outputFile.Index("@");
2610 if (index > 0) outputFile.Remove(index);
2611 if (!fMergeExcludes.Contains(outputFile)) break;
2614 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2615 if (!done) return kFALSE;
// Nothing to submit: do not open the AliEn shell.
2617 if (!ntosubmit) return kTRUE;
2618 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2619 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2620 \n ##################################################################################");
2621 gSystem->Exec("aliensh");
2625 //______________________________________________________________________________
2626 Bool_t AliAnalysisAlien::SubmitNext()
2628 // Submit next bunch of master jobs if the queue is free. The first master job is
2629 // submitted right away, while the next will not be unless the previous was split.
2630 // The plugin will not submit new master jobs if there are more than 500 jobs in
// State kept between calls (function-local statics): a re-entrancy flag, the ids of
// the first and last submitted master jobs, and the estimated number of jobs per master.
2632 static Bool_t iscalled = kFALSE;
2633 static Int_t firstmaster = 0;
2634 static Int_t lastmaster = 0;
2635 static Int_t npermaster = 0;
// A call made while the flag is set does nothing and reports success.
2636 if (iscalled) return kTRUE;
2638 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2639 Int_t ntosubmit = 0;
2642 Int_t nmasterjobs = fInputFiles->GetEntries();
2645 if (!IsUseSubmitPolicy()) {
2647 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
2648 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
// Without submit policy all master jobs are submitted in one go.
2649 ntosubmit = nmasterjobs;
2652 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2653 printf("=== master %d: %s\n", lastmaster, status.Data());
2654 // If last master not split, just return
2655 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2656 // No more than 500 waiting jobs
2657 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of jobs per already submitted master.
// NOTE(review): assumes fNsubmitted > 0 on this path - confirm against the callers.
2658 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Submit as many masters as fit under the 500 waiting-jobs limit, but at least one.
2659 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
2660 if (!ntosubmit) ntosubmit = 1;
2661 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2662 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2664 for (Int_t i=0; i<ntosubmit; i++) {
2665 // Submit for a range of enumeration of runs.
2666 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Base name of the input file without the ".xml" extension.
2668 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2669 runOutDir.ReplaceAll(".xml", "");
// Third argument of the submit command: either that base name or the master index in 3 digits.
2671 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2673 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2674 printf("********* %s\n",query.Data());
2675 res = gGrid->Command(query);
2677 TString cjobId1 = res->GetKey(0,"jobId");
2678 if (!cjobId1.Length()) {
2682 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2685 Info("StartAnalysis", "\n_______________________________________________________________________ \
2686 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2687 \n_______________________________________________________________________",
2688 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the id of the last (and, the first time, the first) submitted master job.
2691 lastmaster = cjobId1.Atoi();
2692 if (!firstmaster) firstmaster = lastmaster;
2697 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2705 //______________________________________________________________________________
2706 void AliAnalysisAlien::WriteAnalysisFile()
2708 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with ".sh" replaced by ".root". Unless the
// kSubmit bit is set, the analysis type bits (kUseMC, kUseESD, kUseAOD) are derived
// from the manager's event handlers and the file is (re)created. The file is then
// copied to the AliEn working directory, except in production, offline or test mode.
2709 TString analysisFile = fExecutable;
2710 analysisFile.ReplaceAll(".sh", ".root");
2711 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2712 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2713 if (!mgr || !mgr->IsInitialized()) {
2714 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2717 // Check analysis type
2719 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2720 handler = (TObject*)mgr->GetInputEventHandler();
2722 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2723 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory so that it can be restored after writing.
2725 TDirectory *cdir = gDirectory;
2726 TFile *file = TFile::Open(analysisFile, "RECREATE");
2728 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2729 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2730 // Unless merging makes no sense
2731 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2734 // Enable termination for local jobs
2735 mgr->SetSkipTerminate(kFALSE);
2737 if (cdir) cdir->cd();
2738 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// No copy to the grid in production, offline or test mode.
2740 Bool_t copy = kTRUE;
2741 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
2744 TString workdir = gGrid->GetHomeDirectory();
2745 workdir += fGridWorkingDir;
2746 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Replace any existing copy in the grid working directory.
2747 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2748 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2752 //______________________________________________________________________________
// Generates the ROOT steering macro (file name: fAnalysisMacro) that is run
// inside grid subjobs, and then stages it in the user's AliEn workspace.
//
// The generated macro, in order:
//   1. loads base ROOT libraries, extra ROOT libraries, and sets include paths;
//   2. loads or compiles (from PAR packages) the analysis framework libraries;
//   3. loads additional libraries and compiles the analysis sources;
//   4. connects to AliEn and builds the input chain from "wn.xml";
//   5. reads the AliAnalysisManager from "<executable>.root" and starts it;
//   6. defines the helper functions it uses (CreateChainFromTags / CreateChain,
//      and a local SetupPar when ANALYSISalice itself is given as a package).
//
// The macro is (re)written only when the kSubmit bit is NOT set; the upload is
// skipped in production mode and for offline/test runs.
//
// NOTE(review): this excerpt does not show every original line of the method
// (stream declaration, some braces/else branches/returns, some condition
// lines). Comments describe only what the visible statements do.
2753 void AliAnalysisAlien::WriteAnalysisMacro()
2755 // Write the analysis macro that will steer the analysis in grid mode.
2756 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2758 out.open(fAnalysisMacro.Data(), ios::out);
2760 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling whether a core framework library was listed in fPackages
// (as "X" or "X.par") and therefore has to be built instead of loaded.
2763 Bool_t hasSTEERBase = kFALSE;
2764 Bool_t hasESD = kFALSE;
2765 Bool_t hasAOD = kFALSE;
2766 Bool_t hasANALYSIS = kFALSE;
2767 Bool_t hasANALYSISalice = kFALSE;
2768 Bool_t hasCORRFW = kFALSE;
// func  : name of the generated entry function (macro name without ".C")
// type  : written into the macro as the global `anatype`
// comment: descriptive header line emitted at the top of the macro body
2769 TString func = fAnalysisMacro;
2770 TString type = "ESD";
2771 TString comment = "// Analysis using ";
2772 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2773 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
// A friend chain is only accepted together with AOD input.
2777 if (type!="AOD" && fFriendChainName!="") {
2778 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2781 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2782 else comment += " data";
// ---- Generated macro: global `anatype`, entry function header, timer ----
2783 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2784 func.ReplaceAll(".C", "");
2785 out << "void " << func.Data() << "()" << endl;
2787 out << comment.Data() << endl;
2788 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2789 out << " TStopwatch timer;" << endl;
2790 out << " timer.Start();" << endl << endl;
2791 // Change temp directory to current one
2792 out << "// Set temporary merging directory to current one" << endl;
2793 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
// ---- Generated macro: base ROOT libraries ----
// NOTE(review): the libPhysics line emits two endl although libMinuit follows,
// leaving a blank line inside the generated list; cosmetic only.
2794 out << "// load base root libraries" << endl;
2795 out << " gSystem->Load(\"libTree\");" << endl;
2796 out << " gSystem->Load(\"libGeom\");" << endl;
2797 out << " gSystem->Load(\"libVMC\");" << endl;
2798 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2799 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// ---- Generated macro: user-requested extra ROOT libraries ----
// fAdditionalRootLibs is a space-separated list; only tokens containing ".so"
// produce a gSystem->Load line.
2800 if (fAdditionalRootLibs.Length()) {
2801 // in principle libtree /lib geom libvmc etc. can go into this list, too
2802 out << "// Add aditional libraries" << endl;
2803 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2806 while((str=(TObjString*)next())) {
2807 if (str->GetString().Contains(".so"))
2808 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
// Tokenize() returned a heap-allocated array that is released here.
2810 if (list) delete list;
// ---- Generated macro: include paths ----
2812 out << "// include path" << endl;
2813 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2814 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
// ---- Generated macro: analysis framework libraries ----
// setupPar is the name of the function the macro calls to build a PAR package.
// It defaults to the static AliAnalysisAlien::SetupPar and is switched further
// down to a macro-local "SetupPar" when ANALYSISalice is itself a package.
2815 out << "// Load analysis framework libraries" << endl;
2816 TString setupPar = "AliAnalysisAlien::SetupPar";
// Plain loading of all six framework libraries.
// NOTE(review): the condition selecting between this unconditional list and
// the package-aware list below is not visible in this excerpt.
2818 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2819 out << " gSystem->Load(\"libESD\");" << endl;
2820 out << " gSystem->Load(\"libAOD\");" << endl;
2821 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2822 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2823 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: detect which framework libraries are provided as
// packages (name matches either "X" or "X.par").
2825 TIter next(fPackages);
2828 while ((obj=next())) {
2829 pkgname = obj->GetName();
2830 if (pkgname == "STEERBase" ||
2831 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2832 if (pkgname == "ESD" ||
2833 pkgname == "ESD.par") hasESD = kTRUE;
2834 if (pkgname == "AOD" ||
2835 pkgname == "AOD.par") hasAOD = kTRUE;
2836 if (pkgname == "ANALYSIS" ||
2837 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2838 if (pkgname == "ANALYSISalice" ||
2839 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2840 if (pkgname == "CORRFW" ||
2841 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
// With ANALYSISalice given as a package, the macro calls its own SetupPar
// (emitted at the end of the generated file) instead of the class method.
2843 if (hasANALYSISalice) setupPar = "SetupPar";
// For each framework library emit either a plain load (not packaged) or a
// build call that aborts the macro on failure (packaged). The emission order
// is fixed: STEERBase, ESD, AOD, ANALYSIS, ANALYSISalice, CORRFW.
2844 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2845 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2846 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2847 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2848 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2849 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2850 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2851 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2852 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2853 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2854 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2855 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
// Second pass over the packages: every package that is not one of the
// framework libraries handled above gets its own build call.
2856 out << "// Compile other par packages" << endl;
2858 while ((obj=next())) {
2859 pkgname = obj->GetName();
2860 if (pkgname == "STEERBase" ||
2861 pkgname == "STEERBase.par" ||
2863 pkgname == "ESD.par" ||
2865 pkgname == "AOD.par" ||
2866 pkgname == "ANALYSIS" ||
2867 pkgname == "ANALYSIS.par" ||
2868 pkgname == "ANALYSISalice" ||
2869 pkgname == "ANALYSISalice.par" ||
2870 pkgname == "CORRFW" ||
2871 pkgname == "CORRFW.par") continue;
2872 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// ---- Generated macro: additional AliRoot libraries ----
// fAdditionalLibs is a space-separated list; ".so" tokens are loaded, ".par"
// tokens are built through setupPar.
2875 if (fAdditionalLibs.Length()) {
2876 out << "// Add aditional AliRoot libraries" << endl;
2877 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2880 while((str=(TObjString*)next())) {
2881 if (str->GetString().Contains(".so"))
2882 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2883 if (str->GetString().Contains(".par"))
2884 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2886 if (list) delete list;
// ---- Generated macro: user sources, compiled at runtime with ".L <file>+g" ----
2889 out << "// analysis source to be compiled at runtime (if any)" << endl;
2890 if (fAnalysisSource.Length()) {
2891 TObjArray *list = fAnalysisSource.Tokenize(" ");
2894 while((str=(TObjString*)next())) {
2895 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2897 if (list) delete list;
// ---- Generated macro: optional "fast read" xrootd settings ----
// A warning is printed locally now, and the generated macro both prints a
// notice and sets the XNet.* values below.
// NOTE(review): the printf emitted into the macro has no trailing newline.
2900 if (fFastReadOption) {
2901 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2902 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2903 out << "// fast xrootd reading enabled" << endl;
2904 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2905 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2906 out << " gEnv->SetValue(\"XNet.RequestTimeout\",20);" << endl;
2907 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2908 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
2909 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
// ---- Generated macro: grid connection and input chain from "wn.xml" ----
// The chain is built through the tag-based helper when tags are in use,
// otherwise through the plain collection helper.
2911 out << "// connect to AliEn and make the chain" << endl;
2912 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2913 if (IsUsingTags()) {
2914 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
2916 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
// ---- Generated macro: load the analysis manager from "<executable>.root" ----
// The generated code scans the file's keys for an object of class
// AliAnalysisManager and aborts with an error if none is found.
2918 out << "// read the analysis manager from file" << endl;
2919 TString analysisFile = fExecutable;
2920 analysisFile.ReplaceAll(".sh", ".root");
2921 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2922 out << " if (!file) return;" << endl;
2923 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2924 out << " AliAnalysisManager *mgr = 0;" << endl;
2925 out << " TKey *key;" << endl;
2926 out << " while ((key=(TKey*)nextkey())) {" << endl;
2927 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2928 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2929 out << " };" << endl;
2930 out << " if (!mgr) {" << endl;
2931 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2932 out << " return;" << endl;
2933 out << " }" << endl << endl;
2934 out << " mgr->PrintStatus();" << endl;
// Logging settings written into the macro depend on the manager that exists
// NOW (at generation time): debug level > 3 turns on XNet debugging; the
// AliLog level is kWarning for test runs, kError being the alternative emitted
// below.
2935 if (AliAnalysisManager::GetAnalysisManager()) {
2936 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2937 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2939 if (TestBit(AliAnalysisGrid::kTest))
2940 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
2942 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// ---- Generated macro: run the analysis and report timing ----
2945 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
2946 out << " timer.Stop();" << endl;
2947 out << " timer.Print();" << endl;
2948 out << "}" << endl << endl;
// ---- Generated helper: CreateChainFromTags (only when tags are used) ----
// The helper opens the XML collection, chains the grid tags, applies the cuts
// from an optional ConfigureCuts.C, and returns the resulting chain.
2949 if (IsUsingTags()) {
2950 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
2952 out << "// Create a chain using tags from the xml file." << endl;
2953 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
2954 out << " if (!coll) {" << endl;
2955 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2956 out << " return NULL;" << endl;
2957 out << " }" << endl;
2958 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
2959 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
2960 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
2961 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
2962 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
2963 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
2964 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
2965 out << " // Check if the cuts configuration file was provided" << endl;
2966 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
2967 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
2968 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2969 out << " }" << endl;
// Without a friend chain the tag query yields the chain directly; the other
// variant emitted below writes a temporary XML collection and goes through
// CreateChain.
2970 if (fFriendChainName=="") {
2971 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2973 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
2974 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
2975 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
2977 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
2978 out << " chain->ls();" << endl;
2979 out << " return chain;" << endl;
2980 out << "}" << endl << endl;
// Hint for the user when no local ConfigureCuts.C exists (AccessPathName
// returns true when the path is NOT accessible).
2981 if (gSystem->AccessPathName("ConfigureCuts.C")) {
2982 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
2983 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
2984 msg += " AliLHCTagCuts *lhcCuts,\n";
2985 msg += " AliDetectorTagCuts *detCuts,\n";
2986 msg += " AliEventTagCuts *evCuts)";
2987 Info("WriteAnalysisMacro", "%s", msg.Data());
// ---- Generated helper: CreateChain ----
// Emitted when tags are not used, or when a friend chain is requested (the
// tag-based helper above calls it in that case). The tree name is the
// lower-cased type plus "Tree". With a friend chain, each file URL has
// "AliAOD.root"/"AliAODs.root" replaced by fFriendChainName and is added to a
// second chain that is attached as a friend.
2990 if (!IsUsingTags() || fFriendChainName!="") {
2991 out <<"//________________________________________________________________________________" << endl;
2992 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
2994 out << "// Create a chain using url's from xml file" << endl;
2995 out << " TString treename = type;" << endl;
2996 out << " treename.ToLower();" << endl;
2997 out << " treename += \"Tree\";" << endl;
2998 out << " printf(\"***************************************\\n\");" << endl;
2999 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3000 out << " printf(\"***************************************\\n\");" << endl;
3001 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3002 out << " if (!coll) {" << endl;
3003 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3004 out << " return NULL;" << endl;
3005 out << " }" << endl;
3006 out << " TChain *chain = new TChain(treename);" << endl;
3007 if(fFriendChainName!="") {
3008 out << " TChain *chainFriend = new TChain(treename);" << endl;
3010 out << " coll->Reset();" << endl;
3011 out << " while (coll->Next()) {" << endl;
3012 out << " chain->Add(coll->GetTURL(\"\"));" << endl;
3013 if(fFriendChainName!="") {
3014 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3015 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3016 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3017 out << " chainFriend->Add(fileFriend.Data());" << endl;
3019 out << " }" << endl;
3020 out << " if (!chain->GetNtrees()) {" << endl;
3021 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3022 out << " return NULL;" << endl;
3023 out << " }" << endl;
3024 if(fFriendChainName!="") {
3025 out << " chain->AddFriend(chainFriend);" << endl;
3027 out << " return chain;" << endl;
3028 out << "}" << endl << endl;
// ---- Generated helper: macro-local SetupPar ----
// Emitted only when ANALYSISalice is supplied as a package (see setupPar
// above). The emitted function unpacks <package>.par, runs PROOF-INF/BUILD.sh
// and PROOF-INF/SETUP.C inside the package directory, and restores the
// working directory on every exit path.
3030 if (hasANALYSISalice) {
3031 out <<"//________________________________________________________________________________" << endl;
3032 out << "Bool_t SetupPar(const char *package) {" << endl;
3033 out << "// Compile the package and set it up." << endl;
3034 out << " TString pkgdir = package;" << endl;
3035 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3036 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3037 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3038 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3039 out << " // Check for BUILD.sh and execute" << endl;
3040 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3041 out << " printf(\"*******************************\\n\");" << endl;
3042 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3043 out << " printf(\"*******************************\\n\");" << endl;
3044 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3045 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3046 out << " gSystem->ChangeDirectory(cdir);" << endl;
3047 out << " return kFALSE;" << endl;
3048 out << " }" << endl;
3049 out << " } else {" << endl;
3050 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3051 out << " gSystem->ChangeDirectory(cdir);" << endl;
3052 out << " return kFALSE;" << endl;
3053 out << " }" << endl;
3054 out << " // Check for SETUP.C and execute" << endl;
3055 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3056 out << " printf(\"*******************************\\n\");" << endl;
3057 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3058 out << " printf(\"*******************************\\n\");" << endl;
3059 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3060 out << " } else {" << endl;
3061 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3062 out << " gSystem->ChangeDirectory(cdir);" << endl;
3063 out << " return kFALSE;" << endl;
3064 out << " }" << endl;
3065 out << " // Restore original workdir" << endl;
3066 out << " gSystem->ChangeDirectory(cdir);" << endl;
3067 out << " return kTRUE;" << endl;
3070 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// ---- Upload to the AliEn workspace ----
// Skipped in production mode and for offline/test runs.
3072 Bool_t copy = kTRUE;
3073 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3076 TString workdir = gGrid->GetHomeDirectory();
3077 workdir += fGridWorkingDir;
// Stale grid copies are removed before the new files are copied.
3078 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
// A local ConfigureCuts.C is uploaded alongside the macro when tags are used.
3079 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3080 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3081 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3082 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3084 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3085 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3089 //______________________________________________________________________________
// Generates the ROOT macro "<executable>_merge.C" that merges the outputs of
// a master job on the grid, and then stages it in the user's AliEn workspace.
//
// Nothing is done unless merging via JDL is enabled (fMergeViaJDL); an empty
// fOutputFiles list is reported as an error. The macro is (re)written only
// when the kSubmit bit is NOT set; the upload is skipped in production mode
// and for offline/test runs.
//
// The generated function `<name>(const char *dir, Int_t stage=0, Int_t ichunk=0)`:
//   1. loads libraries / builds PAR packages and compiles the analysis sources;
//   2. connects to AliEn and merges every entry of the output-file list via
//      AliAnalysisAlien::MergeOutput, skipping wildcard entries, files that
//      already exist locally and files listed in the merge excludes;
//   3. creates the empty marker file "outputs_valid";
//   4. if an output file is present locally after merging (last stage), reads
//      the analysis manager from "<executable>.root" and runs it in
//      "gridterminate" mode.
//
// Fixes relative to the previous revision:
//   - the "could not open file" error now names the merging macro, which is
//     the file that failed to open, instead of fAnalysisMacro;
//   - the extra-files exclusion line is only emitted when an analysis manager
//     exists (the pointer was dereferenced unconditionally although the same
//     method checks it for null further down);
//   - the generated "No analysis manager found in file" message now has a
//     space before the file name.
//
// NOTE(review): this excerpt does not show every original line of the method
// (stream declaration, some braces/else branches/returns, some condition
// lines). Visible statements are otherwise kept exactly as they were.
3090 void AliAnalysisAlien::WriteMergingMacro()
3092 // Write a macro to merge the outputs per master job.
3093 if (!fMergeViaJDL) return;
3094 if (!fOutputFiles.Length()) {
3095 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3098 TString mergingMacro = fExecutable;
3099 mergingMacro.ReplaceAll(".sh","_merge.C");
// A grid output directory without any "/" is treated as relative and expanded
// to /<AliEn home>/<fGridWorkingDir>/<fGridOutputDir>.
3100 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3101 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3103 out.open(mergingMacro.Data(), ios::out);
// Report the file that was actually being opened (the merging macro).
3105 Error("WriteMergingMacro", "could not open file %s for writing", mergingMacro.Data());
// Flags telling whether a core framework library was listed in fPackages
// (as "X" or "X.par") and therefore has to be built instead of loaded.
3108 Bool_t hasSTEERBase = kFALSE;
3109 Bool_t hasESD = kFALSE;
3110 Bool_t hasAOD = kFALSE;
3111 Bool_t hasANALYSIS = kFALSE;
3112 Bool_t hasANALYSISalice = kFALSE;
3113 Bool_t hasCORRFW = kFALSE;
// func: name of the generated entry function (macro file name without ".C").
3114 TString func = mergingMacro;
3116 func.ReplaceAll(".C", "");
// ---- Generated macro: entry function header and timer ----
3117 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
3119 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3120 out << " TStopwatch timer;" << endl;
3121 out << " timer.Start();" << endl << endl;
// ---- Generated macro: base ROOT libraries ----
// Only emitted when the executable command does not contain "aliroot".
3122 if (!fExecutableCommand.Contains("aliroot")) {
3123 out << "// load base root libraries" << endl;
3124 out << " gSystem->Load(\"libTree\");" << endl;
3125 out << " gSystem->Load(\"libGeom\");" << endl;
3126 out << " gSystem->Load(\"libVMC\");" << endl;
3127 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3128 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// ---- Generated macro: user-requested extra ROOT libraries ----
// fAdditionalRootLibs is a space-separated list; only tokens containing ".so"
// produce a gSystem->Load line.
3130 if (fAdditionalRootLibs.Length()) {
3131 // in principle libtree /lib geom libvmc etc. can go into this list, too
3132 out << "// Add aditional libraries" << endl;
3133 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3136 while((str=(TObjString*)next())) {
3137 if (str->GetString().Contains(".so"))
3138 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
// Tokenize() returned a heap-allocated array that is released here.
3140 if (list) delete list;
// ---- Generated macro: include paths ----
3142 out << "// include path" << endl;
3143 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3144 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
// ---- Generated macro: analysis framework libraries ----
3145 out << "// Load analysis framework libraries" << endl;
// STEERBase/ESD/AOD loads are only emitted when the executable command does
// not contain "aliroot".
3147 if (!fExecutableCommand.Contains("aliroot")) {
3148 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3149 out << " gSystem->Load(\"libESD\");" << endl;
3150 out << " gSystem->Load(\"libAOD\");" << endl;
3152 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3153 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3154 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: detect which framework libraries are provided as
// packages. setupPar is the function the macro calls to build a package; it
// defaults to the static class method and is switched to a macro-local
// "SetupPar" when ANALYSISalice is itself a package.
3156 TIter next(fPackages);
3159 TString setupPar = "AliAnalysisAlien::SetupPar";
3160 while ((obj=next())) {
3161 pkgname = obj->GetName();
3162 if (pkgname == "STEERBase" ||
3163 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3164 if (pkgname == "ESD" ||
3165 pkgname == "ESD.par") hasESD = kTRUE;
3166 if (pkgname == "AOD" ||
3167 pkgname == "AOD.par") hasAOD = kTRUE;
3168 if (pkgname == "ANALYSIS" ||
3169 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3170 if (pkgname == "ANALYSISalice" ||
3171 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3172 if (pkgname == "CORRFW" ||
3173 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3175 if (hasANALYSISalice) setupPar = "SetupPar";
// For each framework library emit either a plain load (not packaged) or a
// build call that aborts the macro on failure (packaged).
3176 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3177 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3178 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3179 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3180 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3181 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3182 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3183 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3184 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3185 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3186 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3187 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
// Second pass over the packages: every package that is not one of the
// framework libraries handled above gets its own build call.
3188 out << "// Compile other par packages" << endl;
3190 while ((obj=next())) {
3191 pkgname = obj->GetName();
3192 if (pkgname == "STEERBase" ||
3193 pkgname == "STEERBase.par" ||
3195 pkgname == "ESD.par" ||
3197 pkgname == "AOD.par" ||
3198 pkgname == "ANALYSIS" ||
3199 pkgname == "ANALYSIS.par" ||
3200 pkgname == "ANALYSISalice" ||
3201 pkgname == "ANALYSISalice.par" ||
3202 pkgname == "CORRFW" ||
3203 pkgname == "CORRFW.par") continue;
3204 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// ---- Generated macro: additional AliRoot libraries (".so" tokens only) ----
3207 if (fAdditionalLibs.Length()) {
3208 out << "// Add aditional AliRoot libraries" << endl;
3209 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3212 while((str=(TObjString*)next())) {
3213 if (str->GetString().Contains(".so"))
3214 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3216 if (list) delete list;
// ---- Generated macro: user sources, compiled at runtime with ".L <file>+g" ----
3219 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3220 if (fAnalysisSource.Length()) {
3221 TObjArray *list = fAnalysisSource.Tokenize(" ");
3224 while((str=(TObjString*)next())) {
3225 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3227 if (list) delete list;
// ---- Generated macro: optional "fast read" xrootd settings ----
3231 if (fFastReadOption) {
3232 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3233 out << "// fast xrootd reading enabled" << endl;
3234 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3235 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3236 out << " gEnv->SetValue(\"XNet.RequestTimeout\",20);" << endl;
3237 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3238 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",50);" << endl;
3239 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3241 // Change temp directory to current one
3242 out << "// Set temporary merging directory to current one" << endl;
3243 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
// ---- Generated macro: grid connection and merge loop ----
3244 out << "// Connect to AliEn" << endl;
3245 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3246 out << " Bool_t laststage = kFALSE;" << endl;
3247 out << " TString outputDir = dir;" << endl;
3248 out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl;
3249 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
// The manager's extra files are appended to the excludes only if a manager
// exists at generation time; without one the line is simply not emitted.
3250 if (AliAnalysisManager::GetAnalysisManager()) out << " mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
// The generated loop walks the comma-separated output list. Entries with a
// wildcard are skipped, anything from "@" onwards is cut off the name, files
// already present locally or listed in mergeExcludes are not merged again,
// and a failed merge aborts the macro.
3251 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3252 out << " TIter *iter = new TIter(list);" << endl;
3253 out << " TObjString *str;" << endl;
3254 out << " TString outputFile;" << endl;
3255 out << " Bool_t merged = kTRUE;" << endl;
3256 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3257 out << " outputFile = str->GetString();" << endl;
3258 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3259 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3260 out << " if (index > 0) outputFile.Remove(index);" << endl;
3261 out << " // Skip already merged outputs" << endl;
3262 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3263 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3264 out << " continue;" << endl;
3265 out << " }" << endl;
3266 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3267 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
3268 out << " if (!merged) {" << endl;
3269 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3270 out << " return;" << endl;
3271 out << " }" << endl;
3272 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
3273 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
3274 out << " }" << endl;
// The generated code creates an empty "outputs_valid" file as a marker.
3275 out << " // all outputs merged, validate" << endl;
3276 out << " ofstream out;" << endl;
3277 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3278 out << " out.close();" << endl;
// ---- Generated macro: terminate step, executed for the last stage only ----
// The manager is read back from "<executable>.root" by scanning the file's
// keys for an object of class AliAnalysisManager.
3279 out << " // read the analysis manager from file" << endl;
3280 TString analysisFile = fExecutable;
3281 analysisFile.ReplaceAll(".sh", ".root");
3282 out << " if (!laststage) return;" << endl;
3283 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3284 out << " if (!file) return;" << endl;
3285 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3286 out << " AliAnalysisManager *mgr = 0;" << endl;
3287 out << " TKey *key;" << endl;
3288 out << " while ((key=(TKey*)nextkey())) {" << endl;
3289 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3290 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3291 out << " };" << endl;
3292 out << " if (!mgr) {" << endl;
// Keep a space between the message text and the file name.
3293 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
3294 out << " return;" << endl;
3295 out << " }" << endl << endl;
3296 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3297 out << " mgr->PrintStatus();" << endl;
// Logging settings written into the macro depend on the manager that exists
// at generation time: debug level > 3 turns on XNet debugging; the AliLog
// level is kWarning for test runs, kError being the alternative emitted below.
3298 if (AliAnalysisManager::GetAnalysisManager()) {
3299 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3300 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3302 if (TestBit(AliAnalysisGrid::kTest))
3303 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3305 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// No input tree is passed: the manager is started in "gridterminate" mode.
3308 out << " TTree *tree = NULL;" << endl;
3309 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3310 out << "}" << endl << endl;
// ---- Generated helper: macro-local SetupPar ----
// Emitted only when ANALYSISalice is supplied as a package. The emitted
// function unpacks <package>.par, runs PROOF-INF/BUILD.sh and
// PROOF-INF/SETUP.C inside the package directory, and restores the working
// directory on every exit path.
3311 if (hasANALYSISalice) {
3312 out <<"//________________________________________________________________________________" << endl;
3313 out << "Bool_t SetupPar(const char *package) {" << endl;
3314 out << "// Compile the package and set it up." << endl;
3315 out << " TString pkgdir = package;" << endl;
3316 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3317 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3318 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3319 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3320 out << " // Check for BUILD.sh and execute" << endl;
3321 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3322 out << " printf(\"*******************************\\n\");" << endl;
3323 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3324 out << " printf(\"*******************************\\n\");" << endl;
3325 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3326 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3327 out << " gSystem->ChangeDirectory(cdir);" << endl;
3328 out << " return kFALSE;" << endl;
3329 out << " }" << endl;
3330 out << " } else {" << endl;
3331 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3332 out << " gSystem->ChangeDirectory(cdir);" << endl;
3333 out << " return kFALSE;" << endl;
3334 out << " }" << endl;
3335 out << " // Check for SETUP.C and execute" << endl;
3336 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3337 out << " printf(\"*******************************\\n\");" << endl;
3338 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3339 out << " printf(\"*******************************\\n\");" << endl;
3340 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3341 out << " } else {" << endl;
3342 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3343 out << " gSystem->ChangeDirectory(cdir);" << endl;
3344 out << " return kFALSE;" << endl;
3345 out << " }" << endl;
3346 out << " // Restore original workdir" << endl;
3347 out << " gSystem->ChangeDirectory(cdir);" << endl;
3348 out << " return kTRUE;" << endl;
// ---- Upload to the AliEn workspace ----
// Skipped in production mode and for offline/test runs.
3352 Bool_t copy = kTRUE;
3353 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3356 TString workdir = gGrid->GetHomeDirectory();
3357 workdir += fGridWorkingDir;
// A stale grid copy is removed before the new macro is copied.
3358 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3359 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3360 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3364 //______________________________________________________________________________
// Unpacks and builds the PAR archive <package> found in the current directory.
//
// package: archive name, with or without the ".par" suffix.
//
// Steps: extract "<name>.par" with tar, change into the "<name>" directory,
// run PROOF-INF/BUILD.sh, execute PROOF-INF/SETUP.C, and change back to the
// directory that was current on entry. Every error path shown below also
// restores that directory before leaving.
//
// NOTE(review): the return statements of this method are not visible in this
// excerpt. The macro-embedded copy of this routine (written out by the macro
// generators in this file) returns kFALSE on failure and kTRUE on success;
// presumably this method does the same - confirm against the full source.
// NOTE(review): the exit status of the tar extraction and the result of
// ChangeDirectory(pkgdir) are not checked; a failed extraction only surfaces
// later as a missing PROOF-INF/BUILD.sh.
3365 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3367 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3368 // Note that for loading the compiled library. The current directory should have precedence in
// Strip ".par" so that "Foo" and "Foo.par" both refer to directory "Foo".
3370 TString pkgdir = package;
3371 pkgdir.ReplaceAll(".par","");
3372 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
// Remember where we started; the build steps run inside the package directory.
3373 TString cdir = gSystem->WorkingDirectory();
3374 gSystem->ChangeDirectory(pkgdir);
3375 // Check for BUILD.sh and execute
// AccessPathName() returns false when the path IS accessible.
3376 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3377 printf("**************************************************\n");
3378 printf("*** Building PAR archive %s\n", package);
3379 printf("**************************************************\n");
// A non-zero exit status of the build script is treated as failure.
3380 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3381 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3382 gSystem->ChangeDirectory(cdir);
3386 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3387 gSystem->ChangeDirectory(cdir);
3390 // Check for SETUP.C and execute
3391 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3392 printf("**************************************************\n");
3393 printf("*** Setup PAR archive %s\n", package);
3394 printf("**************************************************\n");
3395 gROOT->Macro("PROOF-INF/SETUP.C");
// Report which loaded libraries match the package name.
3396 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3398 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3399 gSystem->ChangeDirectory(cdir);
3402 // Restore original workdir
3403 gSystem->ChangeDirectory(cdir);
3407 //______________________________________________________________________________
// Generates the bash script (file name: fExecutable) that the grid job runs,
// and then stages it in the "bin" directory of the user's AliEn home.
//
// The script prints diagnostic information about the environment (PATH,
// LD_LIBRARY_PATH, ROOTSYS, ALICE_ROOT, the root/aliroot binaries found,
// ulimits, free memory), runs
//     <fExecutableCommand> <fAnalysisMacro> <fExecutableArgs>
// and finally prints the exit code and the memory status again.
//
// The script is (re)written only when the kSubmit bit is NOT set; the upload
// is skipped in production mode and for offline/test runs.
//
// NOTE(review): this excerpt does not show every original line of the method
// (stream declaration, closing braces, the guard around the upload section).
3408 void AliAnalysisAlien::WriteExecutable()
3410 // Generate the alien executable script.
3411 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3413 out.open(fExecutable.Data(), ios::out);
3415 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
// ---- Generated script: environment diagnostics ----
3418 out << "#!/bin/bash" << endl;
3419 out << "echo \"=========================================\"" << endl;
3420 out << "echo \"############## PATH : ##############\"" << endl;
3421 out << "echo $PATH" << endl;
3422 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3423 out << "echo $LD_LIBRARY_PATH" << endl;
3424 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3425 out << "echo $ROOTSYS" << endl;
3426 out << "echo \"############## which root : ##############\"" << endl;
3427 out << "which root" << endl;
3428 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3429 out << "echo $ALICE_ROOT" << endl;
3430 out << "echo \"############## which aliroot : ##############\"" << endl;
3431 out << "which aliroot" << endl;
3432 out << "echo \"############## system limits : ##############\"" << endl;
3433 out << "ulimit -a" << endl;
3434 out << "echo \"############## memory : ##############\"" << endl;
3435 out << "free -m" << endl;
3436 out << "echo \"=========================================\"" << endl << endl;
3437 // Make sure we can properly compile par files
// With PAR packages in use, the current directory is put first on the
// library search path.
3438 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// ---- Generated script: the actual analysis command ----
3439 out << fExecutableCommand << " ";
3440 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
// "$?" in the next emitted line is expanded by the shell to the exit status of
// the analysis command above, since no other command runs in between.
3441 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3442 out << "echo \"############## memory after: ##############\"" << endl;
3443 out << "free -m" << endl;
// ---- Upload to <AliEn home>/bin ----
// Skipped in production mode and for offline/test runs.
3445 Bool_t copy = kTRUE;
3446 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3449 TString workdir = gGrid->GetHomeDirectory();
// The bin directory is created on demand ("-p" is passed to Mkdir).
3450 TString bindir = Form("%s/bin", workdir.Data());
3451 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is extended here but not used by the visible
// statements that follow; the copy target is built from the home directory.
3452 workdir += fGridWorkingDir;
3453 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
// A stale grid copy is removed before the new script is copied.
3454 if (FileExists(executable)) gGrid->Rm(executable);
3455 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3456 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3460 //______________________________________________________________________________
// Writes the bash wrapper script for the merging job and copies it with
// TFile::Cp to <AliEn home>/bin. The script name is fExecutable with ".sh"
// replaced by "_merge.sh"; the macro it runs is fExecutable with ".sh"
// replaced by "_merge.C". Does nothing unless fMergeViaJDL is set.
3461 void AliAnalysisAlien::WriteMergeExecutable()
3463 // Generate the alien executable script for the merging job.
3464 if (!fMergeViaJDL) return;
3465 TString mergeExec = fExecutable;
3466 mergeExec.ReplaceAll(".sh", "_merge.sh");
// The script text below is produced only when the kSubmit bit is not set.
3467 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3469 out.open(mergeExec.Data(), ios::out);
// NOTE(review): the location tag "WriteMergingExecutable" differs from the
// method name (and from the tag used by Info() further down).
3471 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
// Script preamble: echo the environment the job runs in (PATH,
// LD_LIBRARY_PATH, ROOTSYS, ALICE_ROOT, located root/aliroot binaries,
// ulimit settings and free memory).
3474 out << "#!/bin/bash" << endl;
3475 out << "echo \"=========================================\"" << endl;
3476 out << "echo \"############## PATH : ##############\"" << endl;
3477 out << "echo $PATH" << endl;
3478 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3479 out << "echo $LD_LIBRARY_PATH" << endl;
3480 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3481 out << "echo $ROOTSYS" << endl;
3482 out << "echo \"############## which root : ##############\"" << endl;
3483 out << "which root" << endl;
3484 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3485 out << "echo $ALICE_ROOT" << endl;
3486 out << "echo \"############## which aliroot : ##############\"" << endl;
3487 out << "which aliroot" << endl;
3488 out << "echo \"############## system limits : ##############\"" << endl;
3489 out << "ulimit -a" << endl;
3490 out << "echo \"############## memory : ##############\"" << endl;
3491 out << "free -m" << endl;
3492 out << "echo \"=========================================\"" << endl << endl;
3493 // Make sure we can properly compile par files
// With kUsePars set, the current directory is prepended to LD_LIBRARY_PATH.
3494 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3495 TString mergeMacro = fExecutable;
3496 mergeMacro.ReplaceAll(".sh", "_merge.C");
// ARG is the macro call with the script's first three positional arguments
// substituted: the first is passed as a quoted string, the second and third
// unquoted. The payload line is "<fExecutableCommand> $ARG".
3497 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3498 out << fExecutableCommand << " " << "$ARG" << endl;
// "$?" is expanded by the shell at run time: status of the payload command.
3499 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3500 out << "echo \"############## memory after: ##############\"" << endl;
3501 out << "free -m" << endl;
// 'copy' is cleared in production, offline and test modes.
// NOTE(review): presumably it gates the upload section below - confirm.
3503 Bool_t copy = kTRUE;
3504 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Make sure <home>/bin exists on the grid before uploading into it.
3507 TString workdir = gGrid->GetHomeDirectory();
3508 TString bindir = Form("%s/bin", workdir.Data());
3509 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is extended here but not read again in the lines
// that follow; the destination is rebuilt from GetHomeDirectory() instead.
3510 workdir += fGridWorkingDir;
3511 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
// Remove a previous copy of the script, then upload the local file.
3512 if (FileExists(executable)) gGrid->Rm(executable);
3513 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3514 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3518 //______________________________________________________________________________
// Writes the production description file 'filename' and copies it to the
// grid working directory.
//   filename - local name of the file to write; the same name is used for
//              the copy placed under the working directory on AliEn.
3519 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3521 // Write the production file to be submitted by LPM manager. The format is:
3522 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3523 // Next lines: full_path_to_dataset XXX (XXX is a string)
3524 // To submit, one has to: submit jdl XXX for all lines
3526 out.open(filename, ios::out);
3528 Error("WriteProductionFile", "Bad file name: %s", filename);
// The AliEn home directory is prepended only outside production mode and
// only when fGridWorkingDir does not already start with "/alice".
3532 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
3533 workdir = gGrid->GetHomeDirectory();
3534 workdir += fGridWorkingDir;
// Estimated sub-jobs per master job written on the first line.
// NOTE(review): divides by fSplitMaxInputFileNumber, which the default
// constructor initialises to 0; no guard against zero is visible here.
3535 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3536 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3537 out << locjdl << " " << njobspermaster << endl;
// One line per entry of fInputFiles: "<entry name> <label>".
3538 Int_t nmasterjobs = fInputFiles->GetEntries();
3539 for (Int_t i=0; i<nmasterjobs; i++) {
// Label candidate: base name of the entry with ".xml" removed.
3540 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3541 runOutDir.ReplaceAll(".xml", "");
// The label is either runOutDir or the index i formatted as "%03d".
// NOTE(review): confirm which condition selects between the two forms.
3543 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3545 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
// Replace any existing copy on the grid, then upload the local file to
// <workdir>/<filename>.
3548 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3549 if (FileExists(filename)) gGrid->Rm(filename);
3550 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3554 //______________________________________________________________________________
// Writes the bash validation script that decides whether a finished job is
// accepted, and copies it to the grid working directory.
//   merge - when true the script name gets ".sh" replaced by "_merge.sh",
//           the files listed in fTerminateFiles are added to the checked
//           outputs, and files named in fMergeExcludes are not checked.
// The generated script sets error=1 when stderr is missing, when stderr
// contains one of the known failure messages, when an expected output file
// is missing, or when the marker file "outputs_valid" is missing; it ends
// with "exit $error".
3555 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3557 // Generate the alien validation script.
3558 // Generate the validation script
// Default script name: fExecutable with ".sh" replaced by "_validation.sh".
3560 if (fValidationScript.IsNull()) {
3561 fValidationScript = fExecutable;
3562 fValidationScript.ReplaceAll(".sh", "_validation.sh");
3564 TString validationScript = fValidationScript;
3565 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
3567 Error("WriteValidationScript", "Alien connection required");
// Turn fTerminateFiles into a comma-separated list.
// NOTE(review): the value returned by Strip() is not used; if TString::Strip
// returns the stripped text instead of modifying the string in place, this
// call has no effect - confirm against the TString documentation.
3570 if (!fTerminateFiles.IsNull()) {
3571 fTerminateFiles.Strip();
3572 fTerminateFiles.ReplaceAll(" ",",");
// Suffix appended to the generated echo/ls commands: outside test mode
// their output is appended to the file "stdout".
3574 TString outStream = "";
3575 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
// The script text below is produced only when the kSubmit bit is not set.
3576 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3578 out.open(validationScript, ios::out);
// Script header: move into the directory holding the script (falling back
// to "."), remember the time and working directory, and start with error=0.
3579 out << "#!/bin/bash" << endl;
3580 out << "##################################################" << endl;
3581 out << "validateout=`dirname $0`" << endl;
3582 out << "validatetime=`date`" << endl;
3583 out << "validated=\"0\";" << endl;
3584 out << "error=0" << endl;
3585 out << "if [ -z $validateout ]" << endl;
3586 out << "then" << endl;
3587 out << " validateout=\".\"" << endl;
3588 out << "fi" << endl << endl;
3589 out << "cd $validateout;" << endl;
3590 out << "validateworkdir=`pwd`;" << endl << endl;
// Banner with time, directories and a listing of the job directory.
3591 out << "echo \"*******************************************************\"" << outStream << endl;
3592 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3594 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3595 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3596 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3597 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3598 out << "ls -la ./" << outStream << endl;
3599 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3600 out << "##################################################" << endl;
// Check 1: the job must have produced a stderr file.
3603 out << "if [ ! -f stderr ] ; then" << endl;
3604 out << " error=1" << endl;
3605 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3606 out << " echo \"Error = $error\" " << outStream << endl;
3607 out << "fi" << endl;
// Check 2: scan stderr (case-insensitive, extended regex) for known failure
// messages; each non-empty match below marks the job as failed.
3609 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3610 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3611 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3612 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3615 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3616 out << " error=1" << endl;
3617 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3618 out << " echo \"$parArch\" " << outStream << endl;
3619 out << " echo \"Error = $error\" " << outStream << endl;
3620 out << "fi" << endl;
3622 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3623 out << " error=1" << endl;
3624 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3625 out << " echo \"$segViol\" " << outStream << endl;
3626 out << " echo \"Error = $error\" " << outStream << endl;
3627 out << "fi" << endl;
3629 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3630 out << " error=1" << endl;
3631 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3632 out << " echo \"$segFault\" " << outStream << endl;
3633 out << " echo \"Error = $error\" " << outStream << endl;
3634 out << "fi" << endl;
3636 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3637 out << " error=1" << endl;
3638 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3639 out << " echo \"$glibcErr\" " << outStream << endl;
3640 out << " echo \"Error = $error\" " << outStream << endl;
3641 out << "fi" << endl;
3643 // Part dedicated to the specific analyses running into the train
// Check 3: every expected output file must exist. The list is fOutputFiles,
// extended by fTerminateFiles for the merging job, split on ",".
3645 TString outputFiles = fOutputFiles;
3646 if (merge && !fTerminateFiles.IsNull()) {
3648 outputFiles += fTerminateFiles;
3650 TObjArray *arr = outputFiles.Tokenize(",");
// NOTE(review): mgr is dereferenced without a visible null check.
3653 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3654 TString extra = mgr->GetExtraFiles();
3655 while ((os=(TObjString*)next1())) {
3656 outputFile = os->GetString();
// Cut the entry at '@' when one is found at a position > 0.
3657 Int_t index = outputFile.Index("@");
3658 if (index > 0) outputFile.Remove(index);
// Entries not checked: terminate files (non-merge job), merge excludes
// (merge job), files listed as extra by the analysis manager, wildcards.
// NOTE(review): these filters are substring tests on the whole list text.
3659 if (!merge && fTerminateFiles.Contains(outputFile)) continue;
3660 if (merge && fMergeExcludes.Contains(outputFile)) continue;
3661 if (extra.Contains(outputFile)) continue;
3662 if (outputFile.Contains("*")) continue;
// A missing file is reported both through outStream and in stderr.
3663 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3664 out << " error=1" << endl;
3665 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3666 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3667 out << "fi" << endl;
// Check 4: the marker file "outputs_valid" must exist.
3670 out << "if ! [ -f outputs_valid ] ; then" << endl;
3671 out << " error=1" << endl;
3672 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3673 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3674 out << "fi" << endl;
// Success branch of the script; unless IsKeepLogs() is true it also
// deletes the std* log files.
3676 out << "if [ $error = 0 ] ; then" << endl;
3677 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3678 if (!IsKeepLogs()) {
3679 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3681 out << " rm -f std*" << endl;
3683 out << "fi" << endl;
// Footer: return to the previous directory and exit with the error flag.
3685 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3686 out << "echo \"*******************************************************\"" << outStream << endl;
3687 out << "cd -" << endl;
3688 out << "exit $error" << endl;
// 'copy' is cleared in production, offline and test modes.
// NOTE(review): presumably it gates the upload section below - confirm.
3690 Bool_t copy = kTRUE;
3691 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Replace any existing copy on the grid, then upload the local script to
// <AliEn home><fGridWorkingDir>/<validationScript>.
3694 TString workdir = gGrid->GetHomeDirectory();
3695 workdir += fGridWorkingDir;
3696 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3697 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3698 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));