1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TFileCollection.h"
32 #include "TObjString.h"
33 #include "TObjArray.h"
35 #include "TGridResult.h"
36 #include "TGridCollection.h"
38 #include "TGridJobStatusList.h"
39 #include "TGridJobStatus.h"
40 #include "TFileMerger.h"
41 #include "AliAnalysisManager.h"
42 #include "AliVEventHandler.h"
43 #include "AliAnalysisDataContainer.h"
44 #include "AliAnalysisAlien.h"
// ROOT ClassImp macro for AliAnalysisAlien (class-implementation bookkeeping for ROOT's type system).
46 ClassImp(AliAnalysisAlien)
48 //______________________________________________________________________________
// Default constructor.
// NOTE(review): most of the member-initializer list and the constructor body
// are not visible in this view; the visible initializers zero the split /
// resubmit / PROOF-worker counters and default-construct the string members.
49 AliAnalysisAlien::AliAnalysisAlien()
55 fSplitMaxInputFileNumber(0),
57 fMasterResubmitThreshold(0),
69 fNproofWorkersPerSlave(0),
79 fAdditionalRootLibs(),
107 fRootVersionForProof(),
116 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// NOTE(review): only part of the initializer list is visible here; the visible
// entries zero the numeric counters and default-construct the string members.
117 AliAnalysisAlien::AliAnalysisAlien(const char *name)
118 :AliAnalysisGrid(name),
123 fSplitMaxInputFileNumber(0),
125 fMasterResubmitThreshold(0),
137 fNproofWorkersPerSlave(0),
141 fExecutableCommand(),
147 fAdditionalRootLibs(),
175 fRootVersionForProof(),
184 //______________________________________________________________________________
// Copy constructor. Scalar and string settings are copied member-wise; the
// JDL objects are NOT shared with `other` but freshly created, and the input
// file / package lists are deep-copied into owning arrays of TObjString.
185 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
186 :AliAnalysisGrid(other),
189 fPrice(other.fPrice),
191 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
192 fMaxInitFailed(other.fMaxInitFailed),
193 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
194 fNtestFiles(other.fNtestFiles),
195 fNrunsPerMaster(other.fNrunsPerMaster),
196 fMaxMergeFiles(other.fMaxMergeFiles),
197 fNsubmitted(other.fNsubmitted),
198 fProductionMode(other.fProductionMode),
199 fOutputToRunNo(other.fOutputToRunNo),
200 fMergeViaJDL(other.fMergeViaJDL),
201 fFastReadOption(other.fFastReadOption),
202 fOverwriteMode(other.fOverwriteMode),
203 fNreplicas(other.fNreplicas),
204 fNproofWorkers(other.fNproofWorkers),
205 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
206 fProofReset(other.fProofReset),
207 fRunNumbers(other.fRunNumbers),
208 fExecutable(other.fExecutable),
209 fExecutableCommand(other.fExecutableCommand),
210 fArguments(other.fArguments),
211 fExecutableArgs(other.fExecutableArgs),
212 fAnalysisMacro(other.fAnalysisMacro),
213 fAnalysisSource(other.fAnalysisSource),
214 fValidationScript(other.fValidationScript),
215 fAdditionalRootLibs(other.fAdditionalRootLibs),
216 fAdditionalLibs(other.fAdditionalLibs),
217 fSplitMode(other.fSplitMode),
218 fAPIVersion(other.fAPIVersion),
219 fROOTVersion(other.fROOTVersion),
220 fAliROOTVersion(other.fAliROOTVersion),
221 fExternalPackages(other.fExternalPackages),
223 fGridWorkingDir(other.fGridWorkingDir),
224 fGridDataDir(other.fGridDataDir),
225 fDataPattern(other.fDataPattern),
226 fGridOutputDir(other.fGridOutputDir),
227 fOutputArchive(other.fOutputArchive),
228 fOutputFiles(other.fOutputFiles),
229 fInputFormat(other.fInputFormat),
230 fDatasetName(other.fDatasetName),
231 fJDLName(other.fJDLName),
232 fTerminateFiles(other.fTerminateFiles),
233 fMergeExcludes(other.fMergeExcludes),
234 fIncludePath(other.fIncludePath),
235 fCloseSE(other.fCloseSE),
236 fFriendChainName(other.fFriendChainName),
237 fJobTag(other.fJobTag),
238 fOutputSingle(other.fOutputSingle),
239 fRunPrefix(other.fRunPrefix),
240 fProofCluster(other.fProofCluster),
241 fProofDataSet(other.fProofDataSet),
242 fFileForTestMode(other.fFileForTestMode),
243 fRootVersionForProof(other.fRootVersionForProof),
244 fAliRootMode(other.fAliRootMode),
// Each copy gets its own JDL objects, created through the ROOT interpreter
// (TAlienJDL is instantiated via gROOT->ProcessLine rather than referenced directly).
249 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
250 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
251 fRunRange[0] = other.fRunRange[0];
252 fRunRange[1] = other.fRunRange[1];
// Deep-copy the input file list: new TObjString per entry, array owns them.
253 if (other.fInputFiles) {
254 fInputFiles = new TObjArray();
255 TIter next(other.fInputFiles);
257 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
258 fInputFiles->SetOwner();
// Deep-copy the package list the same way.
260 if (other.fPackages) {
261 fPackages = new TObjArray();
262 TIter next(other.fPackages);
264 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
265 fPackages->SetOwner();
269 //______________________________________________________________________________
// Destructor: deletes the two JDL objects and the input-file / package arrays.
// NOTE(review): a TObjArray deletes its elements only if SetOwner() was called
// on it; arrays that are not owners leave their TObjString entries alive.
270 AliAnalysisAlien::~AliAnalysisAlien()
273 if (fGridJDL) delete fGridJDL;
274 if (fMergingJDL) delete fMergingJDL;
275 if (fInputFiles) delete fInputFiles;
276 if (fPackages) delete fPackages;
279 //______________________________________________________________________________
// Assignment operator. Mirrors the copy constructor: settings are copied
// member-wise, fresh JDL objects are created, and the input-file / package
// lists are deep-copied into owning arrays of TObjString.
// Resources already owned by *this (JDLs, arrays) are released first so that
// assignment does not leak them, and the lists are reset to null when `other`
// has none instead of keeping stale entries.
280 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
283 if (this != &other) {
284 AliAnalysisGrid::operator=(other);
// Release the JDL objects owned by this instance before replacing them.
delete fGridJDL;
delete fMergingJDL;
285 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
286 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
287 fPrice = other.fPrice;
289 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
290 fMaxInitFailed = other.fMaxInitFailed;
291 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
292 fNtestFiles = other.fNtestFiles;
293 fNrunsPerMaster = other.fNrunsPerMaster;
294 fMaxMergeFiles = other.fMaxMergeFiles;
295 fNsubmitted = other.fNsubmitted;
296 fProductionMode = other.fProductionMode;
297 fOutputToRunNo = other.fOutputToRunNo;
298 fMergeViaJDL = other.fMergeViaJDL;
299 fFastReadOption = other.fFastReadOption;
300 fOverwriteMode = other.fOverwriteMode;
301 fNreplicas = other.fNreplicas;
302 fNproofWorkers = other.fNproofWorkers;
303 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
304 fProofReset = other.fProofReset;
305 fRunNumbers = other.fRunNumbers;
306 fExecutable = other.fExecutable;
307 fExecutableCommand = other.fExecutableCommand;
308 fArguments = other.fArguments;
309 fExecutableArgs = other.fExecutableArgs;
310 fAnalysisMacro = other.fAnalysisMacro;
311 fAnalysisSource = other.fAnalysisSource;
312 fValidationScript = other.fValidationScript;
313 fAdditionalRootLibs = other.fAdditionalRootLibs;
314 fAdditionalLibs = other.fAdditionalLibs;
315 fSplitMode = other.fSplitMode;
316 fAPIVersion = other.fAPIVersion;
317 fROOTVersion = other.fROOTVersion;
318 fAliROOTVersion = other.fAliROOTVersion;
319 fExternalPackages = other.fExternalPackages;
321 fGridWorkingDir = other.fGridWorkingDir;
322 fGridDataDir = other.fGridDataDir;
323 fDataPattern = other.fDataPattern;
324 fGridOutputDir = other.fGridOutputDir;
325 fOutputArchive = other.fOutputArchive;
326 fOutputFiles = other.fOutputFiles;
327 fInputFormat = other.fInputFormat;
328 fDatasetName = other.fDatasetName;
329 fJDLName = other.fJDLName;
330 fTerminateFiles = other.fTerminateFiles;
331 fMergeExcludes = other.fMergeExcludes;
332 fIncludePath = other.fIncludePath;
333 fCloseSE = other.fCloseSE;
334 fFriendChainName = other.fFriendChainName;
335 fJobTag = other.fJobTag;
336 fOutputSingle = other.fOutputSingle;
337 fRunPrefix = other.fRunPrefix;
338 fProofCluster = other.fProofCluster;
339 fProofDataSet = other.fProofDataSet;
340 fFileForTestMode = other.fFileForTestMode;
341 fRootVersionForProof = other.fRootVersionForProof;
342 fAliRootMode = other.fAliRootMode;
// The run range is copied by the copy constructor as well; keep both in sync.
fRunRange[0] = other.fRunRange[0];
fRunRange[1] = other.fRunRange[1];
// Drop the lists currently held by *this: otherwise they leak when replaced,
// or survive unchanged when `other` has no list.
delete fInputFiles;
fInputFiles = 0;
delete fPackages;
fPackages = 0;
343 if (other.fInputFiles) {
344 fInputFiles = new TObjArray();
345 TIter next(other.fInputFiles);
347 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
348 fInputFiles->SetOwner();
350 if (other.fPackages) {
351 fPackages = new TObjArray();
352 TIter next(other.fPackages);
354 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
355 fPackages->SetOwner();
361 //______________________________________________________________________________
// Append an include path to fIncludePath for the remote analysis macro.
// A "-I" prefix is added unless the argument already contains "-I";
// every entry is followed by a single space separator.
// NOTE(review): the declaration of `p` (presumably a TString built from `path`)
// is not visible in this view.
362 void AliAnalysisAlien::AddIncludePath(const char *path)
364 // Add include path in the remote analysis macro.
366 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
367 else fIncludePath += Form("-I%s ", path);
370 //______________________________________________________________________________
// Append `run`, prefixed with fRunPrefix, to the space-separated fRunNumbers list
// (a separator is inserted only when the list is already non-empty).
371 void AliAnalysisAlien::AddRunNumber(Int_t run)
373 // Add a run number to the list of runs to be processed.
374 if (fRunNumbers.Length()) fRunNumbers += " ";
375 fRunNumbers += Form("%s%d", fRunPrefix.Data(), run);
378 //______________________________________________________________________________
// String overload of AddRunNumber: adds a space separator when fRunNumbers
// is already non-empty.
// NOTE(review): the statement that appends `run` itself (presumably
// `fRunNumbers += run;`) is not present in this view — confirm it exists.
379 void AliAnalysisAlien::AddRunNumber(const char* run)
381 // Add a run number to the list of runs to be processed.
382 if (fRunNumbers.Length()) fRunNumbers += " ";
386 //______________________________________________________________________________
// Append an input data file name to fInputFiles, creating the list on first use.
// The list is made the owner of its TObjString entries, like the arrays built
// by the copy constructor, so that deleting fInputFiles in the destructor also
// frees the strings instead of leaking them.
387 void AliAnalysisAlien::AddDataFile(const char *lfn)
389 // Adds a data file to the input to be analysed. The file should be a valid LFN
390 // or point to an existing file in the alien workdir.
391 if (!fInputFiles) { fInputFiles = new TObjArray(); fInputFiles->SetOwner(); }
392 fInputFiles->Add(new TObjString(lfn));
395 //______________________________________________________________________________
// Append `package` to the space-separated fExternalPackages list.
// The separator is added only when the list is already non-empty. The old
// check used the TString -> const char* conversion, which is never null, so
// a spurious leading space was always inserted.
396 void AliAnalysisAlien::AddExternalPackage(const char *package)
398 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
399 if (fExternalPackages.Length()) fExternalPackages += " ";
400 fExternalPackages += package;
403 //______________________________________________________________________________
// Connect to AliEn via TGrid::Connect("alien://").
// Returns kTRUE immediately if a grid connection is already open or in
// production mode. After a successful connection, fUser is set from the grid
// session's user name.
// NOTE(review): the return statements of the failure/success paths are not
// visible in this view.
404 Bool_t AliAnalysisAlien::Connect()
406 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
407 if (gGrid && gGrid->IsConnected()) return kTRUE;
408 if (fProductionMode) return kTRUE;
410 Info("Connect", "Trying to connect to AliEn ...");
411 TGrid::Connect("alien://");
413 if (!gGrid || !gGrid->IsConnected()) {
414 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
417 fUser = gGrid->GetUser();
418 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
422 //______________________________________________________________________________
// Make <home>/fGridWorkingDir the alien working directory, creating it with
// "mkdir -p" when it does not exist. If creation fails, a warning is issued
// and fGridWorkingDir is cleared so the home directory is used instead.
// NOTE(review): the connection check and the branch taken when the directory
// already exists are only partially visible here.
423 void AliAnalysisAlien::CdWork()
425 // Check validity of alien workspace. Create directory if possible.
427 Error("CdWork", "Alien connection required");
430 TString homedir = gGrid->GetHomeDirectory();
431 TString workdir = homedir + fGridWorkingDir;
432 if (DirectoryExists(workdir)) {
436 // Work directory not existing - create it
438 if (gGrid->Mkdir(workdir, "-p")) {
439 gGrid->Cd(fGridWorkingDir);
440 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
442 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
443 workdir.Data(), homedir.Data());
444 fGridWorkingDir = "";
448 //______________________________________________________________________________
// Test whether files can be copied to the AliEn directory `alienpath`.
// Reports the alien_CLOSE_SE setting, checks that `alienpath` exists, creates
// a local test file "plugin_test_copy" and copies it to alien://<alienpath>/.
// The local and remote test files are removed afterwards.
// Always succeeds in production mode.
449 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
451 // Check if file copying is possible.
452 if (fProductionMode) return kTRUE;
454 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
457 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
458 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
459 // Check if alien_CLOSE_SE is defined
460 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
461 if (!closeSE.IsNull()) {
462 Info("CheckFileCopy", "Your current close storage is pointing to: \
463 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
465 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
467 // Check if grid directory exists.
468 if (!DirectoryExists(alienpath)) {
469 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
472 TFile f("plugin_test_copy", "RECREATE");
473 // User may not have write permissions to current directory
475 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
476 gSystem->WorkingDirectory());
480 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
481 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
482 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
483 \n# 1. Make sure you have write permissions there. If this is the case: \
484 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
485 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
486 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
487 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
488 gSystem->Unlink(f.GetName());
491 gSystem->Unlink(f.GetName());
// Remove the uploaded test file. The path needs the same '/' separator used
// for the copy above; without it the LFN does not match and the test file
// is left behind in the user's alien directory.
492 gGrid->Rm(Form("%s/%s",alienpath,f.GetName()));
493 Info("CheckFileCopy", "### ...SUCCESS ###");
497 //______________________________________________________________________________
// Validate the analysis input before datasets/JDL are produced.
// - With no input files, run numbers or run range: require fGridDataDir (a
//   single xml will be made for it).
// - For declared input files: resolve each against the alien working directory
//   (storing the full LFN when found), check it exists, and require that all
//   files share the same collection/xml/tags type (via CheckDataType).
// - For requested run numbers or a run range: check the data directory, forbid
//   mixing with raw collections or non-xml files, and register one xml
//   collection name per run (or per chunk of fNrunsPerMaster runs).
// Always succeeds in production mode.
// NOTE(review): several lines (returns, closing braces, local declarations)
// are not visible in this view.
498 Bool_t AliAnalysisAlien::CheckInputData()
500 // Check validity of input data. If necessary, create xml files.
501 if (fProductionMode) return kTRUE;
502 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
503 if (!fGridDataDir.Length()) {
504 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
507 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
508 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
509 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
512 // Process declared files
513 Bool_t isCollection = kFALSE;
514 Bool_t isXml = kFALSE;
515 Bool_t useTags = kFALSE;
516 Bool_t checked = kFALSE;
517 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
519 TString workdir = gGrid->GetHomeDirectory();
520 workdir += fGridWorkingDir;
523 TIter next(fInputFiles);
524 while ((objstr=(TObjString*)next())) {
527 file += objstr->GetString();
528 // Store full lfn path
529 if (FileExists(file)) objstr->SetString(file);
531 file = objstr->GetName();
532 if (!FileExists(objstr->GetName())) {
533 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
534 objstr->GetName(), workdir.Data());
538 Bool_t iscoll, isxml, usetags;
539 CheckDataType(file, iscoll, isxml, usetags);
542 isCollection = iscoll;
545 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
547 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
548 Error("CheckInputData", "Some conflict was found in the types of inputs");
554 // Process requested run numbers
555 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
556 // Check validity of alien data directory
557 if (!fGridDataDir.Length()) {
558 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
561 if (!DirectoryExists(fGridDataDir)) {
562 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
566 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
570 if (checked && !isXml) {
571 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
574 // Check validity of run number(s)
578 TString schunk, schunk2;
582 useTags = fDataPattern.Contains("tag");
583 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
585 if (useTags != fDataPattern.Contains("tag")) {
586 Error("CheckInputData", "Cannot mix input files using/not using tags");
589 if (fRunNumbers.Length()) {
590 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
591 arr = fRunNumbers.Tokenize(" ");
593 while ((os=(TObjString*)next())) {
594 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
595 if (!DirectoryExists(path)) {
596 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
599 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
600 TString msg = "\n##### file: ";
602 msg += " type: xml_collection;";
603 if (useTags) msg += " using_tags: Yes";
604 else msg += " using_tags: No";
605 Info("CheckDataType", "%s", msg.Data());
// One collection per run, or (below) one per chunk of fNrunsPerMaster runs.
606 if (fNrunsPerMaster<2) {
607 AddDataFile(Form("%s.xml", os->GetString().Data()));
610 if (((nruns-1)%fNrunsPerMaster) == 0) {
611 schunk = os->GetString();
613 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
614 schunk += Form("_%s.xml", os->GetString().Data());
620 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
621 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
622 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
623 if (!DirectoryExists(path)) {
624 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
627 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
628 TString msg = "\n##### file: ";
630 msg += " type: xml_collection;";
631 if (useTags) msg += " using_tags: Yes";
632 else msg += " using_tags: No";
633 Info("CheckDataType", "%s", msg.Data());
634 if (fNrunsPerMaster<2) {
635 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
638 if (((nruns-1)%fNrunsPerMaster) == 0) {
639 schunk = Form("%s%d", fRunPrefix.Data(),irun);
641 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
642 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
655 //______________________________________________________________________________
// Create xml dataset collections by running the grid 'find' command:
// - no run numbers/range: a single collection for fGridDataDir;
// - run numbers or a run range: one collection per existing run directory.
// In test mode the output goes to wn.xml (limited to fNtestFiles files) and
// the loop stops after the first run. Results are checked with 'grep /event'
// when grep is available, then copied to the alien working directory. When
// fNrunsPerMaster > 1, per-run collections are merged into chunk collections
// that are exported and copied instead.
// Returns kTRUE immediately in production or offline mode.
// NOTE(review): several lines (declarations of path/file/command/os/nruns,
// returns and closing braces) are not visible in this view.
656 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
658 // Create dataset for the grid data directory + run number.
659 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
661 Error("CreateDataset", "Cannot create dataset with no grid connection");
666 if (!TestBit(AliAnalysisGrid::kTest)) CdWork();
667 TString workdir = gGrid->GetHomeDirectory();
668 workdir += fGridWorkingDir;
670 // Compose the 'find' command arguments
672 TString options = "-x collection ";
673 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
674 TString conditions = "";
679 TString schunk, schunk2;
680 TGridCollection *cbase=0, *cadd=0;
681 if (!fRunNumbers.Length() && !fRunRange[0]) {
682 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
683 // Make a single data collection from data directory.
685 if (!DirectoryExists(path)) {
686 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
690 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
691 else file = Form("%s.xml", gSystem->BaseName(path));
692 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
698 command += conditions;
699 printf("command: %s\n", command.Data());
700 TGridResult *res = gGrid->Command(command);
702 // Write standard output to file
703 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
704 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
705 Bool_t nullFile = kFALSE;
707 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
709 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
711 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
716 Bool_t fileExists = FileExists(file);
717 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
718 // Copy xml file to alien space
719 if (fileExists) gGrid->Rm(file);
720 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
721 if (!FileExists(file)) {
722 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
725 // Update list of files to be processed.
727 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
731 Bool_t nullResult = kTRUE;
732 if (fRunNumbers.Length()) {
733 TObjArray *arr = fRunNumbers.Tokenize(" ");
736 while ((os=(TObjString*)next())) {
737 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
738 if (!DirectoryExists(path)) continue;
740 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
741 else file = Form("%s.xml", os->GetString().Data());
742 // If local collection file does not exist, create it via 'find' command.
743 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
748 command += conditions;
749 TGridResult *res = gGrid->Command(command);
751 // Write standard output to file
752 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
753 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
754 Bool_t nullFile = kFALSE;
756 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
758 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
760 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
761 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
767 if (TestBit(AliAnalysisGrid::kTest)) break;
768 // Check if there is one run per master job.
769 if (fNrunsPerMaster<2) {
770 if (FileExists(file)) {
771 if (fOverwriteMode) gGrid->Rm(file);
773 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
777 // Copy xml file to alien space
778 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
779 if (!FileExists(file)) {
780 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// Several runs per master job: the first run of a chunk opens the base
// collection, the following runs are merged into it.
786 if (((nruns-1)%fNrunsPerMaster) == 0) {
787 schunk = os->GetString();
788 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
790 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
791 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
795 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
798 schunk += Form("_%s.xml", os->GetString().Data());
799 if (FileExists(schunk)) {
// Overwrite mode must remove the existing merged chunk (schunk), which is what
// gets re-exported and copied below. Removing the per-run `file` left the old
// chunk in place, so the subsequent copy could not replace it. The run-range
// branch already removes schunk.
800 if (fOverwriteMode) gGrid->Rm(schunk);
802 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
806 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
807 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
808 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
809 if (!FileExists(schunk)) {
810 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
818 Error("CreateDataset", "No valid dataset corresponding to the query!");
822 // Process a full run range.
823 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
824 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
825 if (!DirectoryExists(path)) continue;
827 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
828 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
829 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
830 if (fOverwriteMode) gGrid->Rm(file);
832 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
836 // If local collection file does not exist, create it via 'find' command.
837 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
842 command += conditions;
843 TGridResult *res = gGrid->Command(command);
845 // Write standard output to file
846 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
847 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
848 Bool_t nullFile = kFALSE;
850 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
852 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
854 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
860 if (TestBit(AliAnalysisGrid::kTest)) break;
861 // Check if there is one run per master job.
862 if (fNrunsPerMaster<2) {
863 if (FileExists(file)) {
864 if (fOverwriteMode) gGrid->Rm(file);
866 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
870 // Copy xml file to alien space
871 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
872 if (!FileExists(file)) {
873 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
878 // Check if the collection for the chunk exist locally.
879 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
880 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
881 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
884 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
885 if (((nruns-1)%fNrunsPerMaster) == 0) {
886 schunk = Form("%s%d", fRunPrefix.Data(), irun);
887 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
889 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
893 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
894 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
898 if (FileExists(schunk)) {
899 if (fOverwriteMode) gGrid->Rm(schunk);
901 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
905 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
906 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
907 if (FileExists(schunk)) {
908 if (fOverwriteMode) gGrid->Rm(schunk);
910 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
914 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
915 if (!FileExists(schunk)) {
916 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
922 Error("CreateDataset", "No valid dataset corresponding to the query!");
929 //______________________________________________________________________________
930 Bool_t AliAnalysisAlien::CreateJDL()
932 // Generate a JDL file according to current settings. The name of the file is
933 // specified by fJDLName.
934 Bool_t error = kFALSE;
937 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
938 Bool_t generate = kTRUE;
939 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
941 Error("CreateJDL", "Alien connection required");
944 // Check validity of alien workspace
946 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
947 if (!fProductionMode && !TestBit(AliAnalysisGrid::kTest)) CdWork();
948 workdir += fGridWorkingDir;
952 Error("CreateJDL()", "Define some input files for your analysis.");
955 // Compose list of input files
956 // Check if output files were defined
957 if (!fOutputFiles.Length()) {
958 Error("CreateJDL", "You must define at least one output file");
961 // Check if an output directory was defined and valid
962 if (!fGridOutputDir.Length()) {
963 Error("CreateJDL", "You must define AliEn output directory");
966 if (!fProductionMode) {
967 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
968 if (!DirectoryExists(fGridOutputDir)) {
969 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
970 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
972 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
979 // Exit if any error up to now
980 if (error) return kFALSE;
982 if (!fUser.IsNull()) {
983 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
984 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
986 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
987 TString mergeExec = fExecutable;
988 mergeExec.ReplaceAll(".sh", "_merge.sh");
989 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
990 mergeExec.ReplaceAll(".sh", ".C");
991 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
992 if (!fArguments.IsNull())
993 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
994 if (IsOneStageMerging()) fMergingJDL->SetArguments(fGridOutputDir);
995 else fMergingJDL->SetArguments("$1 $2 $3");
996 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
997 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
998 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
999 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
1001 if (fMaxInitFailed > 0) {
1002 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
1003 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
1005 if (fSplitMaxInputFileNumber > 0) {
1006 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
1007 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
1009 if (fSplitMode.Length()) {
1010 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
1011 fGridJDL->SetDescription("Split", "We split per SE or file");
1013 if (!fAliROOTVersion.IsNull()) {
1014 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1015 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1017 if (!fROOTVersion.IsNull()) {
1018 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1019 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1021 if (!fAPIVersion.IsNull()) {
1022 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1023 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1025 if (!fExternalPackages.IsNull()) {
1026 arr = fExternalPackages.Tokenize(" ");
1028 while ((os=(TObjString*)next())) {
1029 TString pkgname = os->GetString();
1030 Int_t index = pkgname.Index("::");
1031 TString pkgversion = pkgname(index+2, pkgname.Length());
1032 pkgname.Remove(index);
1033 fGridJDL->AddToPackages(pkgname, pkgversion);
1034 fMergingJDL->AddToPackages(pkgname, pkgversion);
1038 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1039 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1040 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1041 TString analysisFile = fExecutable;
1042 analysisFile.ReplaceAll(".sh", ".root");
1043 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1044 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1045 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1046 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
1047 if (fAdditionalLibs.Length()) {
1048 arr = fAdditionalLibs.Tokenize(" ");
1050 while ((os=(TObjString*)next())) {
1051 if (os->GetString().Contains(".so")) continue;
1052 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1053 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1058 TIter next(fPackages);
1060 while ((obj=next())) {
1061 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1062 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1065 if (fOutputArchive.Length()) {
1066 arr = fOutputArchive.Tokenize(" ");
1068 Bool_t first = kTRUE;
1069 const char *comment = "Files to be archived";
1070 const char *comment1 = comment;
1071 while ((os=(TObjString*)next())) {
1072 if (!first) comment = NULL;
1073 if (!os->GetString().Contains("@") && fCloseSE.Length())
1074 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1076 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1080 // Output archive for the merging jdl
1081 TString outputArchive;
1082 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
1083 outputArchive = "log_archive.zip:std*@disk=1 ";
1084 // Add normal output files, extra files + terminate files
1085 TString files = GetListOfFiles("outextter");
1086 // Do not register merge excludes
1087 if (!fMergeExcludes.IsNull()) {
1088 arr = fMergeExcludes.Tokenize(" ");
1090 while ((os=(TObjString*)next1())) {
1091 files.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1092 files.ReplaceAll(os->GetString(),"");
1096 files.ReplaceAll(".root", "*.root");
1097 outputArchive += Form("root_archive.zip:%s@disk=%d",files.Data(),fNreplicas);
1099 outputArchive = fOutputArchive;
1101 arr = outputArchive.Tokenize(" ");
1105 while ((os=(TObjString*)next2())) {
1106 if (!first) comment = NULL;
1107 TString currentfile = os->GetString();
1108 if (!IsOneStageMerging()) currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1109 if (!currentfile.Contains("@") && fCloseSE.Length())
1110 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1112 fMergingJDL->AddToOutputArchive(currentfile, comment);
1117 arr = fOutputFiles.Tokenize(",");
1119 Bool_t first = kTRUE;
1120 const char *comment = "Files to be saved";
1121 while ((os=(TObjString*)next())) {
1122 // Ignore ouputs in jdl that are also in outputarchive
1123 TString sout = os->GetString();
1124 sout.ReplaceAll("*", "");
1125 sout.ReplaceAll(".root", "");
1126 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1127 if (fOutputArchive.Contains(sout)) continue;
1128 if (!first) comment = NULL;
1129 if (!os->GetString().Contains("@") && fCloseSE.Length())
1130 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1132 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1134 if (fMergeExcludes.Contains(sout)) continue;
1135 if (!os->GetString().Contains("@") && fCloseSE.Length())
1136 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1138 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1141 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1142 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1143 TString validationScript = fValidationScript;
1144 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1145 validationScript.ReplaceAll(".sh", "_merge.sh");
1146 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1147 if (fMasterResubmitThreshold) {
1148 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1149 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1151 // Write a jdl with 2 input parameters: collection name and output dir name.
1154 // Copy jdl to grid workspace
1156 // Check if an output directory was defined and valid
1157 if (!fGridOutputDir.Length()) {
1158 Error("CreateJDL", "You must define AliEn output directory");
1161 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1162 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1163 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1164 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1166 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1172 if (TestBit(AliAnalysisGrid::kSubmit)) {
1173 TString mergeJDLName = fExecutable;
1174 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1175 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1176 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1177 if (fProductionMode) {
1178 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1179 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1181 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1182 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1183 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1184 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1186 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1187 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1190 if (fAdditionalLibs.Length()) {
1191 arr = fAdditionalLibs.Tokenize(" ");
1194 while ((os=(TObjString*)next())) {
1195 if (os->GetString().Contains(".so")) continue;
1196 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1197 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1198 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1203 TIter next(fPackages);
1205 while ((obj=next())) {
1206 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1207 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1208 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1215 //______________________________________________________________________________
1216 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1218 // Writes one or more JDL's corresponding to findex. If findex is negative,
1219 // all run numbers are considered in one go (jdl). For non-negative indices
1220 // they correspond to the indices in the array fInputFiles.
1221 if (!fInputFiles) return kFALSE;
1224 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice")) workdir = gGrid->GetHomeDirectory();
1225 workdir += fGridWorkingDir;
1227 if (fProductionMode) {
1228 TIter next(fInputFiles);
1230 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1231 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_04i#", fGridOutputDir.Data()));
1232 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1234 if (!fRunNumbers.Length() && !fRunRange[0]) {
1235 // One jdl with no parameters in case input data is specified by name.
1236 TIter next(fInputFiles);
1238 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetName()), "Input xml collections");
1239 if (!fOutputSingle.IsNull())
1240 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1242 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1243 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1246 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
1247 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1248 if (!fOutputSingle.IsNull()) {
1249 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1250 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1252 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1253 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1258 // Generate the JDL as a string
1259 TString sjdl = fGridJDL->Generate();
1260 TString sjdl1 = fMergingJDL->Generate();
1262 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1263 sjdl.ReplaceAll("(member", "\n (member");
1264 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1265 sjdl.ReplaceAll("{", "{\n ");
1266 sjdl.ReplaceAll("};", "\n};");
1267 sjdl.ReplaceAll("{\n \n", "{\n");
1268 sjdl.ReplaceAll("\n\n", "\n");
1269 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
1270 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1271 sjdl1.ReplaceAll("(member", "\n (member");
1272 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1273 sjdl1.ReplaceAll("{", "{\n ");
1274 sjdl1.ReplaceAll("};", "\n};");
1275 sjdl1.ReplaceAll("{\n \n", "{\n");
1276 sjdl1.ReplaceAll("\n\n", "\n");
1277 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
1278 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1279 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1280 index = sjdl.Index("JDLVariables");
1281 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1282 sjdl += "Workdirectorysize = {\"5000MB\"};";
1283 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1284 index = fJobTag.Index(":");
1285 if (index < 0) index = fJobTag.Length();
1286 TString jobTag = fJobTag;
1287 jobTag.Insert(index, "_Merging");
1288 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", jobTag.Data()));
1289 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1290 index = sjdl1.Index("JDLVariables");
1291 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1292 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1293 // Write jdl to file
1295 out.open(fJDLName.Data(), ios::out);
1297 Error("WriteJDL", "Bad file name: %s", fJDLName.Data());
1300 out << sjdl << endl;
1301 TString mergeJDLName = fExecutable;
1302 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1305 out1.open(mergeJDLName.Data(), ios::out);
1307 Error("WriteJDL", "Bad file name: %s", mergeJDLName.Data());
1310 out1 << sjdl1 << endl;
1313 // Copy jdl to grid workspace
1315 Info("WriteJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
1317 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1318 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1319 if (fProductionMode) {
1320 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1321 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1323 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1324 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1325 Info("WriteJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1326 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1328 Info("WriteJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1329 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1335 //______________________________________________________________________________
1336 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1338 // Returns true if file exists.
1339 if (!gGrid) return kFALSE;
1341 slfn.ReplaceAll("alien://","");
1342 TGridResult *res = gGrid->Ls(slfn);
1343 if (!res) return kFALSE;
1344 TMap *map = dynamic_cast<TMap*>(res->At(0));
1349 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1350 if (!objs || !objs->GetString().Length()) {
1358 //______________________________________________________________________________
1359 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1361 // Returns true if directory exists. Can be also a path.
1362 if (!gGrid) return kFALSE;
1363 // Check if dirname is a path
1364 TString dirstripped = dirname;
1365 dirstripped = dirstripped.Strip();
1366 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
1367 TString dir = gSystem->BaseName(dirstripped);
1369 TString path = gSystem->DirName(dirstripped);
1370 TGridResult *res = gGrid->Ls(path, "-F");
1371 if (!res) return kFALSE;
1375 while ((map=dynamic_cast<TMap*>(next()))) {
1376 obj = map->GetValue("name");
1378 if (dir == obj->GetName()) {
1387 //______________________________________________________________________________
1388 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1390 // Check input data type.
1391 isCollection = kFALSE;
1395 Error("CheckDataType", "No connection to grid");
1398 isCollection = IsCollection(lfn);
1399 TString msg = "\n##### file: ";
1402 msg += " type: raw_collection;";
1403 // special treatment for collections
1405 // check for tag files in the collection
1406 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1408 msg += " using_tags: No (unknown)";
1409 Info("CheckDataType", "%s", msg.Data());
1412 const char* typeStr = res->GetKey(0, "origLFN");
1413 if (!typeStr || !strlen(typeStr)) {
1414 msg += " using_tags: No (unknown)";
1415 Info("CheckDataType", "%s", msg.Data());
1418 TString file = typeStr;
1419 useTags = file.Contains(".tag");
1420 if (useTags) msg += " using_tags: Yes";
1421 else msg += " using_tags: No";
1422 Info("CheckDataType", "%s", msg.Data());
1427 isXml = slfn.Contains(".xml");
1429 // Open xml collection and check if there are tag files inside
1430 msg += " type: xml_collection;";
1431 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1433 msg += " using_tags: No (unknown)";
1434 Info("CheckDataType", "%s", msg.Data());
1437 TMap *map = coll->Next();
1439 msg += " using_tags: No (unknown)";
1440 Info("CheckDataType", "%s", msg.Data());
1443 map = (TMap*)map->GetValue("");
1445 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1446 useTags = file.Contains(".tag");
1448 if (useTags) msg += " using_tags: Yes";
1449 else msg += " using_tags: No";
1450 Info("CheckDataType", "%s", msg.Data());
1453 useTags = slfn.Contains(".tag");
1454 if (slfn.Contains(".root")) msg += " type: root file;";
1455 else msg += " type: unknown file;";
1456 if (useTags) msg += " using_tags: Yes";
1457 else msg += " using_tags: No";
1458 Info("CheckDataType", "%s", msg.Data());
1461 //______________________________________________________________________________
1462 void AliAnalysisAlien::EnablePackage(const char *package)
1464 // Enables a par file supposed to exist in the current directory.
1465 TString pkg(package);
1466 pkg.ReplaceAll(".par", "");
1468 if (gSystem->AccessPathName(pkg)) {
1469 Fatal("EnablePackage", "Package %s not found", pkg.Data());
1472 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1473 Info("EnablePackage", "AliEn plugin will use .par packages");
1474 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
1476 fPackages = new TObjArray();
1477 fPackages->SetOwner();
1479 fPackages->Add(new TObjString(pkg));
1482 //______________________________________________________________________________
1483 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1485 // Make a tree from files having the location specified in fFileForTestMode.
1486 // Inspired from JF's CreateESDChain.
1487 if (fFileForTestMode.IsNull()) {
1488 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1491 if (gSystem->AccessPathName(fFileForTestMode)) {
1492 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1497 in.open(fFileForTestMode);
1499 // Read the input list of files and add them to the chain
1501 TChain *chain = new TChain(treeName);
1505 if (line.IsNull()) continue;
1506 if (count++ == fNtestFiles) break;
1507 TString esdFile(line);
1508 TFile *file = TFile::Open(esdFile);
1510 if (!file->IsZombie()) chain->Add(esdFile);
1513 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1517 if (!chain->GetListOfFiles()->GetEntries()) {
1518 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1526 //______________________________________________________________________________
1527 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1529 // Get job status for all jobs with jobid>jobidstart.
1530 static char mstatus[20];
1536 TGridJobStatusList *list = gGrid->Ps("");
1537 if (!list) return mstatus;
1538 Int_t nentries = list->GetSize();
1539 TGridJobStatus *status;
1541 for (Int_t ijob=0; ijob<nentries; ijob++) {
1542 status = (TGridJobStatus *)list->At(ijob);
1543 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1544 if (pid<jobidstart) continue;
1545 if (pid == lastid) {
1546 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
1548 switch (status->GetStatus()) {
1549 case TGridJobStatus::kWAITING:
1551 case TGridJobStatus::kRUNNING:
1553 case TGridJobStatus::kABORTED:
1554 case TGridJobStatus::kFAIL:
1555 case TGridJobStatus::kUNKNOWN:
1557 case TGridJobStatus::kDONE:
1566 //______________________________________________________________________________
1567 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1569 // Returns true if file is a collection. Functionality duplicated from
1570 // TAlien::Type() because we don't want to directly depend on TAlien.
1572 Error("IsCollection", "No connection to grid");
1575 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1576 if (!res) return kFALSE;
1577 const char* typeStr = res->GetKey(0, "type");
1578 if (!typeStr || !strlen(typeStr)) return kFALSE;
1579 if (!strcmp(typeStr, "collection")) return kTRUE;
1584 //______________________________________________________________________________
1585 Bool_t AliAnalysisAlien::IsSingleOutput() const
1587 // Check if single-ouput option is on.
1588 return (!fOutputSingle.IsNull());
1591 //______________________________________________________________________________
1592 void AliAnalysisAlien::Print(Option_t *) const
1594 // Print current plugin settings.
1595 printf("### AliEn analysis plugin current settings ###\n");
1596 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1597 if (mgr && mgr->IsProofMode()) {
1598 TString proofType = "= PLUGIN IN PROOF MODE ON CLUSTER:_________________";
1599 if (TestBit(AliAnalysisGrid::kTest))
1600 proofType = "= PLUGIN IN PROOF LITE MODE ON CLUSTER:____________";
1601 printf("%s %s\n", proofType.Data(), fProofCluster.Data());
1602 if (!fProofDataSet.IsNull())
1603 printf("= Requested data set:___________________________ %s\n", fProofDataSet.Data());
1605 printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1607 printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
1608 if (!fRootVersionForProof.IsNull())
1609 printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
1611 printf("= ROOT version requested________________________ default\n");
1612 printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
1613 if (!fAliRootMode.IsNull())
1614 printf("= Requested AliRoot mode________________________ %s\n", fAliRootMode.Data());
1616 printf("= Number of PROOF workers limited to____________ %d\n", fNproofWorkers);
1617 if (fNproofWorkersPerSlave)
1618 printf("= Maximum number of workers per slave___________ %d\n", fNproofWorkersPerSlave);
1619 if (TestSpecialBit(kClearPackages))
1620 printf("= ClearPackages requested...\n");
1621 if (fIncludePath.Data())
1622 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1623 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1624 if (fPackages && fPackages->GetEntries()) {
1625 TIter next(fPackages);
1628 while ((obj=next())) list += obj->GetName();
1629 printf("= Par files to be used: ________________________ %s\n", list.Data());
1631 if (TestSpecialBit(kProofConnectGrid))
1632 printf("= Requested PROOF connection to grid\n");
1635 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
1636 if (fOverwriteMode) {
1637 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1638 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
1640 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1641 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1642 printf("= Production mode:______________________________ %d\n", fProductionMode);
1643 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1644 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1645 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
1647 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1648 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1649 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1650 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1651 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1652 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
1653 if (fRunNumbers.Length())
1654 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1656 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
1657 if (!fRunRange[0] && !fRunNumbers.Length()) {
1658 TIter next(fInputFiles);
1661 while ((obj=next())) list += obj->GetName();
1662 printf("= Input files to be processed: _________________ %s\n", list.Data());
1664 if (TestBit(AliAnalysisGrid::kTest))
1665 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
1666 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1667 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1668 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1669 printf("= List of outputs produced during Terminate: ___ %s\n", fTerminateFiles.Data());
1670 printf("=====================================================================\n");
1671 printf("= Job price: ___________________________________ %d\n", fPrice);
1672 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1673 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1674 if (fMaxInitFailed>0)
1675 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1676 if (fMasterResubmitThreshold>0)
1677 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1678 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1679 if (fNrunsPerMaster>0)
1680 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1681 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
1682 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1683 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1684 if (fArguments.Length())
1685 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1686 if (fExecutableArgs.Length())
1687 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1688 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1689 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1690 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1691 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1693 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1694 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
1695 if (fIncludePath.Data())
1696 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1697 if (fCloseSE.Length())
1698 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1699 if (fFriendChainName.Length())
1700 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
1701 if (fPackages && fPackages->GetEntries()) {
1702 TIter next(fPackages);
1705 while ((obj=next())) list += obj->GetName();
1706 printf("= Par files to be used: ________________________ %s\n", list.Data());
1710 //______________________________________________________________________________
1711 void AliAnalysisAlien::SetDefaults()
1713 // Set default values for everything. What cannot be filled will be left empty.
1714 if (fGridJDL) delete fGridJDL;
1715 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1716 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1719 fSplitMaxInputFileNumber = 100;
1721 fMasterResubmitThreshold = 0;
1726 fNrunsPerMaster = 1;
1727 fMaxMergeFiles = 100;
1729 fExecutable = "analysis.sh";
1730 fExecutableCommand = "root -b -q";
1732 fExecutableArgs = "";
1733 fAnalysisMacro = "myAnalysis.C";
1734 fAnalysisSource = "";
1735 fAdditionalLibs = "";
1739 fAliROOTVersion = "";
1740 fUser = ""; // Your alien user name
1741 fGridWorkingDir = "";
1742 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1743 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1744 fFriendChainName = "";
1745 fGridOutputDir = "output";
1746 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1747 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1748 fInputFormat = "xml-single";
1749 fJDLName = "analysis.jdl";
1750 fJobTag = "Automatically generated analysis JDL";
1751 fMergeExcludes = "";
1754 SetCheckCopy(kTRUE);
1755 SetDefaultOutputs(kTRUE);
1759 //______________________________________________________________________________
1760 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1762 // Static method that checks the status of merging. This can submit merging jobs that did not produced the expected
1763 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1764 // true if the jobs were successfully submitted.
1765 Int_t countOrig = 0;
1766 Int_t countStage = 0;
1769 Bool_t doneFinal = kFALSE;
1771 TString saliendir(aliendir);
1772 TString sfilename, stmp;
1773 saliendir.ReplaceAll("//","/");
1774 saliendir = saliendir.Strip(TString::kTrailing, '/');
1776 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
1779 sfilename = filename;
1780 sfilename.ReplaceAll(".root", "*.root");
1781 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1782 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1783 TGridResult *res = gGrid->Command(command);
1785 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
1790 while ((map=(TMap*)nextmap())) {
1791 TString turl = map->GetValue("turl")->GetName();
1792 if (!turl.Length()) {
1797 turl.ReplaceAll("alien://", "");
1798 turl.ReplaceAll(saliendir, "");
1799 sfilename = gSystem->BaseName(turl);
1800 turl = turl.Strip(TString::kLeading, '/');
1801 // Now check to what the file corresponds to:
1802 // original output - aliendir/%03d/filename
1803 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1804 // final merged file - aliendir/filename
1805 if (sfilename == turl) {
1806 if (sfilename == filename) {
1810 Int_t index = sfilename.Index("Stage");
1811 if (index<0) continue;
1812 stmp = sfilename(index+5,2);
1813 Int_t istage = atoi(stmp);
1814 stmp = sfilename(index+8,4);
1815 Int_t ijob = atoi(stmp);
1816 if (istage<stage) continue; // Ignore lower stages
1819 chunksDone.ResetAllBits();
1823 chunksDone.SetBitNumber(ijob);
1830 printf("=> Removing files from previous stages...\n");
1831 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1832 for (i=1; i<stage; i++)
1833 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1838 // Compute number of jobs that were submitted for the current stage
1839 Int_t ntotstage = countOrig;
1840 for (i=1; i<=stage; i++) {
1841 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1842 else ntotstage = (ntotstage/nperchunk);
1844 // Now compare with the number of set bits in the chunksDone array
1845 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1847 printf("*** Found %d original files\n", countOrig);
1848 if (stage==0) printf("*** No merging completed so far.\n");
1849 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1850 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
1851 if (!submit) return doneFinal;
1852 // Sumbit merging jobs for all missing chunks for the current stage.
1853 TString query = Form("submit %s %s", jdl, aliendir);
1856 for (i=0; i<nmissing; i++) {
1857 ichunk = chunksDone.FirstNullBit(ichunk+1);
1858 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1859 if (!jobId) return kFALSE;
1863 // Submit next stage of merging
1864 if (stage==0) countStage = countOrig;
1865 Int_t nchunks = (countStage/nperchunk);
1866 if (countStage%nperchunk) nchunks += 1;
1867 for (i=0; i<nchunks; i++) {
1868 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1869 if (!jobId) return kFALSE;
1874 //______________________________________________________________________________
1875 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1877 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
1878 if (!gGrid) return 0;
1879 printf("=> %s ------> ",query);
1880 TGridResult *res = gGrid->Command(query);
1882 TString jobId = res->GetKey(0,"jobId");
1884 if (jobId.IsNull()) {
1885 printf("submission failed. Reason:\n");
1888 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1891 printf(" Job id: %s\n", jobId.Data());
1895 //______________________________________________________________________________
1896 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1898 // Merge given output files from basedir. The file merger will merge nmaxmerge
1899 // files in a group. Merging can be done in stages:
1900 // stage=0 : will merge all existing files in a single stage
1901 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1902 // If their number is bigger than nmaxmerge, only the files from
1903 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged into <name>-Stage<stage>_<ichunk>.root
1904 // stage=n : does a find command for files named <name>-Stage<n-1>_*.root. If their number is bigger than
1905 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
// Here <name> is <output> without ".root"; the stage number is printed with 2
// digits and the chunk number with 4 digits (e.g. -Stage01_0003.root).
// Any "@SE" suffix in <output> is ignored. Input files are located with an AliEn
// 'find' in <basedir>; results whose path has more '/' characters than the
// reference depth are skipped so that files in sub-job directories are not merged.
1907 TString outputFile = output;
1909 TString outputChunk;
1910 TString previousChunk = "";
1911 Int_t countChunk = 0;
1912 Int_t countZero = nmaxmerge;
1913 Bool_t merged = kTRUE;
1914 Int_t index = outputFile.Index("@");
1915 if (index > 0) outputFile.Remove(index);
1916 TString inputFile = outputFile;
// From stage 2 on, the inputs are the chunk files produced by the previous stage.
1917 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1918 command = Form("find %s/ *%s", basedir, inputFile.Data());
1919 printf("command: %s\n", command.Data());
1920 TGridResult *res = gGrid->Command(command);
1922 ::Error("MergeOutput","No result for the find command\n");
1926 TFileMerger *fm = 0;
1929 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1930 outputChunk = outputFile;
1931 outputChunk.ReplaceAll(".root", "_*.root");
1932 // Check for existent temporary merge files
1933 // Check overwrite mode and remove previous partial results if needed
1934 // Preserve old merging functionality for stage 0.
1936 Int_t countChar = 0;
// Exec() returns 0 when 'ls' finds local partial chunks <name>_*.root.
1937 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1939 // Skip as many input files as in a chunk
1940 for (Int_t counter=0; counter<nmaxmerge; counter++) {
1941 map = (TMap*)nextmap();
1943 ::Error("MergeOutput", "Mismatch found. Please remove partial merged files from local dir.");
1947 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1948 // Count the '/' characters in the path to the current file.
1949 Int_t crtCount = objs->GetString().CountChar('/');
1951 countChar = crtCount;
1952 // Make sure we check if the same file in the parent dir exists
1953 if (FileExists(Form("%s/../%s", basedir, output))) countChar--;
// Deeper (sub-job) files do not count towards the skipped chunk.
1955 if (crtCount > countChar) counter--;
1958 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
1962 outputChunk = outputFile;
1963 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1965 if (gSystem->AccessPathName(outputChunk)) continue;
1966 // Merged file with chunks up to <countChunk> found
1967 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1968 previousChunk = outputChunk;
1972 countZero = nmaxmerge;
1974 while ((map=(TMap*)nextmap())) {
1975 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1976 if (!objs || !objs->GetString().Length()) {
1977 // Nothing found - skip this output
1982 // Make sure this is a good file and not one from a subjob directory in case we merge runs
1983 // Count the '/' characters in the path to the current file.
1984 Int_t crtCount = objs->GetString().CountChar('/');
1986 countChar = crtCount;
1987 // Make sure we check if the same file in the parent dir exists
1988 if (FileExists(Form("%s/../%s", basedir, output))) countChar--;
1990 if (crtCount > countChar) continue;
1991 // Loop 'find' results and get next LFN
1992 if (countZero == nmaxmerge) {
1993 // First file in chunk - create file merger and add previous chunk if any.
// NOTE(review): deletion of the previous TFileMerger is not visible in this
// listing - confirm fm is released before being reallocated here.
1994 fm = new TFileMerger(kFALSE);
1995 fm->SetFastMethod(kTRUE);
1996 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1997 outputChunk = outputFile;
1998 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
2000 // If last file found, put merged results in the output file
2001 if (map == res->Last()) outputChunk = outputFile;
2002 // Add file to be merged and decrement chunk counter.
2003 fm->AddFile(objs->GetString());
2005 if (countZero==0 || map == res->Last()) {
2006 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2007 // Nothing found - skip this output
2008 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2013 fm->OutputFile(outputChunk);
2014 // Merge the outputs, then go to next chunk
2016 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message is tagged "MergeOutputs" while every other message
// of this method uses "MergeOutput".
2021 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous chunk is now contained in outputChunk; drop the intermediate file.
2022 gSystem->Unlink(previousChunk);
2024 if (map == res->Last()) {
2030 countZero = nmaxmerge;
2031 previousChunk = outputChunk;
2036 // Merging stage different than 0.
2037 // Move to the beginning of the requested chunk.
2038 outputChunk = outputFile;
// Only when there are more inputs than nmaxmerge is a single chunk merged into a
// -StageNN_CCCC file; otherwise everything goes straight into the final file.
2039 if (nmaxmerge < res->GetSize()) {
2040 if (ichunk*nmaxmerge >= res->GetSize()) {
// NOTE(review): the runtime message below contains a duplicated word ("merge merge").
2041 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
2045 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) nextmap();
2046 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
2048 countZero = nmaxmerge;
2049 fm = new TFileMerger(kFALSE);
2050 fm->SetFastMethod(kTRUE);
2051 while ((map=(TMap*)nextmap())) {
2052 // Loop 'find' results and get next LFN
2053 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
2054 if (!objs || !objs->GetString().Length()) {
2055 // Nothing found - skip this output
2060 // Add file to be merged and decrement chunk counter.
2061 fm->AddFile(objs->GetString());
// Stop after nmaxmerge files: the chunk is complete.
2063 if (countZero==0) break;
2066 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
2067 // Nothing found - skip this output
2068 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
2072 fm->OutputFile(outputChunk);
2073 // Merge the outputs
2075 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
2079 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2085 //______________________________________________________________________________
2086 Bool_t AliAnalysisAlien::MergeOutputs()
2088 // Merge analysis outputs existing in the AliEn space.
// Returns kTRUE right away in test mode and kFALSE in offline mode; a missing
// grid connection is reported as an error.
// In the merge-via-JDL path (its guard is not visible in this listing -
// presumably fMergeViaJDL) nothing is merged locally: merging jobs are
// submitted with SubmitMerging() in merge mode (kMerge), unless
// fProductionMode leaves this to the LPM manager.
// Otherwise every file of fOutputFiles (with any "@SE" suffix removed) that is
// not listed in fMergeExcludes is merged locally from fGridOutputDir by
// MergeOutput(). An existing local result is kept, unless fOverwriteMode is set,
// in which case it and its partial "<name>_*.root" chunks are deleted first.
2089 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2090 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2092 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2096 if (!TestBit(AliAnalysisGrid::kMerge)) {
2097 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2100 if (fProductionMode) {
2101 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2104 Info("MergeOutputs", "Submitting merging JDL");
2105 if (!SubmitMerging()) return kFALSE;
2106 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2107 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2110 // Get the output path
// A relative output dir is made absolute: /<home>/<working dir>/<output dir>.
2111 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2112 if (!DirectoryExists(fGridOutputDir)) {
2113 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2116 if (!fOutputFiles.Length()) {
2117 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2120 // Check if fast read option was requested
2121 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2122 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2123 if (fFastReadOption) {
2124 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2125 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2126 gEnv->SetValue("XNet.ConnectTimeout",10);
2127 gEnv->SetValue("XNet.RequestTimeout",10);
2128 gEnv->SetValue("XNet.MaxRedirectCount",2);
2129 gEnv->SetValue("XNet.ReconnectTimeout",10);
2130 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2132 // Make sure we change the temporary directory
2133 gSystem->Setenv("TMPDIR", gSystem->pwd());
2134 TObjArray *list = fOutputFiles.Tokenize(",");
2138 Bool_t merged = kTRUE;
2139 while((str=(TObjString*)next())) {
2140 outputFile = str->GetString();
2141 Int_t index = outputFile.Index("@");
2142 if (index > 0) outputFile.Remove(index);
2143 TString outputChunk = outputFile;
2144 outputChunk.ReplaceAll(".root", "_*.root");
2145 // Skip already merged outputs
2146 if (!gSystem->AccessPathName(outputFile)) {
2147 if (fOverwriteMode) {
2148 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2149 gSystem->Unlink(outputFile);
// NOTE(review): the two messages below are tagged "MergeOutput" instead of
// "MergeOutputs" and read "will removed" (missing "be").
2150 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2151 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2152 outputChunk.Data());
2153 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2156 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// NOTE(review): the guard of this second cleanup is not visible here. If it is not
// restricted to fOverwriteMode, it deletes the partial chunks that MergeOutput()
// could otherwise resume from - confirm.
2160 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2161 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2162 outputChunk.Data());
2163 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2166 if (fMergeExcludes.Length() &&
2167 fMergeExcludes.Contains(outputFile.Data())) continue;
2168 // Perform a 'find' command in the output directory, looking for registered outputs
2169 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2171 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if ROOT still keeps it in its list of open files.
2174 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2175 if (fileOpened) fileOpened->Close();
2180 //______________________________________________________________________________
2181 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2183 // Use the output files connected to output containers from the analysis manager
2184 // rather than the files defined by SetOutputFiles
// flag=kTRUE sets the kDefaultOutputs bit and prints a notice only when the bit
// was not already set; flag=kFALSE clears it, which SetOutputFiles() and
// SetOutputArchive() require before they accept a manual setting.
2185 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2186 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2187 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2190 //______________________________________________________________________________
2191 void AliAnalysisAlien::SetOutputFiles(const char *list)
2193 // Manually set the output files list.
2194 // Removes duplicates. Not allowed if default outputs are not disabled.
// The input is a blank-separated list of file names; they are stored in
// fOutputFiles as a comma-separated list with any "@SE" suffix removed
// (a warning is printed when the list contains '@').
// NOTE(review): duplicates are detected with TString::Contains(), i.e. a substring
// test, so a name contained in an already listed one (e.g. "a.root" after
// "aa.root") is silently dropped. Whether fOutputFiles is cleared first is not
// visible in this listing.
2195 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2196 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2199 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2201 TString slist = list;
2202 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2203 TObjArray *arr = slist.Tokenize(" ");
2207 while ((os=(TObjString*)next())) {
2208 sout = os->GetString();
2209 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
2210 if (fOutputFiles.Contains(sout)) continue;
2211 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2212 fOutputFiles += sout;
2217 //______________________________________________________________________________
2218 void AliAnalysisAlien::SetOutputArchive(const char *list)
2220 // Manually set the output archive list. Free text - you are on your own...
2221 // Not allowed if default outputs are not disabled.
// The string is stored verbatim in fOutputArchive; it is neither parsed nor validated.
2222 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2223 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2226 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2227 fOutputArchive = list;
2230 //______________________________________________________________________________
2231 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2233 // Setting a preferred output SE is not allowed anymore.
// The argument is ignored; the method only prints a warning that points to
// SetNumberOfReplicas() and SetDefaultOutputs().
2234 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2237 //______________________________________________________________________________
2238 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2240 // Start remote grid analysis.
// Requires an initialized AliAnalysisManager.
// PROOF mode (mgr->IsProofMode()): optionally reset the cluster (the analysis is
// then stopped), optionally change the ROOT version, connect, enable the
// AliRoot / AliRootProofLite and par packages, load fAnalysisSource files and,
// for PROOF test mode, register fFileForTestMode as dataset "test_collection".
// Grid mode: take output files/archive from the manager when kDefaultOutputs is
// set, check and create the input dataset, write the analysis, validation and
// merging files and the JDL, then run locally (test mode), write the LPM
// production file (fProductionMode) or submit to AliEn and start an aliensh.
// Returns kFALSE when the JDL cannot be created, in offline mode, or when
// Submit() fails; the other error paths report through Error().
2241 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2242 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2243 if (!mgr || !mgr->IsInitialized()) {
2244 Error("StartAnalysis", "You need an initialized analysis manager for this");
2247 // Are we in PROOF mode ?
2248 if (mgr->IsProofMode()) {
2249 Info("StartAnalysis", "##### Starting PROOF analysis on cluster <%s> via the plugin #####", fProofCluster.Data());
2250 if (fProofCluster.IsNull()) {
2251 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2254 if (fProofDataSet.IsNull() && !testMode) {
2255 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2258 // Set the needed environment
2259 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2260 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
2261 if (fProofReset && !testMode) {
2262 if (fProofReset==1) {
2263 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2264 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2266 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2267 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2269 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2272 // Do we need to change the ROOT version ? The success of this cannot be checked.
2273 if (!fRootVersionForProof.IsNull() && !testMode) {
2274 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2275 fProofCluster.Data(), fRootVersionForProof.Data()));
2277 // Connect to PROOF and check the status
2280 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2281 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2283 if (!sworkers.IsNull())
2284 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2286 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2288 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2290 Error("StartAnalysis", "Could not start PROOF in test mode");
2295 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
2298 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2299 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2300 // Is dataset existing ?
2302 TString dataset = fProofDataSet;
2303 Int_t index = dataset.Index("#");
2304 if (index>=0) dataset.Remove(index);
2305 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2306 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2309 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2311 // Is ClearPackages() needed ?
2312 if (TestSpecialBit(kClearPackages)) {
2313 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2314 gROOT->ProcessLine("gProof->ClearPackages();");
2316 // Is a given aliroot mode requested ?
2319 if (!fAliRootMode.IsNull()) {
2320 TString alirootMode = fAliRootMode;
2321 if (alirootMode == "default") alirootMode = "";
2322 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
2323 optionsList.SetOwner();
2324 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2325 // Check the additional libs to be loaded
2327 Bool_t parMode = kFALSE;
2328 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2329 // Parse the extra libs for .so
2330 if (fAdditionalLibs.Length()) {
2331 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2334 while((str=(TObjString*)next())) {
2335 if (str->GetString().Contains(".so")) {
2337 Warning("StartAnalysis", "Plugin does not support loading libs after par files in PROOF mode. Library %s and following will not load on workers", str->GetName());
// "libFoo.so" is passed to ALIROOT_EXTRA_LIBS as "Foo".
2340 TString stmp = str->GetName();
2341 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2342 stmp.ReplaceAll(".so","");
2343 if (!extraLibs.IsNull()) extraLibs += ":";
2347 if (str->GetString().Contains(".par")) {
2348 // The first par file found in the list will not allow any further .so
2350 if (!parLibs.IsNull()) parLibs += ":";
2351 parLibs += str->GetName();
2355 if (list) delete list;
2357 if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2358 // Check extra includes
2359 if (!fIncludePath.IsNull()) {
2360 TString includePath = fIncludePath;
2361 includePath.ReplaceAll(" ",":");
// TString::Strip() returns a TSubString and leaves includePath unchanged, so the
// result must be assigned back for the trailing ':' to actually be removed.
2362 includePath = includePath.Strip(TString::kTrailing, ':');
2363 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2364 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2366 // Check if connection to grid is requested
2367 if (TestSpecialBit(kProofConnectGrid))
2368 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2369 // Enable AliRoot par
2371 // Enable proof lite package
2372 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
2373 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2374 TNamed *obj = (TNamed*)optionsList.At(i);
2375 printf("%s %s\n", obj->GetName(), obj->GetTitle());
2377 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2378 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)0x%lx);",alirootLite.Data(),(ULong_t)&optionsList))) {
2379 Info("StartAnalysis", "AliRootProofLite enabled");
2381 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
2385 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);",
2386 fAliROOTVersion.Data(), (ULong_t)&optionsList))) {
2387 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2391 // Enable first par files from fAdditionalLibs
2392 if (!parLibs.IsNull()) {
2393 TObjArray *list = parLibs.Tokenize(":");
2395 TObjString *package;
2396 while((package=(TObjString*)next())) {
// UploadPackage()/EnablePackage() return 0 on success (the AliRootProofLite upload
// above is tested the same way): only a successful upload may proceed to
// EnablePackage(); a non-zero result is the upload error.
2397 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2398 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2399 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2403 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2407 if (list) delete list;
2410 if (fAdditionalLibs.Contains(".so") && !testMode) {
2411 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2412 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2416 // Enable par files if requested
2417 if (fPackages && fPackages->GetEntries()) {
2418 TIter next(fPackages);
2420 while ((package=next())) {
2421 // Skip packages already enabled
2422 if (parLibs.Contains(package->GetName())) continue;
// Same convention as above: a zero return from UploadPackage() means success.
2423 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2424 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2425 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2429 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2434 // Do we need to load analysis source files ?
2435 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2436 if (fAnalysisSource.Length()) {
2437 TObjArray *list = fAnalysisSource.Tokenize(" ");
2440 while((str=(TObjString*)next())) {
2441 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2443 if (list) delete list;
2446 // Register dataset to proof lite.
2447 if (fFileForTestMode.IsNull()) {
2448 Error("StartAnalysis", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2451 if (gSystem->AccessPathName(fFileForTestMode)) {
2452 Error("StartAnalysis", "File not found: %s", fFileForTestMode.Data());
2455 TFileCollection *coll = new TFileCollection();
2456 coll->AddFromFile(fFileForTestMode);
2457 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
2458 gROOT->ProcessLine("gProof->ShowDataSets()");
2463 // Check if output files have to be taken from the analysis manager
2464 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2465 // Add output files and AOD files
2466 fOutputFiles = GetListOfFiles("outaod");
2467 // Add extra files registered to the analysis manager
2468 TString extra = GetListOfFiles("ext");
2469 if (!extra.IsNull()) {
2470 extra.ReplaceAll(".root", "*.root");
2471 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2472 fOutputFiles += extra;
2474 // Compose the output archive.
2475 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2476 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2478 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2479 if (TestBit(AliAnalysisGrid::kOffline)) {
2480 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2481 \n there nor any job run. You can revise the JDL and analysis \
2482 \n macro then run the same in \"submit\" mode.");
2483 } else if (TestBit(AliAnalysisGrid::kTest)) {
2484 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2486 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2487 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2488 \n space and job submitted.");
2489 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2490 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2491 if (fMergeViaJDL) CheckInputData();
2494 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2499 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2502 if (IsCheckCopy() && gGrid) CheckFileCopy(gGrid->GetHomeDirectory());
2503 if (!CheckInputData()) {
2504 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2507 if (!CreateDataset(fDataPattern)) {
2509 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2510 if (fRunNumbers.Length()) serror = "run numbers";
2511 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2512 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2513 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2516 WriteAnalysisFile();
2517 WriteAnalysisMacro();
2519 WriteValidationScript();
2521 WriteMergingMacro();
2522 WriteMergeExecutable();
2523 WriteValidationScript(kTRUE);
2525 if (!CreateJDL()) return kFALSE;
2526 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2528 // Locally testing the analysis
2529 Info("StartAnalysis", "\n_______________________________________________________________________ \
2530 \n Running analysis script in a daughter shell as on a worker node \
2531 \n_______________________________________________________________________");
2532 TObjArray *list = fOutputFiles.Tokenize(",");
2536 while((str=(TObjString*)next())) {
2537 outputFile = str->GetString();
2538 Int_t index = outputFile.Index("@");
2539 if (index > 0) outputFile.Remove(index);
2540 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2543 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2544 gSystem->Exec(Form("bash %s",fValidationScript.Data()));
2545 // gSystem->Exec("cat stdout");
2548 // Check if submitting is managed by LPM manager
2549 if (fProductionMode) {
2550 TString prodfile = fJDLName;
2551 prodfile.ReplaceAll(".jdl", ".prod");
2552 WriteProductionFile(prodfile);
2553 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2556 // Submit AliEn job(s)
2557 gGrid->Cd(fGridOutputDir);
2560 if (!fRunNumbers.Length() && !fRunRange[0]) {
2561 // Submit a given xml or a set of runs
2562 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2563 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2565 const char *cjobId = res->GetKey(0,"jobId");
2569 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2572 Info("StartAnalysis", "\n_______________________________________________________________________ \
2573 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2574 \n_______________________________________________________________________",
2575 fJDLName.Data(), cjobId);
2580 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2584 // Submit for a range of enumeration of runs.
2585 if (!Submit()) return kFALSE;
2588 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2589 \n You may exit at any time and terminate the job later using the option <terminate> \
2590 \n ##################################################################################", jobID.Data());
2591 gSystem->Exec("aliensh");
2595 //______________________________________________________________________________
2596 const char *AliAnalysisAlien::GetListOfFiles(const char *type)
2598 // Get a comma-separated list of output files of the requested type.
2599 // Type can be (case insensitive):
2600 // aod - list of aod files (std, extensions and filters)
2601 // out - list of output files connected to containers (but not aod's or extras)
2602 // ext - list of extra files registered to the manager
2603 // ter - list of files produced in terminate
// Several types can be combined in one request (e.g. "outaod"). Each type found in
// the request appends its files in the order aod, out, ext, ter, and the function
// returns as soon as the request is exactly the type just processed.
// The returned pointer refers to a function-static TString shared by all calls:
// copy the result before calling again. Without an analysis manager an error is
// printed and the current content of that buffer is returned.
2604 static TString files;
2606 TString stype = type;
2608 TString aodfiles, extra;
2609 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2611 ::Error("GetListOfFiles", "Cannot call this without analysis manager");
2612 return files.Data();
2614 if (mgr->GetOutputEventHandler()) {
2615 aodfiles = mgr->GetOutputEventHandler()->GetOutputFileName();
2616 TString extraaod = mgr->GetOutputEventHandler()->GetExtraOutputs();
2617 if (!extraaod.IsNull()) {
2619 aodfiles += extraaod;
2622 if (stype.Contains("aod")) {
2624 if (stype == "aod") return files.Data();
2626 // Add output files that are not in the list of AOD files
2627 TString outputfiles = "";
2628 TIter next(mgr->GetOutputs());
2629 AliAnalysisDataContainer *output;
2630 const char *filename = 0;
2631 while ((output=(AliAnalysisDataContainer*)next())) {
2632 filename = output->GetFileName();
// Containers without an explicit file ("default") are not listed.
2633 if (!(strcmp(filename, "default"))) continue;
2634 if (outputfiles.Contains(filename)) continue;
2635 if (aodfiles.Contains(filename)) continue;
2636 if (!outputfiles.IsNull()) outputfiles += ",";
2637 outputfiles += filename;
2639 if (stype.Contains("out")) {
2640 if (!files.IsNull()) files += ",";
2641 files += outputfiles;
2642 if (stype == "out") return files.Data();
2644 // Add extra files registered to the analysis manager
2646 extra = mgr->GetExtraFiles();
2647 if (!extra.IsNull()) {
2649 extra.ReplaceAll(" ", ",");
2650 TObjArray *fextra = extra.Tokenize(",");
2651 TIter nextx(fextra);
2653 while ((obj=nextx())) {
2654 if (aodfiles.Contains(obj->GetName())) continue;
2655 if (outputfiles.Contains(obj->GetName())) continue;
2656 if (sextra.Contains(obj->GetName())) continue;
2657 if (!sextra.IsNull()) sextra += ",";
2658 sextra += obj->GetName();
2661 if (stype.Contains("ext")) {
2662 if (!files.IsNull()) files += ",";
2666 if (stype == "ext") return files.Data();
2668 if (!fTerminateFiles.IsNull()) {
// Strip() returns a TSubString and does not modify the string in place; assign
// the result back so trailing blanks are removed before blanks become ','.
2669 fTerminateFiles = fTerminateFiles.Strip();
2670 fTerminateFiles.ReplaceAll(" ",",");
2671 TObjArray *fextra = fTerminateFiles.Tokenize(",");
2672 TIter nextx(fextra);
2674 while ((obj=nextx())) {
2675 if (aodfiles.Contains(obj->GetName())) continue;
2676 if (outputfiles.Contains(obj->GetName())) continue;
2677 if (termfiles.Contains(obj->GetName())) continue;
2678 if (sextra.Contains(obj->GetName())) continue;
2679 if (!termfiles.IsNull()) termfiles += ",";
2680 termfiles += obj->GetName();
2684 if (stype.Contains("ter")) {
2685 if (!files.IsNull() && !termfiles.IsNull()) {
2690 return files.Data();
2693 //______________________________________________________________________________
2694 Bool_t AliAnalysisAlien::Submit()
2696 // Submit all master jobs.
// When nothing was submitted yet, the first SubmitNext() call happens right away.
// Afterwards SubmitNext() is polled once the 30 s (30000 ms) interval measured
// from tshoot has elapsed, until fNsubmitted reaches the number of entries in
// fInputFiles. Returns kFALSE as soon as SubmitNext() fails.
2697 Int_t nmasterjobs = fInputFiles->GetEntries();
2698 Long_t tshoot = gSystem->Now();
2699 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2700 while (fNsubmitted < nmasterjobs) {
// Yield the CPU between polls instead of spinning for the whole 30 s interval.
gSystem->Sleep(1000);
2701 Long_t now = gSystem->Now();
2702 if ((now-tshoot)>30000) {
2704 if (!SubmitNext()) return kFALSE;
2710 //______________________________________________________________________________
2711 Bool_t AliAnalysisAlien::SubmitMerging()
2713 // Submit all merging jobs.
// For every entry of fInputFiles, the run output directory is derived (the run
// number when fOutputToRunNo is set, otherwise the 3-digit master index).
// CheckMergedFiles() is called on it with submission enabled, using the first
// output file not listed in fMergeExcludes and the JDL obtained from fExecutable
// by replacing ".sh" with "_merge.jdl". The first kFALSE from CheckMergedFiles()
// aborts the loop. Unless there was nothing to submit, an interactive aliensh is
// started at the end.
// NOTE(review): if every output file is listed in fMergeExcludes, the inner loop
// ends with outputFile set to the last (excluded) entry - confirm this is intended.
2714 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2715 gGrid->Cd(fGridOutputDir);
2716 TString mergeJDLName = fExecutable;
2717 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2718 Int_t ntosubmit = fInputFiles->GetEntries();
2719 for (Int_t i=0; i<ntosubmit; i++) {
2720 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2721 runOutDir.ReplaceAll(".xml", "");
2722 if (fOutputToRunNo) {
2723 // The output directory is the run number
2724 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2725 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2727 // The output directory is the master number in 3 digits format
2728 printf("### Submitting merging job for master <%03d>\n", i);
2729 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2731 // Check now the number of merging stages.
2732 TObjArray *list = fOutputFiles.Tokenize(",");
2736 while((str=(TObjString*)next())) {
2737 outputFile = str->GetString();
2738 Int_t index = outputFile.Index("@");
2739 if (index > 0) outputFile.Remove(index);
2740 if (!fMergeExcludes.Contains(outputFile)) break;
2743 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2744 if (!done) return kFALSE;
2746 if (!ntosubmit) return kTRUE;
// NOTE(review): this message is tagged "StartAnalysis" although it is emitted by SubmitMerging.
2747 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2748 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2749 \n ##################################################################################");
2750 gSystem->Exec("aliensh");
2754 //______________________________________________________________________________
2755 Bool_t AliAnalysisAlien::SubmitNext()
2757 // Submit next bunch of master jobs if the queue is free. The first master job is
2758 // submitted right away, while the next will not be unless the previous was split.
2759 // The plugin will not submit new master jobs if there are more than 500 jobs in
// NOTE(review): this listing omits short lines (braces, else, some declarations and
// returns), so the control flow below is only partially visible.
// Function-local statics keep state between calls: a guard flag, the job IDs of the
// first and last master jobs submitted so far, and the last subjobs-per-master estimate.
2761 static Bool_t iscalled = kFALSE;
2762 static Int_t firstmaster = 0;
2763 static Int_t lastmaster = 0;
2764 static Int_t npermaster = 0;
// Return immediately while the guard flag is set. Every early exit visible below
// clears the flag again before returning.
2765 if (iscalled) return kTRUE;
// Subjob counters of the submitted masters, by state (filled by GetJobStatus).
2767 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2768 Int_t ntosubmit = 0;
// One master job per registered input collection (fInputFiles entry).
2771 Int_t nmasterjobs = fInputFiles->GetEntries();
// Without a submit policy, all master jobs are submitted in a single pass.
2774 if (!IsUseSubmitPolicy()) {
2776 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
2777 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
2778 ntosubmit = nmasterjobs;
// Throttled path: decide how many masters to submit from the state of the jobs
// already submitted (the enclosing if/else lines are not visible in this listing).
2781 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2782 printf("=== master %d: %s\n", lastmaster, status.Data());
2783 // If last master not split, just return
2784 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2785 // No more than 500 waiting jobs
2786 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Average number of subjobs per submitted master.
// NOTE(review): divides by fNsubmitted; assumes this path is only reached after at
// least one master was submitted - confirm against the hidden enclosing condition.
2787 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
// Submit as many masters as fit into the remaining budget of 500 waiting subjobs,
// but always at least one.
2788 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
2789 if (!ntosubmit) ntosubmit = 1;
2790 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2791 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2793 for (Int_t i=0; i<ntosubmit; i++) {
2794 // Submit for a range of enumeration of runs.
// Stop once every input collection has been submitted.
2795 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
// Collection name without the ".xml" extension.
2797 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2798 runOutDir.ReplaceAll(".xml", "");
// The third argument of the AliEn "submit" command is either the collection name or
// the master index printed with 3 digits. The selecting condition is not visible here;
// presumably fOutputToRunNo, as in the merging code above - confirm.
2800 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2802 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2803 printf("********* %s\n",query.Data());
2804 res = gGrid->Command(query);
// An empty "jobId" key in the grid result means the submission was rejected.
2806 TString cjobId1 = res->GetKey(0,"jobId");
2807 if (!cjobId1.Length()) {
// NOTE(review): messages in this function are tagged "StartAnalysis", not "SubmitNext".
2811 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2814 Info("StartAnalysis", "\n_______________________________________________________________________ \
2815 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2816 \n_______________________________________________________________________",
2817 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the newest master job ID; the first one is recorded only once.
2820 lastmaster = cjobId1.Atoi();
2821 if (!firstmaster) firstmaster = lastmaster;
// NOTE(review): confirm that iscalled is cleared on this failure path as well;
// otherwise the guard at the top turns every later SubmitNext() call into a no-op.
2826 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2834 //______________________________________________________________________________
2835 void AliAnalysisAlien::WriteAnalysisFile()
2837 // Write current analysis manager into the file <analysisFile>
// The file name is fExecutable with ".sh" replaced by ".root". The file is then
// copied to the AliEn working directory unless in production, offline or test mode.
// NOTE(review): this listing omits short lines (braces, some declarations and calls).
2838 TString analysisFile = fExecutable;
2839 analysisFile.ReplaceAll(".sh", ".root");
// The local file is (re)written only when the kSubmit bit is not set.
2840 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2841 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2842 if (!mgr || !mgr->IsInitialized()) {
2843 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2846 // Check analysis type
// Record the data type in this object's bits: kUseMC when an MC truth handler is
// attached, kUseESD / kUseAOD according to the class of the input handler.
2848 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2849 handler = (TObject*)mgr->GetInputEventHandler();
// NOTE(review): handler is dereferenced below; a null check, if any, is on a line
// not visible in this listing - confirm.
2851 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2852 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Save the current directory so it can be restored after writing the file.
2854 TDirectory *cdir = gDirectory;
// "RECREATE" overwrites any existing local file of the same name.
2855 TFile *file = TFile::Open(analysisFile, "RECREATE");
2857 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode)
2858 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2859 // Unless merging makes no sense
2860 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2863 // Enable termination for local jobs
2864 mgr->SetSkipTerminate(kFALSE);
2866 if (cdir) cdir->cd();
2867 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// The upload below is disabled (copy = kFALSE) in production, offline or test mode.
2869 Bool_t copy = kTRUE;
2870 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// NOTE(review): the home directory and fGridWorkingDir are concatenated without a
// separator; presumably GetHomeDirectory() ends with '/' - confirm.
2873 TString workdir = gGrid->GetHomeDirectory();
2874 workdir += fGridWorkingDir;
2875 Info("WriteAnalysisFile", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove a stale copy before uploading.
// NOTE(review): the existence check and removal use the bare file name, while the copy
// targets workdir/analysisFile; this presumably relies on the AliEn current directory
// being the working directory - confirm.
2876 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2877 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2881 //______________________________________________________________________________
2882 void AliAnalysisAlien::WriteAnalysisMacro()
2884 // Write the analysis macro that will steer the analysis in grid mode.
// The macro is written to fAnalysisMacro and defines a function named after the macro
// file (without ".C"). It loads libraries/PARs, reads the analysis manager from the
// ".root" file derived from fExecutable and runs it on a chain built from wn.xml.
// The helpers CreateChainFromTags(), CreateChain() and SetupPar() are appended only
// when needed. The macro is generated only when the kSubmit bit is not set.
// NOTE(review): this listing omits short lines (braces, else, declarations, returns).
2885 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2887 out.open(fAnalysisMacro.Data(), ios::out);
2889 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
// Flags telling whether each core library is shipped as a PAR package in fPackages,
// in which case it is set up from the PAR instead of loaded with gSystem->Load.
2892 Bool_t hasSTEERBase = kFALSE;
2893 Bool_t hasESD = kFALSE;
2894 Bool_t hasAOD = kFALSE;
2895 Bool_t hasANALYSIS = kFALSE;
2896 Bool_t hasANALYSISalice = kFALSE;
2897 Bool_t hasCORRFW = kFALSE;
// Function name of the generated macro: the macro file name without ".C".
2898 TString func = fAnalysisMacro;
// Analysis type written into the macro as "anatype". It defaults to ESD and is
// presumably switched to AOD on the hidden lines after the kUseAOD test - confirm.
2899 TString type = "ESD";
2900 TString comment = "// Analysis using ";
2901 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2902 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
// A friend chain is only supported on top of AOD input.
2906 if (type!="AOD" && fFriendChainName!="") {
2907 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2910 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2911 else comment += " data";
// --- Main steering function of the generated macro ---
2912 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2913 func.ReplaceAll(".C", "");
2914 out << "void " << func.Data() << "()" << endl;
2916 out << comment.Data() << endl;
2917 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2918 out << " TStopwatch timer;" << endl;
2919 out << " timer.Start();" << endl << endl;
2920 // Change temp directory to current one
2921 out << "// Set temporary merging directory to current one" << endl;
2922 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
// ROOT base libraries are loaded explicitly unless the job command is aliroot.
2923 if (!fExecutableCommand.Contains("aliroot")) {
2924 out << "// load base root libraries" << endl;
2925 out << " gSystem->Load(\"libTree\");" << endl;
2926 out << " gSystem->Load(\"libGeom\");" << endl;
2927 out << " gSystem->Load(\"libVMC\");" << endl;
2928 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2929 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
// Extra ROOT libraries: only space-separated tokens containing ".so" are loaded.
2931 if (fAdditionalRootLibs.Length()) {
2932 // in principle libTree, libGeom, libVMC etc. can go into this list, too
2933 out << "// Add aditional libraries" << endl;
2934 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2937 while((str=(TObjString*)next())) {
2938 if (str->GetString().Contains(".so"))
2939 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2941 if (list) delete list;
2943 out << "// include path" << endl;
2944 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2945 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2946 out << "// Load analysis framework libraries" << endl;
// PAR packages are set up with the static AliAnalysisAlien::SetupPar, or with the
// SetupPar() function appended to the macro when ANALYSISalice is itself a PAR.
2947 TString setupPar = "AliAnalysisAlien::SetupPar";
// Plain library loading; presumably the branch used when no PAR packages are
// registered (the enclosing condition is not visible in this listing).
2949 if (!fExecutableCommand.Contains("aliroot")) {
2950 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2951 out << " gSystem->Load(\"libESD\");" << endl;
2952 out << " gSystem->Load(\"libAOD\");" << endl;
2954 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2955 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2956 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
// First pass over fPackages: detect which core libraries are provided as PARs.
2958 TIter next(fPackages);
2961 while ((obj=next())) {
2962 pkgname = obj->GetName();
2963 if (pkgname == "STEERBase" ||
2964 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2965 if (pkgname == "ESD" ||
2966 pkgname == "ESD.par") hasESD = kTRUE;
2967 if (pkgname == "AOD" ||
2968 pkgname == "AOD.par") hasAOD = kTRUE;
2969 if (pkgname == "ANALYSIS" ||
2970 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2971 if (pkgname == "ANALYSISalice" ||
2972 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2973 if (pkgname == "CORRFW" ||
2974 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2976 if (hasANALYSISalice) setupPar = "SetupPar";
// Each core library is either loaded or set up from its PAR; the generated macro
// returns if a PAR cannot be set up.
2977 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2978 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2979 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2980 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2981 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2982 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2983 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2984 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2985 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2986 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2987 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2988 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2989 out << "// Compile other par packages" << endl;
// Second pass: set up every remaining (non-core) PAR package.
2991 while ((obj=next())) {
2992 pkgname = obj->GetName();
2993 if (pkgname == "STEERBase" ||
2994 pkgname == "STEERBase.par" ||
2996 pkgname == "ESD.par" ||
2998 pkgname == "AOD.par" ||
2999 pkgname == "ANALYSIS" ||
3000 pkgname == "ANALYSIS.par" ||
3001 pkgname == "ANALYSISalice" ||
3002 pkgname == "ANALYSISalice.par" ||
3003 pkgname == "CORRFW" ||
3004 pkgname == "CORRFW.par") continue;
3005 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
// Additional AliRoot libraries: ".so" tokens are loaded, ".par" tokens are set up.
3008 if (fAdditionalLibs.Length()) {
3009 out << "// Add aditional AliRoot libraries" << endl;
3010 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3013 while((str=(TObjString*)next())) {
3014 if (str->GetString().Contains(".so"))
3015 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3016 if (str->GetString().Contains(".par"))
3017 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
3019 if (list) delete list;
3022 out << "// analysis source to be compiled at runtime (if any)" << endl;
// Each space-separated source file is compiled on the worker with ACLiC ("+g").
3023 if (fAnalysisSource.Length()) {
3024 TObjArray *list = fAnalysisSource.Tokenize(" ");
3027 while((str=(TObjString*)next())) {
3028 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3030 if (list) delete list;
// Optional xrootd settings that shorten timeouts and limit redirections in the job.
3033 if (fFastReadOption) {
3034 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
3035 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
3036 out << "// fast xrootd reading enabled" << endl;
3037 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3038 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3039 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3040 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3041 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3042 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3044 out << "// connect to AliEn and make the chain" << endl;
3045 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3046 out << "// read the analysis manager from file" << endl;
// The emitted code scans every key of the file; if several AliAnalysisManager
// objects are stored, the last one read is used.
3047 TString analysisFile = fExecutable;
3048 analysisFile.ReplaceAll(".sh", ".root");
3049 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3050 out << " if (!file) return;" << endl;
3051 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3052 out << " AliAnalysisManager *mgr = 0;" << endl;
3053 out << " TKey *key;" << endl;
3054 out << " while ((key=(TKey*)nextkey())) {" << endl;
3055 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3056 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3057 out << " };" << endl;
3058 out << " if (!mgr) {" << endl;
3059 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
3060 out << " return;" << endl;
3061 out << " }" << endl << endl;
3062 out << " mgr->PrintStatus();" << endl;
// Job verbosity follows the local manager's debug level (xrootd debug above 3)
// and the test bit (AliLog warning level in test mode, error level otherwise).
3063 if (AliAnalysisManager::GetAnalysisManager()) {
3064 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3065 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3067 if (TestBit(AliAnalysisGrid::kTest))
3068 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3070 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
// The input chain is built from the job's wn.xml collection, through tags or directly.
3073 if (IsUsingTags()) {
3074 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
3076 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
3078 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
3079 out << " timer.Stop();" << endl;
3080 out << " timer.Print();" << endl;
3081 out << "}" << endl << endl;
// --- Helper: chain built from tag files; cuts can be configured by a ConfigureCuts.C
// macro found in the job's working directory ---
3082 if (IsUsingTags()) {
3083 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
3085 out << "// Create a chain using tags from the xml file." << endl;
3086 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
3087 out << " if (!coll) {" << endl;
3088 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3089 out << " return NULL;" << endl;
3090 out << " }" << endl;
3091 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
3092 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
3093 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
3094 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
3095 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
3096 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
3097 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
3098 out << " // Check if the cuts configuration file was provided" << endl;
3099 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
3100 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
3101 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3102 out << " }" << endl;
// With a friend chain, the tag query is written to an intermediate XML collection and
// the chain is built by CreateChain() so that the friend can be attached there.
3103 if (fFriendChainName=="") {
3104 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
3106 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
3107 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
3108 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
3110 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
3111 out << " chain->ls();" << endl;
3112 out << " return chain;" << endl;
3113 out << "}" << endl << endl;
// Hint printed locally when no ConfigureCuts.C is present in the current directory.
3114 if (gSystem->AccessPathName("ConfigureCuts.C")) {
3115 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
3116 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
3117 msg += " AliLHCTagCuts *lhcCuts,\n";
3118 msg += " AliDetectorTagCuts *detCuts,\n";
3119 msg += " AliEventTagCuts *evCuts)";
3120 Info("WriteAnalysisMacro", "%s", msg.Data());
// --- Helper: TChain named "<type in lower case>Tree" built from every URL of the XML
// collection; also needed by CreateChainFromTags() when a friend chain is requested ---
3123 if (!IsUsingTags() || fFriendChainName!="") {
3124 out <<"//________________________________________________________________________________" << endl;
3125 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
3127 out << "// Create a chain using url's from xml file" << endl;
3128 out << " TString filename;" << endl;
3129 out << " Int_t run = 0;" << endl;
3130 out << " TString treename = type;" << endl;
3131 out << " treename.ToLower();" << endl;
3132 out << " treename += \"Tree\";" << endl;
3133 out << " printf(\"***************************************\\n\");" << endl;
3134 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
3135 out << " printf(\"***************************************\\n\");" << endl;
3136 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
3137 out << " if (!coll) {" << endl;
3138 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
3139 out << " return NULL;" << endl;
3140 out << " }" << endl;
3141 out << " AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();" << endl;
3142 out << " TChain *chain = new TChain(treename);" << endl;
3143 if(fFriendChainName!="") {
3144 out << " TChain *chainFriend = new TChain(treename);" << endl;
3146 out << " coll->Reset();" << endl;
3147 out << " while (coll->Next()) {" << endl;
// NOTE(review): the unescaped "" below closes and reopens the C++ string literal, so
// the generated line is `filename = coll->GetTURL();` (empty argument list, not an
// empty string). It presumably works through a default argument of GetTURL; the
// fileFriend line further down escapes the quotes properly.
3148 out << " filename = coll->GetTURL("");" << endl;
// Emitted code: propagate a run number detected from the file path to the manager.
3149 out << " if (mgr) {" << endl;
3150 out << " Int_t nrun = AliAnalysisManager::GetRunFromAlienPath(filename);" << endl;
3151 out << " if (nrun && nrun != run) {" << endl;
3152 out << " printf(\"### Run number detected from chain: %d\\n\", nrun);" << endl;
3153 out << " mgr->SetRunFromPath(nrun);" << endl;
3154 out << " run = nrun;" << endl;
3155 out << " }" << endl;
3156 out << " }" << endl;
3157 out << " chain->Add(filename);" << endl;
// Friend file names are derived from each AOD URL by replacing AliAOD.root or
// AliAODs.root with fFriendChainName.
3158 if(fFriendChainName!="") {
3159 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
3160 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3161 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
3162 out << " chainFriend->Add(fileFriend.Data());" << endl;
3164 out << " }" << endl;
3165 out << " if (!chain->GetNtrees()) {" << endl;
3166 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
3167 out << " return NULL;" << endl;
3168 out << " }" << endl;
3169 if(fFriendChainName!="") {
3170 out << " chain->AddFriend(chainFriend);" << endl;
3172 out << " return chain;" << endl;
3173 out << "}" << endl << endl;
// --- Helper: local SetupPar(), emitted only when ANALYSISalice is a PAR package.
// It unpacks <package>.par, runs PROOF-INF/BUILD.sh and PROOF-INF/SETUP.C, and
// returns to the original working directory ---
3175 if (hasANALYSISalice) {
3176 out <<"//________________________________________________________________________________" << endl;
3177 out << "Bool_t SetupPar(const char *package) {" << endl;
3178 out << "// Compile the package and set it up." << endl;
3179 out << " TString pkgdir = package;" << endl;
3180 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3181 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3182 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3183 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3184 out << " // Check for BUILD.sh and execute" << endl;
3185 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3186 out << " printf(\"*******************************\\n\");" << endl;
3187 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3188 out << " printf(\"*******************************\\n\");" << endl;
3189 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3190 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3191 out << " gSystem->ChangeDirectory(cdir);" << endl;
3192 out << " return kFALSE;" << endl;
3193 out << " }" << endl;
3194 out << " } else {" << endl;
3195 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3196 out << " gSystem->ChangeDirectory(cdir);" << endl;
3197 out << " return kFALSE;" << endl;
3198 out << " }" << endl;
3199 out << " // Check for SETUP.C and execute" << endl;
3200 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3201 out << " printf(\"*******************************\\n\");" << endl;
3202 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3203 out << " printf(\"*******************************\\n\");" << endl;
3204 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3205 out << " } else {" << endl;
3206 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3207 out << " gSystem->ChangeDirectory(cdir);" << endl;
3208 out << " return kFALSE;" << endl;
3209 out << " }" << endl;
3210 out << " // Restore original workdir" << endl;
3211 out << " gSystem->ChangeDirectory(cdir);" << endl;
3212 out << " return kTRUE;" << endl;
3215 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
// Upload the macro (and ConfigureCuts.C when tags are used and the file exists
// locally) to the AliEn working directory; copy = kFALSE disables this in production,
// offline or test mode.
3217 Bool_t copy = kTRUE;
3218 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3221 TString workdir = gGrid->GetHomeDirectory();
3222 workdir += fGridWorkingDir;
3223 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3224 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3225 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3226 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3227 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3229 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3230 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3234 //______________________________________________________________________________
3235 void AliAnalysisAlien::WriteMergingMacro()
3237 // Write a macro to merge the outputs per master job.
3238 if (!fMergeViaJDL) return;
3239 if (!fOutputFiles.Length()) {
3240 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3243 TString mergingMacro = fExecutable;
3244 mergingMacro.ReplaceAll(".sh","_merge.C");
3245 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3246 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3248 out.open(mergingMacro.Data(), ios::out);
3250 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3253 Bool_t hasSTEERBase = kFALSE;
3254 Bool_t hasESD = kFALSE;
3255 Bool_t hasAOD = kFALSE;
3256 Bool_t hasANALYSIS = kFALSE;
3257 Bool_t hasANALYSISalice = kFALSE;
3258 Bool_t hasCORRFW = kFALSE;
3259 TString func = mergingMacro;
3261 func.ReplaceAll(".C", "");
3262 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
3264 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3265 out << " TStopwatch timer;" << endl;
3266 out << " timer.Start();" << endl << endl;
3267 if (!fExecutableCommand.Contains("aliroot")) {
3268 out << "// load base root libraries" << endl;
3269 out << " gSystem->Load(\"libTree\");" << endl;
3270 out << " gSystem->Load(\"libGeom\");" << endl;
3271 out << " gSystem->Load(\"libVMC\");" << endl;
3272 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3273 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3275 if (fAdditionalRootLibs.Length()) {
3276 // in principle libtree /lib geom libvmc etc. can go into this list, too
3277 out << "// Add aditional libraries" << endl;
3278 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3281 while((str=(TObjString*)next())) {
3282 if (str->GetString().Contains(".so"))
3283 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3285 if (list) delete list;
3287 out << "// include path" << endl;
3288 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3289 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
3290 out << "// Load analysis framework libraries" << endl;
3292 if (!fExecutableCommand.Contains("aliroot")) {
3293 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3294 out << " gSystem->Load(\"libESD\");" << endl;
3295 out << " gSystem->Load(\"libAOD\");" << endl;
3297 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3298 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3299 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3301 TIter next(fPackages);
3304 TString setupPar = "AliAnalysisAlien::SetupPar";
3305 while ((obj=next())) {
3306 pkgname = obj->GetName();
3307 if (pkgname == "STEERBase" ||
3308 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3309 if (pkgname == "ESD" ||
3310 pkgname == "ESD.par") hasESD = kTRUE;
3311 if (pkgname == "AOD" ||
3312 pkgname == "AOD.par") hasAOD = kTRUE;
3313 if (pkgname == "ANALYSIS" ||
3314 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3315 if (pkgname == "ANALYSISalice" ||
3316 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3317 if (pkgname == "CORRFW" ||
3318 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3320 if (hasANALYSISalice) setupPar = "SetupPar";
3321 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3322 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3323 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3324 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3325 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3326 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3327 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3328 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3329 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3330 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3331 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3332 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3333 out << "// Compile other par packages" << endl;
3335 while ((obj=next())) {
3336 pkgname = obj->GetName();
3337 if (pkgname == "STEERBase" ||
3338 pkgname == "STEERBase.par" ||
3340 pkgname == "ESD.par" ||
3342 pkgname == "AOD.par" ||
3343 pkgname == "ANALYSIS" ||
3344 pkgname == "ANALYSIS.par" ||
3345 pkgname == "ANALYSISalice" ||
3346 pkgname == "ANALYSISalice.par" ||
3347 pkgname == "CORRFW" ||
3348 pkgname == "CORRFW.par") continue;
3349 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3352 if (fAdditionalLibs.Length()) {
3353 out << "// Add aditional AliRoot libraries" << endl;
3354 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3357 while((str=(TObjString*)next())) {
3358 if (str->GetString().Contains(".so"))
3359 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3361 if (list) delete list;
3364 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3365 if (fAnalysisSource.Length()) {
3366 TObjArray *list = fAnalysisSource.Tokenize(" ");
3369 while((str=(TObjString*)next())) {
3370 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3372 if (list) delete list;
3376 if (fFastReadOption) {
3377 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3378 out << "// fast xrootd reading enabled" << endl;
3379 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3380 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3381 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3382 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3383 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3384 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3386 // Change temp directory to current one
3387 out << "// Set temporary merging directory to current one" << endl;
3388 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3389 out << "// Connect to AliEn" << endl;
3390 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3391 out << " Bool_t laststage = kFALSE;" << endl;
3392 out << " TString outputDir = dir;" << endl;
3393 out << " TString outputFiles = \"" << GetListOfFiles("out") << "\";" << endl;
3394 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3395 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3396 out << " TIter *iter = new TIter(list);" << endl;
3397 out << " TObjString *str;" << endl;
3398 out << " TString outputFile;" << endl;
3399 out << " Bool_t merged = kTRUE;" << endl;
3400 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3401 out << " outputFile = str->GetString();" << endl;
3402 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3403 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3404 out << " if (index > 0) outputFile.Remove(index);" << endl;
3405 out << " // Skip already merged outputs" << endl;
3406 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3407 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3408 out << " continue;" << endl;
3409 out << " }" << endl;
3410 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3411 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
3412 out << " if (!merged) {" << endl;
3413 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3414 out << " return;" << endl;
3415 out << " }" << endl;
3416 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
3417 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
3418 out << " }" << endl;
3419 out << " // all outputs merged, validate" << endl;
3420 out << " ofstream out;" << endl;
3421 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3422 out << " out.close();" << endl;
3423 out << " // read the analysis manager from file" << endl;
3424 TString analysisFile = fExecutable;
3425 analysisFile.ReplaceAll(".sh", ".root");
3426 out << " if (!laststage) return;" << endl;
3427 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3428 out << " if (!file) return;" << endl;
3429 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3430 out << " AliAnalysisManager *mgr = 0;" << endl;
3431 out << " TKey *key;" << endl;
3432 out << " while ((key=(TKey*)nextkey())) {" << endl;
3433 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3434 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3435 out << " };" << endl;
3436 out << " if (!mgr) {" << endl;
3437 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
3438 out << " return;" << endl;
3439 out << " }" << endl << endl;
3440 out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
3441 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3442 out << " mgr->PrintStatus();" << endl;
3443 if (AliAnalysisManager::GetAnalysisManager()) {
3444 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3445 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3447 if (TestBit(AliAnalysisGrid::kTest))
3448 out << " AliLog::SetGlobalLogLevel(AliLog::kWarning);" << endl;
3450 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3453 out << " TTree *tree = NULL;" << endl;
3454 out << " mgr->StartAnalysis(\"gridterminate\", tree);" << endl;
3455 out << "}" << endl << endl;
3456 if (hasANALYSISalice) {
3457 out <<"//________________________________________________________________________________" << endl;
3458 out << "Bool_t SetupPar(const char *package) {" << endl;
3459 out << "// Compile the package and set it up." << endl;
3460 out << " TString pkgdir = package;" << endl;
3461 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3462 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3463 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3464 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3465 out << " // Check for BUILD.sh and execute" << endl;
3466 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3467 out << " printf(\"*******************************\\n\");" << endl;
3468 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3469 out << " printf(\"*******************************\\n\");" << endl;
3470 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3471 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3472 out << " gSystem->ChangeDirectory(cdir);" << endl;
3473 out << " return kFALSE;" << endl;
3474 out << " }" << endl;
3475 out << " } else {" << endl;
3476 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3477 out << " gSystem->ChangeDirectory(cdir);" << endl;
3478 out << " return kFALSE;" << endl;
3479 out << " }" << endl;
3480 out << " // Check for SETUP.C and execute" << endl;
3481 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3482 out << " printf(\"*******************************\\n\");" << endl;
3483 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3484 out << " printf(\"*******************************\\n\");" << endl;
3485 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3486 out << " } else {" << endl;
3487 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3488 out << " gSystem->ChangeDirectory(cdir);" << endl;
3489 out << " return kFALSE;" << endl;
3490 out << " }" << endl;
3491 out << " // Restore original workdir" << endl;
3492 out << " gSystem->ChangeDirectory(cdir);" << endl;
3493 out << " return kTRUE;" << endl;
3497 Bool_t copy = kTRUE;
3498 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3501 TString workdir = gGrid->GetHomeDirectory();
3502 workdir += fGridWorkingDir;
3503 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3504 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3505 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3509 //______________________________________________________________________________
3510 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3512 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3513 // Note that for loading the compiled library. The current directory should have precedence in
3515 TString pkgdir = package;
3516 pkgdir.ReplaceAll(".par","");
3517 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
3518 TString cdir = gSystem->WorkingDirectory();
3519 gSystem->ChangeDirectory(pkgdir);
3520 // Check for BUILD.sh and execute
3521 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3522 printf("**************************************************\n");
3523 printf("*** Building PAR archive %s\n", package);
3524 printf("**************************************************\n");
3525 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3526 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3527 gSystem->ChangeDirectory(cdir);
3531 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3532 gSystem->ChangeDirectory(cdir);
3535 // Check for SETUP.C and execute
3536 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3537 printf("**************************************************\n");
3538 printf("*** Setup PAR archive %s\n", package);
3539 printf("**************************************************\n");
3540 gROOT->Macro("PROOF-INF/SETUP.C");
3541 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3543 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3544 gSystem->ChangeDirectory(cdir);
3547 // Restore original workdir
3548 gSystem->ChangeDirectory(cdir);
3552 //______________________________________________________________________________
3553 void AliAnalysisAlien::WriteExecutable()
3555 // Generate the alien executable script.
3556 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3558 out.open(fExecutable.Data(), ios::out);
3560 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3563 out << "#!/bin/bash" << endl;
3564 out << "echo \"=========================================\"" << endl;
3565 out << "echo \"############## PATH : ##############\"" << endl;
3566 out << "echo $PATH" << endl;
3567 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3568 out << "echo $LD_LIBRARY_PATH" << endl;
3569 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3570 out << "echo $ROOTSYS" << endl;
3571 out << "echo \"############## which root : ##############\"" << endl;
3572 out << "which root" << endl;
3573 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3574 out << "echo $ALICE_ROOT" << endl;
3575 out << "echo \"############## which aliroot : ##############\"" << endl;
3576 out << "which aliroot" << endl;
3577 out << "echo \"############## system limits : ##############\"" << endl;
3578 out << "ulimit -a" << endl;
3579 out << "echo \"############## memory : ##############\"" << endl;
3580 out << "free -m" << endl;
3581 out << "echo \"=========================================\"" << endl << endl;
3582 // Make sure we can properly compile par files
3583 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3584 out << fExecutableCommand << " ";
3585 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3586 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3587 out << "echo \"############## memory after: ##############\"" << endl;
3588 out << "free -m" << endl;
3590 Bool_t copy = kTRUE;
3591 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3594 TString workdir = gGrid->GetHomeDirectory();
3595 TString bindir = Form("%s/bin", workdir.Data());
3596 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3597 workdir += fGridWorkingDir;
3598 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3599 if (FileExists(executable)) gGrid->Rm(executable);
3600 Info("WriteExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3601 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3605 //______________________________________________________________________________
3606 void AliAnalysisAlien::WriteMergeExecutable()
3608 // Generate the alien executable script for the merging job.
3609 if (!fMergeViaJDL) return;
3610 TString mergeExec = fExecutable;
3611 mergeExec.ReplaceAll(".sh", "_merge.sh");
3612 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3614 out.open(mergeExec.Data(), ios::out);
3616 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
3619 out << "#!/bin/bash" << endl;
3620 out << "echo \"=========================================\"" << endl;
3621 out << "echo \"############## PATH : ##############\"" << endl;
3622 out << "echo $PATH" << endl;
3623 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3624 out << "echo $LD_LIBRARY_PATH" << endl;
3625 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3626 out << "echo $ROOTSYS" << endl;
3627 out << "echo \"############## which root : ##############\"" << endl;
3628 out << "which root" << endl;
3629 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3630 out << "echo $ALICE_ROOT" << endl;
3631 out << "echo \"############## which aliroot : ##############\"" << endl;
3632 out << "which aliroot" << endl;
3633 out << "echo \"############## system limits : ##############\"" << endl;
3634 out << "ulimit -a" << endl;
3635 out << "echo \"############## memory : ##############\"" << endl;
3636 out << "free -m" << endl;
3637 out << "echo \"=========================================\"" << endl << endl;
3638 // Make sure we can properly compile par files
3639 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3640 TString mergeMacro = fExecutable;
3641 mergeMacro.ReplaceAll(".sh", "_merge.C");
3642 if (IsOneStageMerging())
3643 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
3645 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3646 out << fExecutableCommand << " " << "$ARG" << endl;
3647 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3648 out << "echo \"############## memory after: ##############\"" << endl;
3649 out << "free -m" << endl;
3651 Bool_t copy = kTRUE;
3652 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3655 TString workdir = gGrid->GetHomeDirectory();
3656 TString bindir = Form("%s/bin", workdir.Data());
3657 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3658 workdir += fGridWorkingDir;
3659 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3660 if (FileExists(executable)) gGrid->Rm(executable);
3661 Info("WriteMergeExecutable", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3662 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3666 //______________________________________________________________________________
3667 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3669 // Write the production file to be submitted by LPM manager. The format is:
3670 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3671 // Next lines: full_path_to_dataset XXX (XXX is a string)
3672 // To submit, one has to: submit jdl XXX for all lines
3674 out.open(filename, ios::out);
3676 Error("WriteProductionFile", "Bad file name: %s", filename);
3680 if (!fProductionMode && !fGridWorkingDir.BeginsWith("/alice"))
3681 workdir = gGrid->GetHomeDirectory();
3682 workdir += fGridWorkingDir;
3683 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3684 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3685 out << locjdl << " " << njobspermaster << endl;
3686 Int_t nmasterjobs = fInputFiles->GetEntries();
3687 for (Int_t i=0; i<nmasterjobs; i++) {
3688 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3689 runOutDir.ReplaceAll(".xml", "");
3691 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
3693 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
3696 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3697 if (FileExists(filename)) gGrid->Rm(filename);
3698 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3702 //______________________________________________________________________________
3703 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3705 // Generate the alien validation script.
3706 // Generate the validation script
3708 if (fValidationScript.IsNull()) {
3709 fValidationScript = fExecutable;
3710 fValidationScript.ReplaceAll(".sh", "_validation.sh");
3712 TString validationScript = fValidationScript;
3713 if (merge) validationScript.ReplaceAll(".sh", "_merge.sh");
3715 Error("WriteValidationScript", "Alien connection required");
3718 if (!fTerminateFiles.IsNull()) {
3719 fTerminateFiles.Strip();
3720 fTerminateFiles.ReplaceAll(" ",",");
3722 TString outStream = "";
3723 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
3724 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3726 out.open(validationScript, ios::out);
3727 out << "#!/bin/bash" << endl;
3728 out << "##################################################" << endl;
3729 out << "validateout=`dirname $0`" << endl;
3730 out << "validatetime=`date`" << endl;
3731 out << "validated=\"0\";" << endl;
3732 out << "error=0" << endl;
3733 out << "if [ -z $validateout ]" << endl;
3734 out << "then" << endl;
3735 out << " validateout=\".\"" << endl;
3736 out << "fi" << endl << endl;
3737 out << "cd $validateout;" << endl;
3738 out << "validateworkdir=`pwd`;" << endl << endl;
3739 out << "echo \"*******************************************************\"" << outStream << endl;
3740 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3742 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3743 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3744 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3745 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3746 out << "ls -la ./" << outStream << endl;
3747 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3748 out << "##################################################" << endl;
3751 out << "if [ ! -f stderr ] ; then" << endl;
3752 out << " error=1" << endl;
3753 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3754 out << " echo \"Error = $error\" " << outStream << endl;
3755 out << "fi" << endl;
3757 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3758 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3759 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3760 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
3763 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3764 out << " error=1" << endl;
3765 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3766 out << " echo \"$parArch\" " << outStream << endl;
3767 out << " echo \"Error = $error\" " << outStream << endl;
3768 out << "fi" << endl;
3770 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3771 out << " error=1" << endl;
3772 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3773 out << " echo \"$segViol\" " << outStream << endl;
3774 out << " echo \"Error = $error\" " << outStream << endl;
3775 out << "fi" << endl;
3777 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3778 out << " error=1" << endl;
3779 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3780 out << " echo \"$segFault\" " << outStream << endl;
3781 out << " echo \"Error = $error\" " << outStream << endl;
3782 out << "fi" << endl;
3784 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3785 out << " error=1" << endl;
3786 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3787 out << " echo \"$glibcErr\" " << outStream << endl;
3788 out << " echo \"Error = $error\" " << outStream << endl;
3789 out << "fi" << endl;
3791 // Part dedicated to the specific analyses running into the train
3793 TString outputFiles = fOutputFiles;
3794 if (merge && !fTerminateFiles.IsNull()) {
3796 outputFiles += fTerminateFiles;
3798 TObjArray *arr = outputFiles.Tokenize(",");
3801 while (!merge && (os=(TObjString*)next1())) {
3802 // No need to validate outputs produced by merging since the merging macro does this
3803 outputFile = os->GetString();
3804 Int_t index = outputFile.Index("@");
3805 if (index > 0) outputFile.Remove(index);
3806 if (fTerminateFiles.Contains(outputFile)) continue;
3807 if (outputFile.Contains("*")) continue;
3808 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3809 out << " error=1" << endl;
3810 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3811 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3812 out << "fi" << endl;
3815 out << "if ! [ -f outputs_valid ] ; then" << endl;
3816 out << " error=1" << endl;
3817 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3818 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3819 out << "fi" << endl;
3821 out << "if [ $error = 0 ] ; then" << endl;
3822 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3823 if (!IsKeepLogs()) {
3824 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3826 out << " rm -f std*" << endl;
3828 out << "fi" << endl;
3830 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3831 out << "echo \"*******************************************************\"" << outStream << endl;
3832 out << "cd -" << endl;
3833 out << "exit $error" << endl;
3835 Bool_t copy = kTRUE;
3836 if (fProductionMode || TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3839 TString workdir = gGrid->GetHomeDirectory();
3840 workdir += fGridWorkingDir;
3841 Info("WriteValidationScript", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3842 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3843 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));