1 /**************************************************************************
2 * Copyright(c) 1998-2007, ALICE Experiment at CERN, All rights reserved. *
4 * Author: The ALICE Off-line Project. *
5 * Contributors are mentioned in the code where appropriate. *
7 * Permission to use, copy, modify and distribute this software and its *
8 * documentation strictly for non-commercial purposes is hereby granted *
9 * without fee, provided that the above copyright notice appears in all *
10 * copies and that both the copyright notice and this permission notice *
11 * appear in the supporting documentation. The authors make no claims *
12 * about the suitability of this software for any purpose. It is *
13 * provided "as is" without express or implied warranty. *
14 **************************************************************************/
16 // Author: Mihaela Gheata, 01/09/2008
18 //==============================================================================
19 // AliAnalysisAlien - AliEn utility class. Provides interface for creating
20 // a personalized JDL, finding and creating a dataset.
21 //==============================================================================
23 #include "Riostream.h"
30 #include "TFileCollection.h"
32 #include "TObjString.h"
33 #include "TObjArray.h"
35 #include "TGridResult.h"
36 #include "TGridCollection.h"
38 #include "TGridJobStatusList.h"
39 #include "TGridJobStatus.h"
40 #include "TFileMerger.h"
41 #include "AliAnalysisManager.h"
42 #include "AliVEventHandler.h"
43 #include "AliAnalysisDataContainer.h"
44 #include "AliAnalysisAlien.h"
46 ClassImp(AliAnalysisAlien)
48 //______________________________________________________________________________
// Default constructor.
// Only fragments of the member-initializer list are visible in this listing:
// the entries shown set three numeric settings to 0 and value-initialize two
// further members. The remaining initializers and the body are not shown.
49 AliAnalysisAlien::AliAnalysisAlien()
55 fSplitMaxInputFileNumber(0),
57 fMasterResubmitThreshold(0),
69 fNproofWorkersPerSlave(0),
78 fAdditionalRootLibs(),
105 fRootVersionForProof(),
114 //______________________________________________________________________________
// Named constructor: forwards `name` to the AliAnalysisGrid base class.
// Only fragments of the member-initializer list are visible in this listing:
// the entries shown set three numeric settings to 0 and value-initialize three
// further members. The remaining initializers and the body are not shown.
115 AliAnalysisAlien::AliAnalysisAlien(const char *name)
116 :AliAnalysisGrid(name),
121 fSplitMaxInputFileNumber(0),
123 fMasterResubmitThreshold(0),
135 fNproofWorkersPerSlave(0),
139 fExecutableCommand(),
144 fAdditionalRootLibs(),
171 fRootVersionForProof(),
180 //______________________________________________________________________________
// Copy constructor.
// - Settings are copied member by member from `other` in the initializer list
//   (some entries of the list are not visible in this listing).
// - The two JDL objects are NOT copied: fresh TAlienJDL instances are created
//   through the interpreter, so any content held by other's JDLs is not carried over.
// - The input-file and package lists are rebuilt as new TObjArray objects holding
//   new TObjString entries with the same names, and are made owners of them.
181 AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
182 :AliAnalysisGrid(other),
185 fPrice(other.fPrice),
187 fSplitMaxInputFileNumber(other.fSplitMaxInputFileNumber),
188 fMaxInitFailed(other.fMaxInitFailed),
189 fMasterResubmitThreshold(other.fMasterResubmitThreshold),
190 fNtestFiles(other.fNtestFiles),
191 fNrunsPerMaster(other.fNrunsPerMaster),
192 fMaxMergeFiles(other.fMaxMergeFiles),
193 fNsubmitted(other.fNsubmitted),
194 fProductionMode(other.fProductionMode),
195 fOutputToRunNo(other.fOutputToRunNo),
196 fMergeViaJDL(other.fMergeViaJDL),
197 fFastReadOption(other.fFastReadOption),
198 fOverwriteMode(other.fOverwriteMode),
199 fNreplicas(other.fNreplicas),
200 fNproofWorkers(other.fNproofWorkers),
201 fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
202 fProofReset(other.fProofReset),
203 fRunNumbers(other.fRunNumbers),
204 fExecutable(other.fExecutable),
205 fExecutableCommand(other.fExecutableCommand),
206 fArguments(other.fArguments),
207 fExecutableArgs(other.fExecutableArgs),
208 fAnalysisMacro(other.fAnalysisMacro),
209 fAnalysisSource(other.fAnalysisSource),
210 fAdditionalRootLibs(other.fAdditionalRootLibs),
211 fAdditionalLibs(other.fAdditionalLibs),
212 fSplitMode(other.fSplitMode),
213 fAPIVersion(other.fAPIVersion),
214 fROOTVersion(other.fROOTVersion),
215 fAliROOTVersion(other.fAliROOTVersion),
216 fExternalPackages(other.fExternalPackages),
218 fGridWorkingDir(other.fGridWorkingDir),
219 fGridDataDir(other.fGridDataDir),
220 fDataPattern(other.fDataPattern),
221 fGridOutputDir(other.fGridOutputDir),
222 fOutputArchive(other.fOutputArchive),
223 fOutputFiles(other.fOutputFiles),
224 fInputFormat(other.fInputFormat),
225 fDatasetName(other.fDatasetName),
226 fJDLName(other.fJDLName),
227 fMergeExcludes(other.fMergeExcludes),
228 fIncludePath(other.fIncludePath),
229 fCloseSE(other.fCloseSE),
230 fFriendChainName(other.fFriendChainName),
231 fJobTag(other.fJobTag),
232 fOutputSingle(other.fOutputSingle),
233 fRunPrefix(other.fRunPrefix),
234 fProofCluster(other.fProofCluster),
235 fProofDataSet(other.fProofDataSet),
236 fFileForTestMode(other.fFileForTestMode),
237 fRootVersionForProof(other.fRootVersionForProof),
238 fAliRootMode(other.fAliRootMode),
// New, empty JDL objects are instantiated by class name through the interpreter.
243 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
244 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
245 fRunRange[0] = other.fRunRange[0];
246 fRunRange[1] = other.fRunRange[1];
// Deep copy of the input file names (only when other has a list).
247 if (other.fInputFiles) {
248 fInputFiles = new TObjArray();
249 TIter next(other.fInputFiles);
251 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
252 fInputFiles->SetOwner();
// Deep copy of the package names (only when other has a list).
254 if (other.fPackages) {
255 fPackages = new TObjArray();
256 TIter next(other.fPackages);
258 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
259 fPackages->SetOwner();
263 //______________________________________________________________________________
264 AliAnalysisAlien::~AliAnalysisAlien()
// Destructor: releases the two JDL objects and the input-file / package lists.
// Deleting a null pointer is a no-op, so no explicit null checks are needed.
267 delete fGridJDL;
268 delete fMergingJDL;
269 delete fInputFiles;
270 delete fPackages;
273 //______________________________________________________________________________
// Assignment operator.
// Copies the settings of `other` member by member (self-assignment is a no-op).
// As in the copy constructor, the JDL objects are not copied but re-created
// empty, and the input-file / package lists are rebuilt as owning arrays of
// new TObjString entries.
// The objects previously owned by this instance (JDLs and lists) are released
// before being replaced; the destructor deletes these same members, so
// overwriting them without a delete would leak them.
274 AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
277 if (this != &other) {
278 AliAnalysisGrid::operator=(other);
// Release the JDL objects currently held before creating the new ones.
delete fGridJDL;
279 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
delete fMergingJDL;
280 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
281 fPrice = other.fPrice;
283 fSplitMaxInputFileNumber = other.fSplitMaxInputFileNumber;
284 fMaxInitFailed = other.fMaxInitFailed;
285 fMasterResubmitThreshold = other.fMasterResubmitThreshold;
286 fNtestFiles = other.fNtestFiles;
287 fNrunsPerMaster = other.fNrunsPerMaster;
288 fMaxMergeFiles = other.fMaxMergeFiles;
289 fNsubmitted = other.fNsubmitted;
290 fProductionMode = other.fProductionMode;
291 fOutputToRunNo = other.fOutputToRunNo;
292 fMergeViaJDL = other.fMergeViaJDL;
293 fFastReadOption = other.fFastReadOption;
294 fOverwriteMode = other.fOverwriteMode;
295 fNreplicas = other.fNreplicas;
296 fNproofWorkers = other.fNproofWorkers;
297 fNproofWorkersPerSlave = other.fNproofWorkersPerSlave;
298 fProofReset = other.fProofReset;
299 fRunNumbers = other.fRunNumbers;
300 fExecutable = other.fExecutable;
301 fExecutableCommand = other.fExecutableCommand;
302 fArguments = other.fArguments;
303 fExecutableArgs = other.fExecutableArgs;
304 fAnalysisMacro = other.fAnalysisMacro;
305 fAnalysisSource = other.fAnalysisSource;
306 fAdditionalRootLibs = other.fAdditionalRootLibs;
307 fAdditionalLibs = other.fAdditionalLibs;
308 fSplitMode = other.fSplitMode;
309 fAPIVersion = other.fAPIVersion;
310 fROOTVersion = other.fROOTVersion;
311 fAliROOTVersion = other.fAliROOTVersion;
312 fExternalPackages = other.fExternalPackages;
314 fGridWorkingDir = other.fGridWorkingDir;
315 fGridDataDir = other.fGridDataDir;
316 fDataPattern = other.fDataPattern;
317 fGridOutputDir = other.fGridOutputDir;
318 fOutputArchive = other.fOutputArchive;
319 fOutputFiles = other.fOutputFiles;
320 fInputFormat = other.fInputFormat;
321 fDatasetName = other.fDatasetName;
322 fJDLName = other.fJDLName;
323 fMergeExcludes = other.fMergeExcludes;
324 fIncludePath = other.fIncludePath;
325 fCloseSE = other.fCloseSE;
326 fFriendChainName = other.fFriendChainName;
327 fJobTag = other.fJobTag;
328 fOutputSingle = other.fOutputSingle;
329 fRunPrefix = other.fRunPrefix;
330 fProofCluster = other.fProofCluster;
331 fProofDataSet = other.fProofDataSet;
332 fFileForTestMode = other.fFileForTestMode;
333 fRootVersionForProof = other.fRootVersionForProof;
334 fAliRootMode = other.fAliRootMode;
// Drop the current input-file list; it is rebuilt below only if other has one.
delete fInputFiles;
fInputFiles = 0;
335 if (other.fInputFiles) {
336 fInputFiles = new TObjArray();
337 TIter next(other.fInputFiles);
339 while ((obj=next())) fInputFiles->Add(new TObjString(obj->GetName()));
340 fInputFiles->SetOwner();
// Same treatment for the package list.
delete fPackages;
fPackages = 0;
342 if (other.fPackages) {
343 fPackages = new TObjArray();
344 TIter next(other.fPackages);
346 while ((obj=next())) fPackages->Add(new TObjString(obj->GetName()));
347 fPackages->SetOwner();
353 //______________________________________________________________________________
354 void AliAnalysisAlien::AddIncludePath(const char *path)
356 // Add include path in the remote analysis macro.
// The path is appended to fIncludePath followed by a space. A "-I" prefix is
// added unless the text already contains "-I".
// NOTE(review): `p` is declared on a line not shown in this listing; presumably
// it is a string built from `path` - confirm. Contains("-I") matches "-I"
// anywhere in the text, not only at its start.
358 if (p.Contains("-I")) fIncludePath += Form("%s ", path);
359 else fIncludePath += Form("-I%s ", path);
362 //______________________________________________________________________________
363 void AliAnalysisAlien::AddRunNumber(Int_t run)
365 // Append one run to the space-separated list of runs to be processed.
// The stored token is the run prefix (fRunPrefix) followed by the run number.
366 if (!fRunNumbers.IsNull()) fRunNumbers.Append(" ");
367 fRunNumbers.Append(Form("%s%d", fRunPrefix.Data(), run));
370 //______________________________________________________________________________
371 void AliAnalysisAlien::AddRunNumber(const char* run)
373 // Add a run number (given as text) to the list of runs to be processed.
// A space separator is inserted when the list already has content.
// NOTE(review): the statement that appends `run` itself is not visible in this
// listing - confirm against the full source.
374 if (fRunNumbers.Length()) fRunNumbers += " ";
378 //______________________________________________________________________________
379 void AliAnalysisAlien::AddDataFile(const char *lfn)
381 // Adds a data file to the input to be analysed. The file should be a valid LFN
382 // or point to an existing file in the alien workdir.
383 if (!fInputFiles) fInputFiles = new TObjArray();
384 fInputFiles->Add(new TObjString(lfn));
387 //______________________________________________________________________________
388 void AliAnalysisAlien::AddExternalPackage(const char *package)
390 // Adds external packages w.r.t to the default ones (root,aliroot and gapi)
// Packages are accumulated in fExternalPackages as a space-separated list.
// The separator is inserted only when the list already has content. The test
// must be on the string length: using the string object itself as a condition
// checks its pointer conversion, not whether the text is empty, and would put
// a leading space in front of the first package.
391 if (fExternalPackages.Length()) fExternalPackages += " ";
392 fExternalPackages += package;
395 //______________________________________________________________________________
396 Bool_t AliAnalysisAlien::Connect()
398 // Try to connect to AliEn. User needs a valid token and /tmp/gclient_env_$UID sourced.
// Returns kTRUE immediately when a connected grid handle already exists.
// Otherwise a connection to "alien://" is attempted; on success the grid user
// name is stored in fUser. Some control-flow lines (including the failure
// return) are not visible in this listing.
399 if (gGrid && gGrid->IsConnected()) return kTRUE;
401 Info("Connect", "Trying to connect to AliEn ...");
402 TGrid::Connect("alien://");
// Re-check the global handle: the connection attempt may have failed.
404 if (!gGrid || !gGrid->IsConnected()) {
405 Error("Connect", "Did not managed to connect to AliEn. Make sure you have a valid token.");
// The analysis user is taken from the grid session.
408 fUser = gGrid->GetUser();
409 Info("Connect", "\n##### Connected to AliEn as user %s. Setting analysis user to <%s>", fUser.Data(), fUser.Data());
413 //______________________________________________________________________________
414 void AliAnalysisAlien::CdWork()
416 // Check validity of alien workspace. Create directory if possible.
// The workspace is the grid home directory followed by fGridWorkingDir.
// If it cannot be created, fGridWorkingDir is cleared so that the home
// directory itself is used. Messages are tagged with this method's name.
// Some control-flow lines are not visible in this listing.
418 Error("CdWork", "Alien connection required");
421 TString homedir = gGrid->GetHomeDirectory();
422 TString workdir = homedir + fGridWorkingDir;
423 if (DirectoryExists(workdir)) {
427 // Work directory not existing - create it
429 if (gGrid->Mkdir(workdir, "-p")) {
430 gGrid->Cd(fGridWorkingDir);
431 Info("CdWork", "\n##### Created alien working directory %s", fGridWorkingDir.Data());
433 Warning("CdWork", "Working directory %s cannot be created.\n Using %s instead.",
434 workdir.Data(), homedir.Data());
// Fall back to the home directory.
435 fGridWorkingDir = "";
439 //______________________________________________________________________________
440 Bool_t AliAnalysisAlien::CheckFileCopy(const char *alienpath)
442 // Check if file copying is possible.
// A small local test file ("plugin_test_copy") is created, copied to
// alien://<alienpath>/ and removed again, both locally and on the grid.
//   alienpath - grid directory to copy to; it must exist.
// Several control-flow lines (including the return statements) are not
// visible in this listing.
444 Error("CheckFileCopy", "Not connected to AliEn. File copying cannot be tested.");
447 Info("CheckFileCopy", "Checking possibility to copy files to your AliEn home directory... \
448 \n +++ NOTE: You can disable this via: plugin->SetCheckCopy(kFALSE);");
449 // Check if alien_CLOSE_SE is defined
450 TString closeSE = gSystem->Getenv("alien_CLOSE_SE");
451 if (!closeSE.IsNull()) {
452 Info("CheckFileCopy", "Your current close storage is pointing to: \
453 \n alien_CLOSE_SE = \"%s\"", closeSE.Data());
455 Warning("CheckFileCopy", "Your current close storage is empty ! Depending on your location, file copying may fail.");
457 // Check if grid directory exists.
458 if (!DirectoryExists(alienpath)) {
459 Error("CheckFileCopy", "Alien path %s does not seem to exist", alienpath);
462 TFile f("plugin_test_copy", "RECREATE");
463 // User may not have write permissions to current directory
465 Error("CheckFileCopy", "Cannot create local test file. Do you have write access to current directory: <%s> ?",
466 gSystem->WorkingDirectory());
// Remove a leftover test file from a previous check before copying.
470 if (FileExists(Form("alien://%s/%s",alienpath, f.GetName()))) gGrid->Rm(Form("alien://%s/%s",alienpath, f.GetName()));
471 if (!TFile::Cp(f.GetName(), Form("alien://%s/%s",alienpath, f.GetName()))) {
472 Error("CheckFileCopy", "Cannot copy files to Alien destination: <%s> This may be temporary, or: \
473 \n# 1. Make sure you have write permissions there. If this is the case: \
474 \n# 2. Check the storage availability at: http://alimonitor.cern.ch/stats?page=SE/table \
475 \n# Do: export alien_CLOSE_SE=\"working_disk_SE\" \
476 \n# To make this permanent put in in your .bashrc (in .alienshrc is not enough) \
477 \n# Redo token: rm /tmp/x509up_u$UID then: alien-token-init <username>", alienpath);
478 gSystem->Unlink(f.GetName());
481 gSystem->Unlink(f.GetName());
// Clean up the grid copy. The path is built with an explicit '/' separator,
// as for the existence check and the copy above, so the removal also works
// when alienpath has no trailing slash.
482 gGrid->Rm(Form("%s/%s",alienpath,f.GetName()));
483 Info("CheckFileCopy", "### ...SUCCESS ###");
487 //______________________________________________________________________________
488 Bool_t AliAnalysisAlien::CheckInputData()
490 // Check validity of input data. If necessary, create xml files.
// Three kinds of input are handled:
//  - nothing declared (no files, run numbers or run range): the base data
//    directory must be set and a single xml is made for it;
//  - declared input files: each must exist (as given or inside the work
//    directory) and all must agree in type (collection / xml / tags);
//  - run numbers or a run range: one xml name per run, or one per chunk of
//    fNrunsPerMaster runs, is registered as input.
// Returns kTRUE after the declared files are processed when no runs are
// requested. Other return statements are not visible in this listing.
491 if (!fInputFiles && !fRunNumbers.Length() && !fRunRange[0]) {
492 if (!fGridDataDir.Length()) {
493 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
496 Info("CheckInputData", "Analysis will make a single xml for base data directory %s",fGridDataDir.Data());
497 if (fDataPattern.Contains("tag") && TestBit(AliAnalysisGrid::kTest))
498 TObject::SetBit(AliAnalysisGrid::kUseTags, kTRUE); // ADDED (fix problem in determining the tag usage in test mode)
501 // Process declared files
502 Bool_t isCollection = kFALSE;
503 Bool_t isXml = kFALSE;
504 Bool_t useTags = kFALSE;
505 Bool_t checked = kFALSE;
508 TString workdir = gGrid->GetHomeDirectory();
509 workdir += fGridWorkingDir;
512 TIter next(fInputFiles);
513 while ((objstr=(TObjString*)next())) {
516 file += objstr->GetString();
517 // Store full lfn path
518 if (FileExists(file)) objstr->SetString(file);
520 file = objstr->GetName();
521 if (!FileExists(objstr->GetName())) {
522 Error("CheckInputData", "Data file %s not found or not in your working dir: %s",
523 objstr->GetName(), workdir.Data());
527 Bool_t iscoll, isxml, usetags;
528 CheckDataType(file, iscoll, isxml, usetags);
531 isCollection = iscoll;
534 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
// Every further file must have the same type as the first one checked.
536 if ((iscoll != isCollection) || (isxml != isXml) || (usetags != useTags)) {
537 Error("CheckInputData", "Some conflict was found in the types of inputs");
543 // Process requested run numbers
544 if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
545 // Check validity of alien data directory
546 if (!fGridDataDir.Length()) {
547 Error("CheckInputData", "AliEn path to base data directory must be set.\n = Use: SetGridDataDir()");
550 if (!DirectoryExists(fGridDataDir)) {
551 Error("CheckInputData", "Data directory %s not existing.", fGridDataDir.Data());
555 Error("CheckInputData", "You are using raw AliEn collections as input. Cannot process run numbers.");
559 if (checked && !isXml) {
560 Error("CheckInputData", "Cannot mix processing of full runs with non-xml files");
563 // Check validity of run number(s)
567 TString schunk, schunk2;
571 useTags = fDataPattern.Contains("tag");
572 TObject::SetBit(AliAnalysisGrid::kUseTags, useTags);
574 if (useTags != fDataPattern.Contains("tag")) {
575 Error("CheckInputData", "Cannot mix input files using/not using tags");
// Explicit run numbers take precedence over a run range.
578 if (fRunNumbers.Length()) {
579 Info("CheckDataType", "Using supplied run numbers (run ranges are ignored)");
580 arr = fRunNumbers.Tokenize(" ");
582 while ((os=(TObjString*)next())) {
583 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
584 if (!DirectoryExists(path)) {
585 Warning("CheckInputData", "Run number %s not found in path: <%s>", os->GetString().Data(), path.Data());
588 path = Form("%s/%s.xml", workdir.Data(),os->GetString().Data());
589 TString msg = "\n##### file: ";
591 msg += " type: xml_collection;";
592 if (useTags) msg += " using_tags: Yes";
593 else msg += " using_tags: No";
594 Info("CheckDataType", "%s", msg.Data());
// One xml per run unless several runs are grouped per master job.
595 if (fNrunsPerMaster<2) {
596 AddDataFile(Form("%s.xml", os->GetString().Data()));
// A chunk name starts with the first run of the chunk ...
599 if (((nruns-1)%fNrunsPerMaster) == 0) {
600 schunk = os->GetString();
// ... and is completed with the last run of the chunk (or of the list).
602 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
603 schunk += Form("_%s.xml", os->GetString().Data());
609 Info("CheckDataType", "Using run range [%d, %d]", fRunRange[0], fRunRange[1]);
610 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
611 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
612 if (!DirectoryExists(path)) {
613 // Warning("CheckInputData", "Run number %d not found in path: <%s>", irun, path.Data());
616 path = Form("%s/%s%d.xml", workdir.Data(),fRunPrefix.Data(),irun);
617 TString msg = "\n##### file: ";
619 msg += " type: xml_collection;";
620 if (useTags) msg += " using_tags: Yes";
621 else msg += " using_tags: No";
622 Info("CheckDataType", "%s", msg.Data());
623 if (fNrunsPerMaster<2) {
624 AddDataFile(Form("%s%d.xml",fRunPrefix.Data(),irun));
627 if (((nruns-1)%fNrunsPerMaster) == 0) {
628 schunk = Form("%s%d", fRunPrefix.Data(),irun);
630 schunk2 = Form("_%s%d.xml", fRunPrefix.Data(), irun);
631 if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
644 //______________________________________________________________________________
645 Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
647 // Create dataset for the grid data directory + run number.
// Collections are produced with the grid 'find' command (xml output written
// to a local file), optionally validated with 'grep', copied to the alien
// work directory and registered as input. Three cases are handled:
//  - no run numbers and no run range: a single collection for the data directory;
//  - explicit run numbers: one collection per run, or one merged collection per
//    chunk of fNrunsPerMaster runs;
//  - run range: same as above for every run of the range that exists.
// In test mode the local file is always "wn.xml" and nothing is copied.
// Returns kTRUE immediately in offline mode, or when input files are already
// declared and no runs are requested. Other return statements and several
// control-flow lines are not visible in this listing.
648 if (TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
650 Error("CreateDataset", "Cannot create dataset with no grid connection");
656 TString workdir = gGrid->GetHomeDirectory();
657 workdir += fGridWorkingDir;
659 // Compose the 'find' command arguments
661 TString options = "-x collection ";
662 if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
663 TString conditions = "";
668 TString schunk, schunk2;
669 TGridCollection *cbase=0, *cadd=0;
670 if (!fRunNumbers.Length() && !fRunRange[0]) {
671 if (fInputFiles && fInputFiles->GetEntries()) return kTRUE;
672 // Make a single data collection from data directory.
674 if (!DirectoryExists(path)) {
675 Error("CreateDataset", "Path to data directory %s not valid",fGridDataDir.Data());
679 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
680 else file = Form("%s.xml", gSystem->BaseName(path));
// (Re)create the local collection when missing, in test mode or when overwriting.
681 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
687 command += conditions;
688 printf("command: %s\n", command.Data());
689 TGridResult *res = gGrid->Command(command);
691 // Write standard output to file
692 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
693 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
694 Bool_t nullFile = kFALSE;
696 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
// A collection without any "/event" entry is considered empty.
698 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
700 Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
705 Bool_t fileExists = FileExists(file);
706 if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
707 // Copy xml file to alien space
708 if (fileExists) gGrid->Rm(file);
709 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
710 if (!FileExists(file)) {
711 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
714 // Update list of files to be processed.
716 AddDataFile(Form("%s/%s", workdir.Data(), file.Data()));
720 Bool_t nullResult = kTRUE;
721 if (fRunNumbers.Length()) {
722 TObjArray *arr = fRunNumbers.Tokenize(" ");
725 while ((os=(TObjString*)next())) {
726 path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
727 if (!DirectoryExists(path)) continue;
729 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
730 else file = Form("%s.xml", os->GetString().Data());
731 // If local collection file does not exist, create it via 'find' command.
732 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
737 command += conditions;
738 TGridResult *res = gGrid->Command(command);
740 // Write standard output to file
741 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
742 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
743 Bool_t nullFile = kFALSE;
745 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
747 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
749 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
// Runs with an empty collection are dropped from the run list.
750 fRunNumbers.ReplaceAll(os->GetString().Data(), "");
756 if (TestBit(AliAnalysisGrid::kTest)) break;
757 // Check if there is one run per master job.
758 if (fNrunsPerMaster<2) {
759 if (FileExists(file)) {
760 if (fOverwriteMode) gGrid->Rm(file);
762 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
766 // Copy xml file to alien space
767 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
768 if (!FileExists(file)) {
769 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
// The first run of a chunk opens the base collection; later runs are merged into it.
775 if (((nruns-1)%fNrunsPerMaster) == 0) {
776 schunk = os->GetString();
777 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
779 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
780 printf(" Merging collection <%s> into masterjob input...\n", file.Data());
784 if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
787 schunk += Form("_%s.xml", os->GetString().Data());
788 if (FileExists(schunk)) {
// The file found to exist is the merged chunk, so that is the one to remove
// in overwrite mode (the run-range branch below does the same).
789 if (fOverwriteMode) gGrid->Rm(schunk);
791 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
795 printf("Exporting merged collection <%s> and copying to AliEn\n", schunk.Data());
796 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
797 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
798 if (!FileExists(schunk)) {
799 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
807 Error("CreateDataset", "No valid dataset corresponding to the query!");
811 // Process a full run range.
812 for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
813 path = Form("%s/%s%d ", fGridDataDir.Data(), fRunPrefix.Data(), irun);
814 if (!DirectoryExists(path)) continue;
816 if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
817 else file = Form("%s%d.xml", fRunPrefix.Data(), irun);
818 if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
819 if (fOverwriteMode) gGrid->Rm(file);
821 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
825 // If local collection file does not exist, create it via 'find' command.
826 if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
831 command += conditions;
832 TGridResult *res = gGrid->Command(command);
834 // Write standard output to file
835 gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
836 Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
837 Bool_t nullFile = kFALSE;
839 Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
841 nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
843 Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
849 if (TestBit(AliAnalysisGrid::kTest)) break;
850 // Check if there is one run per master job.
851 if (fNrunsPerMaster<2) {
852 if (FileExists(file)) {
853 if (fOverwriteMode) gGrid->Rm(file);
855 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", file.Data());
859 // Copy xml file to alien space
860 TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
861 if (!FileExists(file)) {
862 Error("CreateDataset", "Command %s did NOT succeed", command.Data());
867 // Check if the collection for the chunk exist locally.
868 Int_t nchunk = (nruns-1)/fNrunsPerMaster;
869 if (FileExists(fInputFiles->At(nchunk)->GetName())) {
870 if (fOverwriteMode) gGrid->Rm(fInputFiles->At(nchunk)->GetName());
873 printf(" Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
874 if (((nruns-1)%fNrunsPerMaster) == 0) {
875 schunk = Form("%s%d", fRunPrefix.Data(), irun);
876 cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
878 cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
882 schunk2 = Form("%s_%s%d.xml", schunk.Data(), fRunPrefix.Data(), irun);
883 if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1] && schunk2 != fInputFiles->Last()->GetName()) {
887 if (FileExists(schunk)) {
888 if (fOverwriteMode) gGrid->Rm(schunk);
890 Info("CreateDataset", "\n##### Dataset %s exist. Skipping creation...", schunk.Data());
894 printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
895 cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
896 if (FileExists(schunk)) {
897 if (fOverwriteMode) gGrid->Rm(schunk);
899 Info("CreateDataset", "\n##### Dataset %s exist. Skipping copy...", schunk.Data());
903 TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
904 if (!FileExists(schunk)) {
905 Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
911 Error("CreateDataset", "No valid dataset corresponding to the query!");
918 //______________________________________________________________________________
919 Bool_t AliAnalysisAlien::CreateJDL()
921 // Generate a JDL file according to current settings. The name of the file is
922 // specified by fJDLName.
923 Bool_t error = kFALSE;
926 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
927 Bool_t generate = kTRUE;
928 if (TestBit(AliAnalysisGrid::kTest) || TestBit(AliAnalysisGrid::kSubmit)) generate = kFALSE;
930 Error("CreateJDL", "Alien connection required");
933 // Check validity of alien workspace
935 TString workdir = gGrid->GetHomeDirectory();
936 workdir += fGridWorkingDir;
940 Error("CreateJDL()", "Define some input files for your analysis.");
943 // Compose list of input files
944 // Check if output files were defined
945 if (!fOutputFiles.Length()) {
946 Error("CreateJDL", "You must define at least one output file");
949 // Check if an output directory was defined and valid
950 if (!fGridOutputDir.Length()) {
951 Error("CreateJDL", "You must define AliEn output directory");
954 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
955 if (!DirectoryExists(fGridOutputDir)) {
956 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
957 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
959 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
965 // Exit if any error up to now
966 if (error) return kFALSE;
968 if (!fUser.IsNull()) {
969 fGridJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
970 fMergingJDL->SetValue("User", Form("\"%s\"", fUser.Data()));
972 fGridJDL->SetExecutable(fExecutable, "This is the startup script");
973 TString mergeExec = fExecutable;
974 mergeExec.ReplaceAll(".sh", "_merge.sh");
975 fMergingJDL->SetExecutable(mergeExec, "This is the startup script");
976 mergeExec.ReplaceAll(".sh", ".C");
977 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
978 if (!fArguments.IsNull())
979 fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
980 fMergingJDL->SetArguments("$1 $2 $3");
981 fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
982 fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
983 fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
984 fMergingJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
986 if (fMaxInitFailed > 0) {
987 fGridJDL->SetValue("MaxInitFailed", Form("\"%d\"",fMaxInitFailed));
988 fGridJDL->SetDescription("MaxInitFailed", "Maximum number of first failing jobs to abort the master job");
990 if (fSplitMaxInputFileNumber > 0) {
991 fGridJDL->SetValue("SplitMaxInputFileNumber", Form("\"%d\"", fSplitMaxInputFileNumber));
992 fGridJDL->SetDescription("SplitMaxInputFileNumber", "Maximum number of input files to be processed per subjob");
994 if (fSplitMode.Length()) {
995 fGridJDL->SetValue("Split", Form("\"%s\"", fSplitMode.Data()));
996 fGridJDL->SetDescription("Split", "We split per SE or file");
998 if (!fAliROOTVersion.IsNull()) {
999 fGridJDL->AddToPackages("AliRoot", fAliROOTVersion,"VO_ALICE", "List of requested packages");
1000 fMergingJDL->AddToPackages("AliRoot", fAliROOTVersion, "VO_ALICE", "List of requested packages");
1002 if (!fROOTVersion.IsNull()) {
1003 fGridJDL->AddToPackages("ROOT", fROOTVersion);
1004 fMergingJDL->AddToPackages("ROOT", fROOTVersion);
1006 if (!fAPIVersion.IsNull()) {
1007 fGridJDL->AddToPackages("APISCONFIG", fAPIVersion);
1008 fMergingJDL->AddToPackages("APISCONFIG", fAPIVersion);
1010 if (!fExternalPackages.IsNull()) {
1011 arr = fExternalPackages.Tokenize(" ");
1013 while ((os=(TObjString*)next())) {
1014 TString pkgname = os->GetString();
1015 Int_t index = pkgname.Index("::");
1016 TString pkgversion = pkgname(index+2, pkgname.Length());
1017 pkgname.Remove(index);
1018 fGridJDL->AddToPackages(pkgname, pkgversion);
1019 fMergingJDL->AddToPackages(pkgname, pkgversion);
1023 fGridJDL->SetInputDataListFormat(fInputFormat, "Format of input data");
1024 fGridJDL->SetInputDataList("wn.xml", "Collection name to be processed on each worker node");
1025 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), fAnalysisMacro.Data()), "List of input files to be uploaded to workers");
1026 TString analysisFile = fExecutable;
1027 analysisFile.ReplaceAll(".sh", ".root");
1028 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1029 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),analysisFile.Data()));
1030 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C"))
1031 fGridJDL->AddToInputSandbox(Form("LF:%s/ConfigureCuts.C", workdir.Data()));
1032 if (fAdditionalLibs.Length()) {
1033 arr = fAdditionalLibs.Tokenize(" ");
1035 while ((os=(TObjString*)next())) {
1036 if (os->GetString().Contains(".so")) continue;
1037 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1038 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), os->GetString().Data()));
1043 TIter next(fPackages);
1045 while ((obj=next())) {
1046 fGridJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1047 fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
1050 if (fOutputArchive.Length()) {
1051 arr = fOutputArchive.Tokenize(" ");
1053 Bool_t first = kTRUE;
1054 const char *comment = "Files to be archived";
1055 const char *comment1 = comment;
1056 while ((os=(TObjString*)next())) {
1057 if (!first) comment = NULL;
1058 if (!os->GetString().Contains("@") && fCloseSE.Length())
1059 fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1061 fGridJDL->AddToOutputArchive(os->GetString(), comment);
1065 TString outputArchive = fOutputArchive;
1066 if (!fMergeExcludes.IsNull()) {
1067 arr = fMergeExcludes.Tokenize(" ");
1069 while ((os=(TObjString*)next1())) {
1070 outputArchive.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1071 outputArchive.ReplaceAll(os->GetString(),"");
1075 arr = outputArchive.Tokenize(" ");
1079 while ((os=(TObjString*)next2())) {
1080 if (!first) comment = NULL;
1081 TString currentfile = os->GetString();
1082 currentfile.ReplaceAll(".root", "*.root");
1083 currentfile.ReplaceAll(".zip", "-Stage$2_$3.zip");
1084 if (!currentfile.Contains("@") && fCloseSE.Length())
1085 fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
1087 fMergingJDL->AddToOutputArchive(currentfile, comment);
1092 arr = fOutputFiles.Tokenize(",");
1094 Bool_t first = kTRUE;
1095 const char *comment = "Files to be archived";
1096 const char *comment1 = comment;
1097 while ((os=(TObjString*)next())) {
1098 // Ignore outputs in jdl that are also in outputarchive
1099 TString sout = os->GetString();
1100 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1101 if (fOutputArchive.Contains(sout)) continue;
1102 if (!first) comment = NULL;
1103 if (!os->GetString().Contains("@") && fCloseSE.Length())
1104 fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1106 fGridJDL->AddToOutputSandbox(os->GetString(), comment);
1110 if (fOutputFiles.Length()) {
1111 TString outputFiles = fOutputFiles;
1112 if (!fMergeExcludes.IsNull()) {
1113 arr = fMergeExcludes.Tokenize(" ");
1115 while ((os=(TObjString*)next1())) {
1116 outputFiles.ReplaceAll(Form("%s,",os->GetString().Data()),"");
1117 outputFiles.ReplaceAll(os->GetString(),"");
1121 arr = outputFiles.Tokenize(" ");
1125 while ((os=(TObjString*)next2())) {
1126 // Ignore outputs in jdl that are also in outputarchive
1127 TString sout = os->GetString();
1128 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
1129 if (fOutputArchive.Contains(sout)) continue;
1130 if (!first) comment = NULL;
1131 if (!os->GetString().Contains("@") && fCloseSE.Length())
1132 fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
1134 fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
1139 fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1140 fMergingJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
1141 TString validationScript = fExecutable;
1142 validationScript.ReplaceAll(".sh", "_validation.sh");
1143 fGridJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1144 validationScript = fExecutable;
1145 validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
1146 fMergingJDL->SetValidationCommand(Form("%s/%s", workdir.Data(),validationScript.Data()), "Validation script to be run for each subjob");
1147 if (fMasterResubmitThreshold) {
1148 fGridJDL->SetValue("MasterResubmitThreshold", Form("\"%d%%\"", fMasterResubmitThreshold));
1149 fGridJDL->SetDescription("MasterResubmitThreshold", "Resubmit failed jobs until DONE rate reaches this percentage");
1151 // Write a jdl with 2 input parameters: collection name and output dir name.
1154 // Copy jdl to grid workspace
1156 // Check if an output directory was defined and valid
1157 if (!fGridOutputDir.Length()) {
1158 Error("CreateJDL", "You must define AliEn output directory");
1161 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("%s/%s", workdir.Data(), fGridOutputDir.Data());
1162 if (!fProductionMode && !DirectoryExists(fGridOutputDir)) {
1163 if (gGrid->Mkdir(fGridOutputDir,"-p")) {
1164 Info("CreateJDL", "\n##### Created alien output directory %s", fGridOutputDir.Data());
1166 Error("CreateJDL", "Could not create alien output directory %s", fGridOutputDir.Data());
1172 if (TestBit(AliAnalysisGrid::kSubmit)) {
1173 TString mergeJDLName = fExecutable;
1174 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1175 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1176 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1177 if (fProductionMode) {
1178 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1179 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
1181 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1182 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1183 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1184 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1186 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1187 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1190 if (fAdditionalLibs.Length()) {
1191 arr = fAdditionalLibs.Tokenize(" ");
1194 while ((os=(TObjString*)next())) {
1195 if (os->GetString().Contains(".so")) continue;
1196 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", os->GetString().Data());
1197 if (FileExists(os->GetString())) gGrid->Rm(os->GetString());
1198 TFile::Cp(Form("file:%s",os->GetString().Data()), Form("alien://%s/%s", workdir.Data(), os->GetString().Data()));
1203 TIter next(fPackages);
1205 while ((obj=next())) {
1206 if (FileExists(obj->GetName())) gGrid->Rm(obj->GetName());
1207 Info("CreateJDL", "\n##### Copying dependency: <%s> to your alien workspace", obj->GetName());
1208 TFile::Cp(Form("file:%s",obj->GetName()), Form("alien://%s/%s", workdir.Data(), obj->GetName()));
1215 //______________________________________________________________________________
1216 Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
1218 // Writes one or more JDL's corresponding to findex. If findex is negative,
1219 // all run numbers are considered in one go (jdl). For non-negative indices
1220 // they correspond to the indices in the array fInputFiles.
// NOTE(review): 'findex' is not a parameter of this function (the only argument
// is <copy>); the three lines above look stale - confirm and update.
// What the visible code does:
//  - returns kFALSE immediately when fInputFiles is not set;
//  - completes fGridJDL / fMergingJDL with input collection and output directory;
//  - generates both JDLs as text, reformats them and writes them to the local
//    files <fJDLName> and <fExecutable with ".sh" replaced by "_merge.jdl">;
//  - removes previous copies of both files from AliEn and uploads the new ones.
// NOTE(review): the embedded original line numbers skip values, so short lines
// (braces, declarations, else branches and presumably the test on <copy>) are
// not visible in this listing.
1221 if (!fInputFiles) return kFALSE;
// Grid working directory = AliEn home directory followed by fGridWorkingDir.
1223 TString workdir = gGrid->GetHomeDirectory();
1224 workdir += fGridWorkingDir;
// Neither a run list nor a run range start is defined.
1226 if (!fRunNumbers.Length() && !fRunRange[0]) {
1227 // One jdl with no parameters in case input data is specified by name.
1228 TIter next(fInputFiles);
1229 while ((os=(TObjString*)next()))
1230 fGridJDL->AddToInputDataCollection(Form("LF:%s,nodownload", os->GetString().Data()), "Input xml collections");
// Output directory: relative to #alienfulldir# when fOutputSingle is set; the
// other form uses a per-subjob counter below fGridOutputDir.
1231 if (!fOutputSingle.IsNull())
1232 fGridJDL->SetOutputDirectory(Form("#alienfulldir#/../%s",fOutputSingle.Data()), "Output directory");
1234 fGridJDL->SetOutputDirectory(Form("%s/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
1235 fMergingJDL->SetOutputDirectory(fGridOutputDir);
1238 // One jdl to be submitted with 2 input parameters: data collection name and output dir prefix
// $1 is the collection name (looked up in workdir), $2 the output directory prefix.
1239 fGridJDL->AddToInputDataCollection(Form("LF:%s/$1,nodownload", workdir.Data()), "Input xml collections");
1240 if (!fOutputSingle.IsNull()) {
1241 if (!fOutputToRunNo) fGridJDL->SetOutputDirectory(Form("#alienfulldir#/%s",fOutputSingle.Data()), "Output directory");
1242 else fGridJDL->SetOutputDirectory(Form("%s/$2",fGridOutputDir.Data()), "Output directory");
1244 fGridJDL->SetOutputDirectory(Form("%s/$2/#alien_counter_03i#", fGridOutputDir.Data()), "Output directory");
// The merging JDL receives its output directory as first argument.
1245 fMergingJDL->SetOutputDirectory("$1", "Output directory");
1250 // Generate the JDL as a string
1251 TString sjdl = fGridJDL->Generate();
1252 TString sjdl1 = fMergingJDL->Generate();
// Cosmetic reformatting of the analysis JDL: put list items on separate lines,
// collapse empty lines and rename the "OutputDirectory" key to "OutputDir".
1254 sjdl.ReplaceAll("\"LF:", "\n \"LF:");
1255 sjdl.ReplaceAll("(member", "\n (member");
1256 sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1257 sjdl.ReplaceAll("{", "{\n ");
1258 sjdl.ReplaceAll("};", "\n};");
1259 sjdl.ReplaceAll("{\n \n", "{\n");
1260 sjdl.ReplaceAll("\n\n", "\n");
1261 sjdl.ReplaceAll("OutputDirectory", "OutputDir");
// Same sequence of replacements for the merging JDL.
1262 sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
1263 sjdl1.ReplaceAll("(member", "\n (member");
1264 sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
1265 sjdl1.ReplaceAll("{", "{\n ");
1266 sjdl1.ReplaceAll("};", "\n};");
1267 sjdl1.ReplaceAll("{\n \n", "{\n");
1268 sjdl1.ReplaceAll("\n\n", "\n");
1269 sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
// Analysis JDL: append the JDLVariables block, prepend the job tag, insert a
// "# JDL variables" comment in front of the block and append the work dir size.
1270 sjdl += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1271 sjdl.Prepend(Form("Jobtag = {\n \"comment:%s\"\n};\n", fJobTag.Data()));
1272 index = sjdl.Index("JDLVariables");
1273 if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
1274 sjdl += "Workdirectorysize = {\"5000MB\"};";
// Merging JDL: same additions, with "_Merging" appended to the job tag and a
// header describing the arguments $1, $2 and $3.
1275 sjdl1 += "JDLVariables = \n{\n \"Packages\",\n \"OutputDir\"\n};\n";
1276 sjdl1.Prepend(Form("Jobtag = {\n \"comment:%s_Merging\"\n};\n", fJobTag.Data()));
1277 sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
1278 index = sjdl1.Index("JDLVariables");
1279 if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
1280 sjdl1 += "Workdirectorysize = {\"5000MB\"};";
1281 // Write jdl to file
1283 out.open(fJDLName.Data(), ios::out);
// NOTE(review): the messages below are tagged "CreateJDL" although they are
// issued from WriteJDL.
1285 Error("CreateJDL", "Bad file name: %s", fJDLName.Data());
1288 out << sjdl << endl;
// The merging JDL file name is derived from the executable script name.
1289 TString mergeJDLName = fExecutable;
1290 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
1293 out1.open(mergeJDLName.Data(), ios::out);
1295 Error("CreateJDL", "Bad file name: %s", mergeJDLName.Data());
1298 out1 << sjdl1 << endl;
1301 // Copy jdl to grid workspace
// NOTE(review): the condition selecting between the hint below and the actual
// upload is not visible here; presumably it tests <copy> - confirm.
1303 Info("CreateJDL", "\n##### You may want to review jdl:%s and analysis macro:%s before running in <submit> mode", fJDLName.Data(), fAnalysisMacro.Data());
// Destination of the two JDL files: the grid output directory, or the grid
// working directory in production mode.
1305 TString locjdl = Form("%s/%s", fGridOutputDir.Data(),fJDLName.Data());
1306 TString locjdl1 = Form("%s/%s", fGridOutputDir.Data(),mergeJDLName.Data());
1307 if (fProductionMode) {
1308 locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
1309 locjdl1 = Form("%s/%s", workdir.Data(),mergeJDLName.Data());
// Remove existing copies before uploading the new files.
1311 if (FileExists(locjdl)) gGrid->Rm(locjdl);
1312 if (FileExists(locjdl1)) gGrid->Rm(locjdl1);
1313 Info("CreateJDL", "\n##### Copying JDL file <%s> to your AliEn output directory", fJDLName.Data());
1314 TFile::Cp(Form("file:%s",fJDLName.Data()), Form("alien://%s", locjdl.Data()));
1316 Info("CreateJDL", "\n##### Copying merging JDL file <%s> to your AliEn output directory", mergeJDLName.Data());
1317 TFile::Cp(Form("file:%s",mergeJDLName.Data()), Form("alien://%s", locjdl1.Data()));
1323 //______________________________________________________________________________
1324 Bool_t AliAnalysisAlien::FileExists(const char *lfn)
1326 // Returns true if file exists.
// <lfn> is handed to gGrid->Ls(). kFALSE is returned when there is no grid
// connection or when Ls() yields no result object.
// NOTE(review): original lines 1331-1334 and 1337-1343 are not visible in this
// listing, so the handling of a missing first entry, of an empty "name" value
// and the final return cannot be confirmed from here.
1327 if (!gGrid) return kFALSE;
1328 TGridResult *res = gGrid->Ls(lfn);
1329 if (!res) return kFALSE;
// Only the first entry of the listing is inspected.
1330 TMap *map = dynamic_cast<TMap*>(res->At(0));
// Test for a missing or empty "name" value in that entry.
1335 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("name"));
1336 if (!objs || !objs->GetString().Length()) {
1344 //______________________________________________________________________________
1345 Bool_t AliAnalysisAlien::DirectoryExists(const char *dirname)
1347 // Returns true if directory exists. Can be also a path.
// The name is split into its parent path and its last component; the parent is
// listed with gGrid->Ls(path, "-F") and the "name" value of each entry is
// compared with the last component. kFALSE is returned when there is no grid
// connection or the listing gives no result object.
// NOTE(review): some original lines (e.g. 1354, 1358-1360, 1363, 1365-1372)
// are not visible here, so the exact form of <dir> that is compared and the
// return statements cannot be confirmed from this listing.
1348 if (!gGrid) return kFALSE;
1349 // Check if dirname is a path
1350 TString dirstripped = dirname;
// Strip with default arguments first, then strip trailing '/' characters.
1351 dirstripped = dirstripped.Strip();
1352 dirstripped = dirstripped.Strip(TString::kTrailing, '/');
// Last path component and parent directory.
1353 TString dir = gSystem->BaseName(dirstripped);
1355 TString path = gSystem->DirName(dirstripped);
1356 TGridResult *res = gGrid->Ls(path, "-F");
1357 if (!res) return kFALSE;
// Loop over the listed entries; the loop stops at the first entry that is not a TMap.
1361 while ((map=dynamic_cast<TMap*>(next()))) {
1362 obj = map->GetValue("name");
1364 if (dir == obj->GetName()) {
1373 //______________________________________________________________________________
1374 void AliAnalysisAlien::CheckDataType(const char *lfn, Bool_t &isCollection, Bool_t &isXml, Bool_t &useTags)
1376 // Check input data type.
// Input:  lfn          - logical file name to be classified.
// Output: isCollection - result of IsCollection(lfn);
//         isXml        - true if the file name contains ".xml";
//         useTags      - true if the name of the first file found (or of the
//                        file itself for plain files) contains ".tag".
// A one-line summary ("##### file: ... type: ...; using_tags: ...") is printed
// via Info() in every visible branch.
// NOTE(review): several short lines (branch conditions, returns, cleanup) are
// not visible in this listing, so the exit paths are not documented here.
1377 isCollection = kFALSE;
1381 Error("CheckDataType", "No connection to grid");
1384 isCollection = IsCollection(lfn);
1385 TString msg = "\n##### file: ";
// --- Case 1: AliEn (raw) collection ---
1388 msg += " type: raw_collection;";
1389 // special treatment for collections
1391 // check for tag files in the collection
1392 TGridResult *res = gGrid->Command(Form("listFilesFromCollection -z -v %s",lfn), kFALSE);
1394 msg += " using_tags: No (unknown)";
1395 Info("CheckDataType", "%s", msg.Data());
// The original LFN of the first file in the collection decides about tags.
1398 const char* typeStr = res->GetKey(0, "origLFN");
1399 if (!typeStr || !strlen(typeStr)) {
1400 msg += " using_tags: No (unknown)";
1401 Info("CheckDataType", "%s", msg.Data());
1404 TString file = typeStr;
1405 useTags = file.Contains(".tag");
1406 if (useTags) msg += " using_tags: Yes";
1407 else msg += " using_tags: No";
1408 Info("CheckDataType", "%s", msg.Data());
// --- Case 2: xml collection ---
1413 isXml = slfn.Contains(".xml");
1415 // Open xml collection and check if there are tag files inside
1416 msg += " type: xml_collection;";
// TAlienCollection is reached through the interpreter rather than by a direct call.
1417 TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"alien://%s\",1);",lfn));
1419 msg += " using_tags: No (unknown)";
1420 Info("CheckDataType", "%s", msg.Data());
// Look at the first entry of the collection only.
1423 TMap *map = coll->Next();
1425 msg += " using_tags: No (unknown)";
1426 Info("CheckDataType", "%s", msg.Data());
// The entry stored under the empty key is itself a map holding the file "name".
1429 map = (TMap*)map->GetValue("");
1431 if (map && map->GetValue("name")) file = map->GetValue("name")->GetName();
1432 useTags = file.Contains(".tag");
1434 if (useTags) msg += " using_tags: Yes";
1435 else msg += " using_tags: No";
1436 Info("CheckDataType", "%s", msg.Data());
// --- Case 3: plain file, classified by its name ---
1439 useTags = slfn.Contains(".tag");
1440 if (slfn.Contains(".root")) msg += " type: root file;";
1441 else msg += " type: unknown file;";
1442 if (useTags) msg += " using_tags: Yes";
1443 else msg += " using_tags: No";
1444 Info("CheckDataType", "%s", msg.Data());
1447 //______________________________________________________________________________
1448 void AliAnalysisAlien::EnablePackage(const char *package)
1450 // Enables a par file supposed to exist in the current directory.
// <package> is the package name; any ".par" inside it is removed first.
// The resulting name is checked with gSystem->AccessPathName(); Fatal() is
// called when the package is not found. Otherwise the kUsePars bit is set and
// the name is stored as a TObjString in fPackages.
// NOTE(review): original line 1453 (between the ReplaceAll and the access
// check) is not visible here, so the exact path that is tested is unconfirmed.
1451 TString pkg(package);
1452 pkg.ReplaceAll(".par", "");
1454 if (gSystem->AccessPathName(pkg)) {
1455 Fatal("EnablePackage", "Package %s not found", pkg.Data());
// Announce the use of .par packages only while the bit is not yet set.
1458 if (!TObject::TestBit(AliAnalysisGrid::kUsePars))
1459 Info("EnablePackage", "AliEn plugin will use .par packages");
1460 TObject::SetBit(AliAnalysisGrid::kUsePars, kTRUE);
// fPackages owns its entries (SetOwner); the condition guarding its creation
// is not visible in this listing.
1462 fPackages = new TObjArray();
1463 fPackages->SetOwner();
1465 fPackages->Add(new TObjString(pkg));
1468 //______________________________________________________________________________
1469 TChain *AliAnalysisAlien::GetChainForTestMode(const char *treeName) const
1471 // Make a tree from files having the location specified in fFileForTestMode.
1472 // Inspired from JF's CreateESDChain.
// <treeName> is the name given to the TChain that is created.
// Each non-empty entry read from fFileForTestMode is opened with TFile::Open()
// and added to the chain when the file is not a zombie; reading stops once
// fNtestFiles entries have been taken.
// Errors are reported when fFileForTestMode is not set, when that file is not
// found, and when none of the listed files could be opened.
// NOTE(review): the read loop header, the returns and the cleanup lines are not
// visible in this listing, so the returned values are not documented here.
1473 if (fFileForTestMode.IsNull()) {
1474 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
1477 if (gSystem->AccessPathName(fFileForTestMode)) {
1478 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
1483 in.open(fFileForTestMode);
1485 // Read the input list of files and add them to the chain
1487 TChain *chain = new TChain(treeName);
// Empty entries are skipped and do not count towards fNtestFiles.
1491 if (line.IsNull()) continue;
1492 if (count++ == fNtestFiles) break;
1493 TString esdFile(line);
// Open each file once to make sure it is usable before adding it to the chain.
1494 TFile *file = TFile::Open(esdFile);
1496 if (!file->IsZombie()) chain->Add(esdFile);
// NOTE(review): this message tag is spelled "GetChainforTestMode" (lower-case
// 'f'), unlike the other messages of this function.
1499 Error("GetChainforTestMode", "Skipping un-openable file: %s", esdFile.Data());
1503 if (!chain->GetListOfFiles()->GetEntries()) {
1504 Error("GetChainForTestMode", "No file from %s could be opened", fFileForTestMode.Data());
1512 //______________________________________________________________________________
1513 const char *AliAnalysisAlien::GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone)
1515 // Get job status for all jobs with jobid>=jobidstart.
// Input:  jobidstart - jobs with a smaller queue id are skipped;
//         lastid     - queue id of the job whose status text is copied into
//                      the returned buffer.
// Output: nrunning, nwaiting, nerror, ndone - counters passed by reference; the
//         visible switch distinguishes waiting, running, aborted/fail/unknown
//         and done jobs (the counter updates are not visible in this listing).
// Returns a pointer to a function-local static buffer shared between calls.
1516 static char mstatus[20];
// Ask the grid for the list of jobs; without a list the buffer is returned as is.
1522 TGridJobStatusList *list = gGrid->Ps("");
1523 if (!list) return mstatus;
1524 Int_t nentries = list->GetSize();
1525 TGridJobStatus *status;
1527 for (Int_t ijob=0; ijob<nentries; ijob++) {
1528 status = (TGridJobStatus *)list->At(ijob);
// The queue id is read through the interpreter so that TAlienJobStatus is not
// referenced directly from compiled code.
1529 pid = gROOT->ProcessLine(Form("atoi(((TAlienJobStatus*)0x%lx)->GetKey(\"queueId\"));", (ULong_t)status));
1530 if (pid<jobidstart) continue;
1531 if (pid == lastid) {
// NOTE(review): the status text is passed to sprintf as the format string and
// the write is not bounded, while mstatus holds only 20 bytes - consider a
// bounded copy with an explicit "%s" format.
1532 gROOT->ProcessLine(Form("sprintf((char*)0x%lx,((TAlienJobStatus*)0x%lx)->GetKey(\"status\"));",(ULong_t)mstatus, (ULong_t)status));
1534 switch (status->GetStatus()) {
1535 case TGridJobStatus::kWAITING:
1537 case TGridJobStatus::kRUNNING:
// Aborted, failed and unknown jobs share one branch.
1539 case TGridJobStatus::kABORTED:
1540 case TGridJobStatus::kFAIL:
1541 case TGridJobStatus::kUNKNOWN:
1543 case TGridJobStatus::kDONE:
1552 //______________________________________________________________________________
1553 Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
1555 // Returns true if file is a collection. Functionality duplicated from
1556 // TAlien::Type() because we don't want to directly depend on TAlien.
1558 Error("IsCollection", "No connection to grid");
1561 TGridResult *res = gGrid->Command(Form("type -z %s",lfn),kFALSE);
1562 if (!res) return kFALSE;
1563 const char* typeStr = res->GetKey(0, "type");
1564 if (!typeStr || !strlen(typeStr)) return kFALSE;
1565 if (!strcmp(typeStr, "collection")) return kTRUE;
1570 //______________________________________________________________________________
1571 Bool_t AliAnalysisAlien::IsSingleOutput() const
1573 // Check if single-ouput option is on.
1574 return (!fOutputSingle.IsNull());
1577 //______________________________________________________________________________
1578 void AliAnalysisAlien::Print(Option_t *) const
1580 // Print current plugin settings.
// The option argument is unused. All output goes to stdout via printf().
// Optional settings are printed only when they are set (non-empty strings or
// positive counters).
// NOTE(review): some short original lines (braces, declarations, a few
// conditions) are not visible in this listing.
1581 printf("### AliEn analysis plugin current settings ###\n");
1582 printf("= OverwriteMode:________________________________ %d\n", fOverwriteMode);
// Extra warning while overwrite mode is active.
1583 if (fOverwriteMode) {
1584 printf("***** NOTE: Overwrite mode will overwrite the input generated datasets and partial results from previous analysis. \
1585 \n***** To disable, use: plugin->SetOverwriteMode(kFALSE);\n");
// Copy behaviour, production mode and requested software versions.
1587 printf("= Copy files to grid: __________________________ %s\n", (IsUseCopy())?"YES":"NO");
1588 printf("= Check if files can be copied to grid: ________ %s\n", (IsCheckCopy())?"YES":"NO");
1589 printf("= Production mode:______________________________ %d\n", fProductionMode);
1590 printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
1591 printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
1592 printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
// User, grid directories and input data description.
1594 printf("= User running the plugin: _____________________ %s\n", fUser.Data());
1595 printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
1596 printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
1597 printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
1598 printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
1599 printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
// Runs are described either by an explicit list or by a range with prefix.
1600 if (fRunNumbers.Length())
1601 printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
1603 printf("= Run range to be processed: ___________________ %s%d-%s%d\n", fRunPrefix.Data(), fRunRange[0], fRunPrefix.Data(), fRunRange[1]);
// Without runs, the input files given by name are listed (names are
// concatenated without a separator).
1604 if (!fRunRange[0] && !fRunNumbers.Length()) {
1605 TIter next(fInputFiles);
1608 while ((obj=next())) list += obj->GetName();
1609 printf("= Input files to be processed: _________________ %s\n", list.Data());
1611 if (TestBit(AliAnalysisGrid::kTest))
1612 printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
// Output handling.
1613 printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
1614 printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
1615 printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
1616 printf("=====================================================================\n");
// Job parameters.
1617 printf("= Job price: ___________________________________ %d\n", fPrice);
1618 printf("= Time to live (TTL): __________________________ %d\n", fTTL);
1619 printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
1620 if (fMaxInitFailed>0)
1621 printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
1622 if (fMasterResubmitThreshold>0)
1623 printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
1624 printf("= Number of replicas for the output files_______ %d\n", fNreplicas);
1625 if (fNrunsPerMaster>0)
1626 printf("= Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
1627 printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
// Generated scripts, macros and libraries.
1628 printf("= Name of the generated execution script: ______ %s\n", fExecutable.Data());
1629 printf("= Executable command: __________________________ %s\n", fExecutableCommand.Data());
1630 if (fArguments.Length())
1631 printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
1632 if (fExecutableArgs.Length())
1633 printf("= Arguments after macro name in executable______ %s\n",fExecutableArgs.Data());
1634 printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
1635 printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
1636 printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
1637 printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
1639 printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
1640 printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
// NOTE(review): this tests the pointer returned by Data(), not whether the
// string is empty; the neighbouring optional settings test Length(). The
// condition is presumably always true - confirm and switch to Length().
1641 if (fIncludePath.Data())
1642 printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
1643 if (fCloseSE.Length())
1644 printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
1645 if (fFriendChainName.Length())
1646 printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
// List of enabled par files (names are concatenated without a separator).
1648 TIter next(fPackages);
1651 while ((obj=next())) list += obj->GetName();
1652 printf("= Par files to be used: ________________________ %s\n", list.Data());
1656 //______________________________________________________________________________
1657 void AliAnalysisAlien::SetDefaults()
1659 // Set default values for everything. What cannot be filled will be left empty.
// NOTE(review): several short assignments of the original are not visible in
// this listing; only the defaults shown below can be confirmed from here.
// The JDL objects are created through the interpreter ("new TAlienJDL()").
// NOTE(review): an existing fGridJDL is deleted before being replaced, but no
// matching delete for fMergingJDL is visible - possible leak when this method
// runs more than once; confirm.
1660 if (fGridJDL) delete fGridJDL;
1661 fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
1662 fMergingJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
// Job splitting, resubmission and merging parameters.
1665 fSplitMaxInputFileNumber = 100;
1667 fMasterResubmitThreshold = 0;
1672 fNrunsPerMaster = 1;
1673 fMaxMergeFiles = 100;
// Names of the generated script/macro and the command used to run it.
1675 fExecutable = "analysis.sh";
1676 fExecutableCommand = "root -b -q";
1678 fExecutableArgs = "";
1679 fAnalysisMacro = "myAnalysis.C";
1680 fAnalysisSource = "";
1681 fAdditionalLibs = "";
1685 fAliROOTVersion = "";
// Grid user, directories and data selection.
1686 fUser = ""; // Your alien user name
1687 fGridWorkingDir = "";
1688 fGridDataDir = ""; // Can be like: /alice/sim/PDC_08a/LHC08c9/
1689 fDataPattern = "*AliESDs.root"; // Can be like: *AliESDs.root, */pass1/*AliESDs.root, ...
1690 fFriendChainName = "";
// Output location, archives and JDL naming.
1691 fGridOutputDir = "output";
1692 fOutputArchive = "log_archive.zip:std*@disk=1 root_archive.zip:*.root@disk=2";
1693 fOutputFiles = ""; // Like "AliAODs.root histos.root"
1694 fInputFormat = "xml-single";
1695 fJDLName = "analysis.jdl";
1696 fJobTag = "Automatically generated analysis JDL";
1697 fMergeExcludes = "";
// Copy checking and default outputs are switched on.
1700 SetCheckCopy(kTRUE);
1701 SetDefaultOutputs(kTRUE);
1705 //______________________________________________________________________________
1706 Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
1708 // Static method that checks the status of merging. This can submit merging jobs that did not produce the expected
1709 // output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
1710 // true if the jobs were successfully submitted.
// Arguments, as used by the visible code:
//   filename  - name of the output file; ".root" is replaced by "*.root" to
//               build the pattern of the AliEn "find" command;
//   aliendir  - AliEn directory that is searched;
//   nperchunk - number of files merged into one chunk, used to compute how
//               many merged files a stage should contain;
//   submit    - when false, the function returns after printing the report;
//   jdl       - merging JDL used in the "submit <jdl> <aliendir>" query.
// NOTE(review): several short lines (declarations, counter updates, returns)
// are not visible in this listing.
1711 Int_t countOrig = 0;
1712 Int_t countStage = 0;
1715 Bool_t doneFinal = kFALSE;
// Normalize the directory name: no double or trailing slashes.
1717 TString saliendir(aliendir);
1718 TString sfilename, stmp;
1719 saliendir.ReplaceAll("//","/");
1720 saliendir = saliendir.Strip(TString::kTrailing, '/');
// NOTE(review): the messages below are tagged "GetNregisteredFiles" although
// they are issued from CheckMergedFiles.
1722 ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
1725 sfilename = filename;
1726 sfilename.ReplaceAll(".root", "*.root");
1727 printf("Checking directory <%s> for merged files <%s> ...\n", aliendir, sfilename.Data());
1728 TString command = Form("find %s/ *%s", saliendir.Data(), sfilename.Data());
1729 TGridResult *res = gGrid->Command(command);
1731 ::Error("GetNregisteredFiles","Error: No result for the find command\n");
// Classify every file found.
1736 while ((map=(TMap*)nextmap())) {
// NOTE(review): the "turl" value is dereferenced without a visible null check.
1737 TString turl = map->GetValue("turl")->GetName();
1738 if (!turl.Length()) {
// Reduce the turl to a path relative to aliendir.
1743 turl.ReplaceAll("alien://", "");
1744 turl.ReplaceAll(saliendir, "");
1745 sfilename = gSystem->BaseName(turl);
1746 turl = turl.Strip(TString::kLeading, '/');
1747 // Now check to what the file corresponds to:
1748 // original output - aliendir/%03d/filename
1749 // merged file (which stage) - aliendir/filename-Stage%02d_%04d
1750 // final merged file - aliendir/filename
// Relative path equal to the base name: the file sits directly in aliendir.
1751 if (sfilename == turl) {
1752 if (sfilename == filename) {
// Decode the stage (2 digits after "Stage") and the chunk index (4 digits after
// the following separator) from the file name.
1756 Int_t index = sfilename.Index("Stage");
1757 if (index<0) continue;
1758 stmp = sfilename(index+5,2);
1759 Int_t istage = atoi(stmp);
1760 stmp = sfilename(index+8,4);
1761 Int_t ijob = atoi(stmp);
1762 if (istage<stage) continue; // Ignore lower stages
1765 chunksDone.ResetAllBits();
// Remember which chunk of the stage is already available.
1769 chunksDone.SetBitNumber(ijob);
1776 printf("=> Removing files from previous stages...\n");
1777 gGrid->Rm(Form("%s/*Stage*.root", aliendir));
1778 for (i=1; i<stage; i++)
1779 gGrid->Rm(Form("%s/*Stage%d*.zip", aliendir, i));
1784 // Compute number of jobs that were submitted for the current stage
// Each stage divides the number of files by nperchunk, rounding up.
1785 Int_t ntotstage = countOrig;
1786 for (i=1; i<=stage; i++) {
1787 if (ntotstage%nperchunk) ntotstage = (ntotstage/nperchunk)+1;
1788 else ntotstage = (ntotstage/nperchunk);
1790 // Now compare with the number of set bits in the chunksDone array
1791 Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
1793 printf("*** Found %d original files\n", countOrig);
1794 if (stage==0) printf("*** No merging completed so far.\n");
1795 else printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
1796 if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
// In checking mode only the status of the final merged file is reported.
1797 if (!submit) return doneFinal;
1798 // Submit merging jobs for all missing chunks for the current stage.
1799 TString query = Form("submit %s %s", jdl, aliendir);
// Resubmit one job per missing chunk, walking the unset bits of chunksDone.
1802 for (i=0; i<nmissing; i++) {
1803 ichunk = chunksDone.FirstNullBit(ichunk+1);
1804 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
1805 if (!jobId) return kFALSE;
1809 // Submit next stage of merging
// When no merging has been done yet, the next stage works on the original files.
1810 if (stage==0) countStage = countOrig;
1811 Int_t nchunks = (countStage/nperchunk);
1812 if (countStage%nperchunk) nchunks += 1;
1813 for (i=0; i<nchunks; i++) {
1814 Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
1815 if (!jobId) return kFALSE;
1820 //______________________________________________________________________________
1821 Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
1823 // Submits a single job corresponding to the query and returns job id. If 0 submission failed.
// <query> is the complete command string handed to gGrid->Command(). The query
// and the outcome are echoed on stdout. 0 is also returned when there is no
// grid connection.
// NOTE(review): original lines 1827, 1829, 1832-1833, 1835-1836 and 1838-1840
// are not visible in this listing, so result checking, cleanup and the
// conversion of the job id to the return value cannot be confirmed from here.
1824 if (!gGrid) return 0;
1825 printf("=> %s ------> ",query);
1826 TGridResult *res = gGrid->Command(query);
// The job id is taken from the "jobId" key of the first result entry; an empty
// value is treated as a failed submission.
1828 TString jobId = res->GetKey(0,"jobId");
1830 if (jobId.IsNull()) {
1831 printf("submission failed. Reason:\n");
1834 ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
1837 printf(" Job id: %s\n", jobId.Data());
1841 //______________________________________________________________________________
1842 Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
1844 // Merge given output files from basedir. The file merger will merge nmaxmerge
1845 // files in a group. Merging can be done in stages:
1846 // stage=0 : will merge all existing files in a single stage
1847 // stage=1 : does a find command for all files that do NOT contain the string "Stage".
1848 // If their number is bigger than nmaxmerge, only the files from
1849 // ichunk*nmaxmerge to (ichunk+1)*nmaxmerge-1 will get merged as output_stage_<ichunk>
1850 // stage=n : does a find command for files named <output>Stage<stage-1>_*. If their number is bigger than
1851 // nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file
// Arguments, as used by the visible code:
//   output    - output file name; anything from '@' on is removed;
//   basedir   - directory searched with the AliEn "find" command;
//   nmaxmerge - number of files merged per group;
//   stage     - merging stage (see above);
//   ichunk    - index of the chunk to merge for stage != 0.
// NOTE(review): many short lines (braces, returns, counter updates, the calls
// that run the merge) are not visible in this listing; the comments below
// describe only the statements that are shown.
1853 TString outputFile = output;
1855 TString outputChunk;
1856 TString previousChunk = "";
1857 Int_t countChunk = 0;
1858 Int_t countZero = nmaxmerge;
1859 Bool_t merged = kTRUE;
// Drop a storage specification ("@...") from the output name.
1860 Int_t index = outputFile.Index("@");
1861 if (index > 0) outputFile.Remove(index);
// For stages above 1 the inputs are the merged files of the previous stage.
1862 TString inputFile = outputFile;
1863 if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
1864 command = Form("find %s/ *%s", basedir, inputFile.Data());
1865 printf("command: %s\n", command.Data());
1866 TGridResult *res = gGrid->Command(command);
1868 ::Error("MergeOutput","No result for the find command\n");
1872 TFileMerger *fm = 0;
1875 // Check if there is a merge operation to resume. Works only for stage 0 or 1.
1876 outputChunk = outputFile;
1877 outputChunk.ReplaceAll(".root", "_*.root");
1878 // Check for existing temporary merge files
1879 // Check overwrite mode and remove previous partial results if needed
1880 // Preserve old merging functionality for stage 0.
// A zero exit status of "ls" means local partial results <output>_*.root exist.
1882 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
1884 // Skip as many input files as in a chunk
1885 for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
1887 ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
// Name of the partial result holding the chunks up to <countChunk>.
1891 outputChunk = outputFile;
1892 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1894 if (gSystem->AccessPathName(outputChunk)) continue;
1895 // Merged file with chunks up to <countChunk> found
1896 ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
1897 previousChunk = outputChunk;
1901 countZero = nmaxmerge;
1903 while ((map=(TMap*)nextmap())) {
1904 // Loop 'find' results and get next LFN
1905 if (countZero == nmaxmerge) {
1906 // First file in chunk - create file merger and add previous chunk if any.
1907 fm = new TFileMerger(kFALSE);
1908 fm->SetFastMethod(kTRUE);
1909 if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
1910 outputChunk = outputFile;
1911 outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
1913 // If last file found, put merged results in the output file
1914 if (map == res->Last()) outputChunk = outputFile;
1915 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1916 if (!objs || !objs->GetString().Length()) {
1917 // Nothing found - skip this output
1922 // Add file to be merged and decrement chunk counter.
1923 fm->AddFile(objs->GetString());
// A chunk is complete when its counter reaches zero or the last file was added.
1925 if (countZero==0 || map == res->Last()) {
1926 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1927 // Nothing found - skip this output
1928 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1933 fm->OutputFile(outputChunk);
1934 // Merge the outputs, then go to next chunk
1936 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
// NOTE(review): this message is tagged "MergeOutputs" (with a trailing 's'),
// unlike the other messages of this function.
1941 ::Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
// The previous partial result is now contained in the new chunk.
1942 gSystem->Unlink(previousChunk);
1944 if (map == res->Last()) {
// Start a new chunk that builds on the one just written.
1950 countZero = nmaxmerge;
1951 previousChunk = outputChunk;
1956 // Merging stage different than 0.
1957 // Move to the beginning of the requested chunk.
1958 outputChunk = outputFile;
// Chunking applies only when there are more files than fit in one group.
1959 if (nmaxmerge < res->GetSize()) {
1960 if (ichunk*nmaxmerge >= res->GetSize()) {
1961 ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
// Skip the entries that belong to the preceding chunks.
1965 for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
1966 outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
1968 countZero = nmaxmerge;
1969 fm = new TFileMerger(kFALSE);
1970 fm->SetFastMethod(kTRUE);
1971 while ((map=(TMap*)nextmap())) {
1972 // Loop 'find' results and get next LFN
1973 TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
1974 if (!objs || !objs->GetString().Length()) {
1975 // Nothing found - skip this output
1980 // Add file to be merged and decrement chunk counter.
1981 fm->AddFile(objs->GetString());
// Stop once the chunk counter reaches zero.
1983 if (countZero==0) break;
1986 if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
1987 // Nothing found - skip this output
1988 ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
1992 fm->OutputFile(outputChunk);
1993 // Merge the outputs
1995 ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
1999 ::Info("MergeOutput", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
2005 //______________________________________________________________________________
2006 Bool_t AliAnalysisAlien::MergeOutputs()
2008 // Merge analysis outputs existing in the AliEn space.
// Flow visible in this listing:
//  - test mode returns kTRUE immediately and offline mode returns kFALSE;
//  - a JDL-merging section prints hints (kMerge bit not set, production mode)
//    or calls SubmitMerging();
//  - otherwise every entry of the comma-separated fOutputFiles list is merged
//    locally through MergeOutput(), skipping names matched by fMergeExcludes.
// NOTE(review): the inner line numbers skip, so some source lines (braces,
// return statements, short declarations and some conditions) are missing from
// this listing. The condition guarding the grid-connection error, the guard
// around the JDL-merging section and the final return value are not visible
// here - confirm against the full file.
2009 if (TestBit(AliAnalysisGrid::kTest)) return kTRUE;
2010 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2012 Error("MergeOutputs", "Cannot merge outputs without grid connection. Terminate will NOT be executed");
2016 if (!TestBit(AliAnalysisGrid::kMerge)) {
2017 Info("MergeOutputs", "### Re-run with <MergeViaJDL> option in terminate mode of the plugin to submit merging jobs ###");
2020 if (fProductionMode) {
2021 Info("MergeOutputs", "### Merging will be submitted by LPM manager... ###");
2024 Info("MergeOutputs", "Submitting merging JDL");
2025 if (!SubmitMerging()) return kFALSE;
2026 Info("MergeOutputs", "### Re-run with <MergeViaJDL> off to collect results after merging jobs are done ###");
2027 Info("MergeOutputs", "### The Terminate() method is executed by the merging jobs");
2030 // Get the output path
// A directory name without any slash is treated as relative and expanded
// under the grid home directory and fGridWorkingDir.
2031 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2032 if (!DirectoryExists(fGridOutputDir)) {
2033 Error("MergeOutputs", "Grid output directory %s not found. Terminate() will NOT be executed", fGridOutputDir.Data());
2036 if (!fOutputFiles.Length()) {
2037 Error("MergeOutputs", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
2040 // Check if fast read option was requested
2041 Info("MergeOutputs", "Started local merging of output files from: alien://%s \
2042 \n======= overwrite mode = %d", fGridOutputDir.Data(), (Int_t)fOverwriteMode);
2043 if (fFastReadOption) {
2044 Warning("MergeOutputs", "You requested FastRead option. Using xrootd flags to reduce timeouts. This may skip some files that could be accessed ! \
2045 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
// Shorter XNet timeouts and retry limits are written into gEnv.
2046 gEnv->SetValue("XNet.ConnectTimeout",10);
2047 gEnv->SetValue("XNet.RequestTimeout",10);
2048 gEnv->SetValue("XNet.MaxRedirectCount",2);
2049 gEnv->SetValue("XNet.ReconnectTimeout",10);
2050 gEnv->SetValue("XNet.FirstConnectMaxCnt",1);
2052 // Make sure we change the temporary directory
2053 gSystem->Setenv("TMPDIR", gSystem->pwd());
// NOTE(review): Tokenize() allocates the returned array; its deletion is not
// visible in this listing - confirm it is released.
2054 TObjArray *list = fOutputFiles.Tokenize(",");
2058 Bool_t merged = kTRUE;
2059 while((str=(TObjString*)next())) {
2060 outputFile = str->GetString();
// Drop everything from the first @ onwards, when the @ is not the first character.
2061 Int_t index = outputFile.Index("@");
2062 if (index > 0) outputFile.Remove(index);
// Shell glob matching the partial merge files name_*.root of this output.
2063 TString outputChunk = outputFile;
2064 outputChunk.ReplaceAll(".root", "_*.root");
2065 // Skip already merged outputs
// TSystem::AccessPathName() returns false when the path is accessible, so the
// negated call below means that the merged file already exists locally.
2066 if (!gSystem->AccessPathName(outputFile)) {
2067 if (fOverwriteMode) {
2068 Info("MergeOutputs", "Overwrite mode. Existing file %s was deleted.", outputFile.Data());
2069 gSystem->Unlink(outputFile);
// A zero exit status from ls means at least one partial file matches.
// NOTE(review): file names are interpolated unquoted into shell commands
// here and below; names with spaces or shell metacharacters would misbehave.
2070 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2071 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2072 outputChunk.Data());
2073 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
2076 Info("MergeOutputs", "Output file <%s> found. Not merging again.", outputFile.Data());
// NOTE(review): the branch structure around the next cleanup (which else it
// belongs to) is not visible in this listing; the message mentions overwrite
// mode but no fOverwriteMode test is shown for it.
2080 if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
2081 Info("MergeOutput", "Overwrite mode: partial merged files %s will removed",
2082 outputChunk.Data());
2083 gSystem->Exec(Form("rm -f %s", outputChunk.Data()));
// NOTE(review): Contains() is a substring test on the whole exclude list, so
// an output whose name is part of a longer excluded name is skipped as well.
2086 if (fMergeExcludes.Length() &&
2087 fMergeExcludes.Contains(outputFile.Data())) continue;
2088 // Perform a 'find' command in the output directory, looking for registered outputs
2089 merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
2091 Error("MergeOutputs", "Terminate() will NOT be executed");
// Close the merged file if it is still registered in the ROOT list of files.
2094 TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
2095 if (fileOpened) fileOpened->Close();
2100 //______________________________________________________________________________
2101 void AliAnalysisAlien::SetDefaultOutputs(Bool_t flag)
2103 // Use the output files connected to output containers from the analysis manager
2104 // rather than the files defined by SetOutputFiles
// flag: new value stored in the kDefaultOutputs bit of this object.
// The Info message is printed only when the option goes from disabled to
// enabled (flag is true and the bit was not already set).
2105 if (flag && !TObject::TestBit(AliAnalysisGrid::kDefaultOutputs))
2106 Info("SetDefaultOutputs", "Plugin will use the output files taken from analysis manager");
2107 TObject::SetBit(AliAnalysisGrid::kDefaultOutputs, flag);
2110 //______________________________________________________________________________
2111 void AliAnalysisAlien::SetOutputFiles(const char *list)
2113 // Manually set the output files list.
2114 // Removes duplicates. Not allowed if default outputs are not disabled.
// list: output file names separated by single spaces. Each name is appended
// to fOutputFiles with a comma separator after removing any storage element
// suffix that starts at an @ character.
// NOTE(review): the inner line numbers skip, so some source lines are missing
// from this listing. Whether fOutputFiles is cleared before the loop, whether
// the function returns after Fatal() and whether the tokenized array is
// deleted cannot be seen here - confirm against the full file.
2115 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2116 Fatal("SetOutputFiles", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set output files.");
2119 Info("SetOutputFiles", "Output file list is set manually - you are on your own.");
2121 TString slist = list;
2122 if (slist.Contains("@")) Warning("SetOutputFiles","The plugin does not allow explicit SE's. Please use: SetNumberOfReplicas() instead.");
2123 TObjArray *arr = slist.Tokenize(" ");
2127 while ((os=(TObjString*)next())) {
2128 sout = os->GetString();
// Strip the @ suffix when the @ is not the first character of the token.
2129 if (sout.Index("@")>0) sout.Remove(sout.Index("@"));
// NOTE(review): the duplicate check is a substring match on the whole list,
// so a name contained in an already stored longer name is dropped too.
2130 if (fOutputFiles.Contains(sout)) continue;
2131 if (!fOutputFiles.IsNull()) fOutputFiles += ",";
2132 fOutputFiles += sout;
2137 //______________________________________________________________________________
2138 void AliAnalysisAlien::SetOutputArchive(const char *list)
2140 // Manually set the output archive list. Free text - you are on your own...
2141 // Not allowed if default outputs are not disabled.
// list: archive specification stored verbatim in fOutputArchive; no parsing
// or validation is done here.
// NOTE(review): some source lines are missing from this listing, so whether
// the function returns right after Fatal() is not visible - confirm.
2142 if (TObject::TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2143 Fatal("SetOutputArchive", "You have to explicitly call SetDefaultOutputs(kFALSE) to manually set the output archives.");
2146 Info("SetOutputArchive", "Output archive is set manually - you are on your own.");
2147 fOutputArchive = list;
2150 //______________________________________________________________________________
2151 void AliAnalysisAlien::SetPreferedSE(const char */*se*/)
2153 // Setting a preferred output SE is not allowed anymore.
// The argument is unnamed and ignored; the visible body only prints a warning
// that points to SetNumberOfReplicas() and SetDefaultOutputs().
2154 Warning("SetPreferedSE", "Setting a preferential SE is not allowed anymore via the plugin. Use SetNumberOfReplicas() and SetDefaultOutputs()");
2157 //______________________________________________________________________________
2158 Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntry*/)
2160 // Start remote grid analysis.
// Both parameters are unnamed and unused in this function.
// Flow visible in this listing:
//  1. Require an initialized analysis manager.
//  2. PROOF mode: check the cluster and dataset settings, optionally reset the
//     cluster and select a ROOT version, open the PROOF session, enable the
//     AliRoot package with its options list, enable user par files, load the
//     analysis sources and, for test mode, register a dataset read from
//     fFileForTestMode.
//  3. With the kDefaultOutputs bit set, build fOutputFiles from the analysis
//     manager output containers and extra files, then compose fOutputArchive.
//  4. Print a banner for the selected plugin mode.
//  5. Grid path: check input data, create the dataset, write the analysis
//     file, macro, validation scripts and JDL; offline mode returns kFALSE.
//  6. Test mode runs the executable and its validation script locally with
//     bash; production mode writes a .prod file; otherwise the JDL is
//     submitted and an aliensh shell is started.
// NOTE(review): the inner line numbers skip, so some source lines (braces,
// return statements, short declarations and some conditions) are missing from
// this listing. The exact branch nesting and the return values on most paths
// cannot be confirmed from here.
2161 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2162 Bool_t testMode = TestBit(AliAnalysisGrid::kTest);
2163 if (!mgr || !mgr->IsInitialized()) {
2164 Error("StartAnalysis", "You need an initialized analysis manager for this");
2167 // Are we in PROOF mode ?
2168 if (mgr->IsProofMode()) {
2169 Info("StartAnalysis", "##### Starting PROOF analysis on via the plugin #####");
2170 if (fProofCluster.IsNull()) {
2171 Error("StartAnalysis", "You need to specify the proof cluster name via SetProofCluster");
2174 if (fProofDataSet.IsNull() && !testMode) {
2175 Error("StartAnalysis", "You need to specify a dataset using SetProofDataSet()");
2178 // Set the needed environment
2179 gEnv->SetValue("XSec.GSI.DelegProxy","2");
2180 // Do we need to reset PROOF ? The success of the Reset operation cannot be checked
// fProofReset equal to 1 sends the soft reset; the hard reset call follows
// (presumably for other nonzero values - the else line is not visible).
2181 if (fProofReset && !testMode) {
2182 if (fProofReset==1) {
2183 Info("StartAnalysis", "Sending soft reset signal to proof cluster %s", fProofCluster.Data());
2184 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kFALSE);", fProofCluster.Data()));
2186 Info("StartAnalysis", "Sending hard reset signal to proof cluster %s", fProofCluster.Data());
2187 gROOT->ProcessLine(Form("TProof::Reset(\"%s\", kTRUE);", fProofCluster.Data()));
2189 Info("StartAnalysis", "Stopping the analysis. Please use SetProofReset(0) to resume.");
2192 // Do we need to change the ROOT version ? The success of this cannot be checked.
2193 if (!fRootVersionForProof.IsNull() && !testMode) {
2194 gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
2195 fProofCluster.Data(), fRootVersionForProof.Data()));
2197 // Connect to PROOF and check the status
// The workers option is built from fNproofWorkersPerSlave when it is nonzero,
// otherwise from fNproofWorkers; with neither set TProof::Open gets no option.
2200 if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
2201 else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
2203 if (!sworkers.IsNull())
2204 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
2206 proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
2208 proof = gROOT->ProcessLine("TProof::Open(\"\");");
2210 Error("StartAnalysis", "Could not start PROOF in test mode");
2215 Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
// SetParallel is sent only when both worker settings are nonzero with the
// same sign (their product is positive).
// NOTE(review): a large product could overflow Int_t - confirm the ranges.
2218 if (fNproofWorkersPerSlave*fNproofWorkers > 0)
2219 gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
2220 // Is dataset existing ?
2222 TString dataset = fProofDataSet;
2223 Int_t index = dataset.Index("#");
2224 if (index>=0) dataset.Remove(index);
2225 // if (!gROOT->ProcessLine(Form("gProof->ExistsDataSet(\"%s\");",fProofDataSet.Data()))) {
2226 // Error("StartAnalysis", "Dataset %s not existing", fProofDataSet.Data());
2229 // Info("StartAnalysis", "Dataset %s found", dataset.Data());
2231 // Is ClearPackages() needed ?
2232 if (TestSpecialBit(kClearPackages)) {
2233 Info("StartAnalysis", "ClearPackages signal sent to PROOF. Use SetClearPackages(kFALSE) to reset this.");
2234 gROOT->ProcessLine("gProof->ClearPackages();");
2236 // Is a given aliroot mode requested ?
2238 if (!fAliRootMode.IsNull()) {
2239 TString alirootMode = fAliRootMode;
// The special value default is passed to the package as an empty mode string.
2240 if (alirootMode == "default") alirootMode = "";
2241 Info("StartAnalysis", "You are requesting AliRoot mode: %s", fAliRootMode.Data());
// The list owns the TNamed option objects added below.
2242 optionsList.SetOwner();
2243 optionsList.Add(new TNamed("ALIROOT_MODE", alirootMode.Data()));
2244 // Check the additional libs to be loaded
2246 if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:ANALYSISalice";
2247 // Parse the extra libs for .so
2248 if (fAdditionalLibs.Length()) {
2249 TObjArray *list = fAdditionalLibs.Tokenize(" ");
// NOTE(review): the .so test is part of the loop condition, so the loop stops
// at the first token that does not contain .so and later libraries are never
// examined - confirm this is intended.
2252 while((str=(TObjString*)next()) && str->GetString().Contains(".so")) {
// Reduce libNAME.so to NAME before appending to the colon-separated list.
2253 TString stmp = str->GetName();
2254 if (stmp.BeginsWith("lib")) stmp.Remove(0,3);
2255 stmp.ReplaceAll(".so","");
2256 if (!extraLibs.IsNull()) extraLibs += ":";
2259 if (list) delete list;
2261 if (!extraLibs.IsNull()) optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
2262 // Check extra includes
2263 if (!fIncludePath.IsNull()) {
2264 TString includePath = fIncludePath;
2265 includePath.ReplaceAll(" ",":");
// NOTE(review): TString::Strip() is a const method that returns the stripped
// substring and leaves the string unchanged; the result is discarded here, so
// trailing colons are presumably not removed - confirm.
2266 includePath.Strip(TString::kTrailing, ':');
2267 Info("StartAnalysis", "Adding extra includes: %s",includePath.Data());
2268 optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
2270 // Check if connection to grid is requested
2271 if (TestSpecialBit(kProofConnectGrid))
2272 optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
2273 // Enable AliRoot par
2275 // Enable proof lite package
2276 TString alirootLite = gSystem->ExpandPathName("$ALICE_ROOT/ANALYSIS/macros/AliRootProofLite.par");
// Print every option name and value that will be handed to the package.
2277 for (Int_t i=0; i<optionsList.GetSize(); i++) {
2278 TNamed *obj = (TNamed*)optionsList.At(i);
2279 printf("%s %s\n", obj->GetName(), obj->GetTitle());
// The address of the local options list is formatted into the interpreted
// command; a zero result from both calls is treated as success here.
2281 if (!gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");",alirootLite.Data()))
2282 && !gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\", (TList*)0x%lx);",alirootLite.Data(),(ULong_t)&optionsList))) {
2283 Info("StartAnalysis", "AliRootProofLite enabled");
2285 Error("StartAnalysis", "There was an error trying to enable package AliRootProofLite.par");
// A nonzero result from EnablePackage is reported as an error below.
2289 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"VO_ALICE@AliRoot::%s\", (TList*)0x%lx);",
2290 fAliROOTVersion.Data(), (ULong_t)&optionsList))) {
2291 Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
2296 if (fAdditionalLibs.Contains(".so") && !testMode) {
2297 Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
2298 \n http://aaf.cern.ch/node/83 and use a parameter for SetAliRootMode()");
2302 // Enable par files if requested
2303 if (fPackages && fPackages->GetEntries()) {
2304 TIter next(fPackages);
2306 while ((package=next())) {
// NOTE(review): here a nonzero UploadPackage result leads on to EnablePackage,
// while the AliRootProofLite branch above treats a zero result as success.
// The two conventions look inconsistent - confirm the return value contract.
2307 if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
2308 if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
2309 Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
2313 Error("StartAnalysis", "There was an error trying to upload package %s", package->GetName());
2318 // Do we need to load analysis source files ?
2319 // NOTE: don't load on client since this is anyway done by the user to attach his task.
2320 if (fAnalysisSource.Length()) {
2321 TObjArray *list = fAnalysisSource.Tokenize(" ");
2324 while((str=(TObjString*)next())) {
2325 gROOT->ProcessLine(Form("gProof->Load(\"%s+g\", kTRUE);", str->GetName()));
2327 if (list) delete list;
2330 // Register dataset to proof lite.
// NOTE(review): the two Error calls below report GetChainForTestMode as their
// location although they are issued from StartAnalysis.
2331 if (fFileForTestMode.IsNull()) {
2332 Error("GetChainForTestMode", "For proof test mode please use SetFileForTestMode() pointing to a file that contains data file locations.");
2335 if (gSystem->AccessPathName(fFileForTestMode)) {
2336 Error("GetChainForTestMode", "File not found: %s", fFileForTestMode.Data());
// NOTE(review): the collection is allocated here and handed to PROOF by
// address; no delete is visible in this listing - confirm ownership.
2339 TFileCollection *coll = new TFileCollection();
2340 coll->AddFromFile(fFileForTestMode);
2341 gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
2342 gROOT->ProcessLine("gProof->ShowDataSets()");
2347 // Check if output files have to be taken from the analysis manager
2348 if (TestBit(AliAnalysisGrid::kDefaultOutputs)) {
2350 TIter next(mgr->GetOutputs());
2351 AliAnalysisDataContainer *output;
2352 while ((output=(AliAnalysisDataContainer*)next())) {
2353 const char *filename = output->GetFileName();
// A container file name equal to default is replaced by the output file name
// of the output event handler; such containers are skipped without a handler.
// NOTE(review): filename is passed to strcmp without a null check - confirm
// GetFileName() never returns a null pointer.
2354 if (!(strcmp(filename, "default"))) {
2355 if (!mgr->GetOutputEventHandler()) continue;
2356 filename = mgr->GetOutputEventHandler()->GetOutputFileName();
// NOTE(review): Contains() is a substring test, so a name that is part of an
// already listed longer name is not added.
2358 if (fOutputFiles.Contains(filename)) continue;
2359 if (fOutputFiles.Length()) fOutputFiles += ",";
2360 fOutputFiles += filename;
2362 // Add extra files registered to the analysis manager
2363 if (mgr->GetExtraFiles().Length()) {
2364 if (fOutputFiles.Length()) fOutputFiles += ",";
2365 TString extra = mgr->GetExtraFiles();
2366 extra.ReplaceAll(" ", ",");
2367 // Protection in case extra files do not exist (will it work?)
2368 extra.ReplaceAll(".root", "*.root");
2369 fOutputFiles += extra;
2371 // Compose the output archive.
2372 fOutputArchive = "log_archive.zip:std*@disk=1 ";
2373 fOutputArchive += Form("root_archive.zip:%s@disk=%d",fOutputFiles.Data(),fNreplicas);
2375 // if (!fCloseSE.Length()) fCloseSE = gSystem->Getenv("alien_CLOSE_SE");
2376 if (TestBit(AliAnalysisGrid::kOffline)) {
2377 Info("StartAnalysis","\n##### OFFLINE MODE ##### Files to be used in GRID are produced but not copied \
2378 \n there nor any job run. You can revise the JDL and analysis \
2379 \n macro then run the same in \"submit\" mode.");
2380 } else if (TestBit(AliAnalysisGrid::kTest)) {
2381 Info("StartAnalysis","\n##### LOCAL MODE ##### Your analysis will be run locally on a subset of the requested \
2383 } else if (TestBit(AliAnalysisGrid::kSubmit)) {
2384 Info("StartAnalysis","\n##### SUBMIT MODE ##### Files required by your analysis are copied to your grid working \
2385 \n space and job submitted.");
2386 } else if (TestBit(AliAnalysisGrid::kMerge)) {
2387 Info("StartAnalysis","\n##### MERGE MODE ##### The registered outputs of the analysis will be merged");
2388 if (fMergeViaJDL) CheckInputData();
2391 Info("StartAnalysis","\n##### FULL ANALYSIS MODE ##### Producing needed files and submitting your analysis job...");
2396 Error("StartAnalysis", "Cannot start grid analysis without grid connection");
2399 if (IsCheckCopy()) CheckFileCopy(gGrid->GetHomeDirectory());
2400 if (!CheckInputData()) {
2401 Error("StartAnalysis", "There was an error in preprocessing your requested input data");
2404 if (!CreateDataset(fDataPattern)) {
2406 if (!fRunNumbers.Length() && !fRunRange[0]) serror = Form("path to data directory: <%s>", fGridDataDir.Data());
2407 if (fRunNumbers.Length()) serror = "run numbers";
2408 if (fRunRange[0]) serror = Form("run range [%d, %d]", fRunRange[0], fRunRange[1]);
2409 serror += Form("\n or data pattern <%s>", fDataPattern.Data());
2410 Error("StartAnalysis", "No data to process. Please fix %s in your plugin configuration.", serror.Data());
2413 WriteAnalysisFile();
2414 WriteAnalysisMacro();
2416 WriteValidationScript();
2418 WriteMergingMacro();
2419 WriteMergeExecutable();
2420 WriteValidationScript(kTRUE);
2422 if (!CreateJDL()) return kFALSE;
2423 if (TestBit(AliAnalysisGrid::kOffline)) return kFALSE;
2425 // Locally testing the analysis
2426 Info("StartAnalysis", "\n_______________________________________________________________________ \
2427 \n Running analysis script in a daughter shell as on a worker node \
2428 \n_______________________________________________________________________");
2429 TObjArray *list = fOutputFiles.Tokenize(",");
2433 while((str=(TObjString*)next())) {
2434 outputFile = str->GetString();
2435 Int_t index = outputFile.Index("@");
2436 if (index > 0) outputFile.Remove(index);
2437 if (!gSystem->AccessPathName(outputFile)) gSystem->Exec(Form("rm %s", outputFile.Data()));
2440 gSystem->Exec(Form("bash %s 2>stderr", fExecutable.Data()));
2441 TString validationScript = fExecutable;
2442 validationScript.ReplaceAll(".sh", "_validation.sh");
2443 gSystem->Exec(Form("bash %s",validationScript.Data()));
2444 // gSystem->Exec("cat stdout");
2447 // Check if submitting is managed by LPM manager
2448 if (fProductionMode) {
2449 TString prodfile = fJDLName;
2450 prodfile.ReplaceAll(".jdl", ".prod");
2451 WriteProductionFile(prodfile);
2452 Info("StartAnalysis", "Job submitting is managed by LPM. Rerun in terminate mode after jobs finished.");
2455 // Submit AliEn job(s)
2456 gGrid->Cd(fGridOutputDir);
2459 if (!fRunNumbers.Length() && !fRunRange[0]) {
2460 // Submit a given xml or a set of runs
2461 res = gGrid->Command(Form("submit %s", fJDLName.Data()));
2462 printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
2464 const char *cjobId = res->GetKey(0,"jobId");
2468 Error("StartAnalysis", "Your JDL %s could not be submitted", fJDLName.Data());
2471 Info("StartAnalysis", "\n_______________________________________________________________________ \
2472 \n##### Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
2473 \n_______________________________________________________________________",
2474 fJDLName.Data(), cjobId);
2479 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2483 // Submit for a range of enumeration of runs.
2484 if (!Submit()) return kFALSE;
2487 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
2488 \n You may exit at any time and terminate the job later using the option <terminate> \
2489 \n ##################################################################################", jobID.Data());
2490 gSystem->Exec("aliensh");
2494 //______________________________________________________________________________
2495 Bool_t AliAnalysisAlien::Submit()
2497 // Submit all master jobs.
// There is one master job per entry of fInputFiles. The first call to
// SubmitNext() is made at once when nothing was submitted yet; afterwards
// SubmitNext() is retried whenever more than 30000 time units, as returned by
// gSystem->Now(), have elapsed since the reference time tshoot.
// Returns kFALSE as soon as SubmitNext() fails.
// NOTE(review): some source lines are missing from this listing (inner line
// numbers skip). The update of tshoot and the final return are not visible,
// and no sleep is visible inside the polling loop - confirm whether this
// loop spins while waiting.
2498 Int_t nmasterjobs = fInputFiles->GetEntries();
2499 Long_t tshoot = gSystem->Now();
2500 if (!fNsubmitted && !SubmitNext()) return kFALSE;
2501 while (fNsubmitted < nmasterjobs) {
2502 Long_t now = gSystem->Now();
2503 if ((now-tshoot)>30000) {
2505 if (!SubmitNext()) return kFALSE;
2511 //______________________________________________________________________________
2512 Bool_t AliAnalysisAlien::SubmitMerging()
2514 // Submit all merging jobs.
// For every entry of fInputFiles the per-master output directory is derived
// and CheckMergedFiles() is called with the merge JDL name for the first
// output file that is not matched by fMergeExcludes.
// Returns kFALSE as soon as CheckMergedFiles() reports failure and kTRUE when
// there is nothing to submit. With at least one input file an aliensh shell
// is started at the end.
// NOTE(review): some source lines are missing from this listing (inner line
// numbers skip); the final return value and the release of the tokenized
// list created in every iteration are not visible - confirm.
// A directory name without any slash is expanded under the grid home
// directory and fGridWorkingDir.
2515 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
2516 gGrid->Cd(fGridOutputDir);
// The merge JDL name is the executable name with .sh replaced by _merge.jdl.
2517 TString mergeJDLName = fExecutable;
2518 mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
2519 Int_t ntosubmit = fInputFiles->GetEntries();
2520 for (Int_t i=0; i<ntosubmit; i++) {
// Base name of the input collection with the .xml extension removed.
2521 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
2522 runOutDir.ReplaceAll(".xml", "");
2523 if (fOutputToRunNo) {
2524 // The output directory is the run number
2525 printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
2526 runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
2528 // The output directory is the master number in 3 digits format
2529 printf("### Submitting merging job for master <%03d>\n", i);
2530 runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
2532 // Check now the number of merging stages.
2533 TObjArray *list = fOutputFiles.Tokenize(",");
// Pick the first output file name that is not matched by fMergeExcludes.
// NOTE(review): Contains() is a substring test on the exclude list.
2537 while((str=(TObjString*)next())) {
2538 outputFile = str->GetString();
2539 Int_t index = outputFile.Index("@");
2540 if (index > 0) outputFile.Remove(index);
2541 if (!fMergeExcludes.Contains(outputFile)) break;
2544 Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
2545 if (!done) return kFALSE;
2547 if (!ntosubmit) return kTRUE;
// NOTE(review): the message below reports StartAnalysis as its location.
2548 Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
2549 \n You may exit at any time and terminate the job later using the option <terminate> but disabling SetMergeViaJDL\
2550 \n ##################################################################################");
2551 gSystem->Exec("aliensh");
2555 //______________________________________________________________________________
2556 Bool_t AliAnalysisAlien::SubmitNext()
2558 // Submit next bunch of master jobs if the queue is free. The first master job is
2559 // submitted right away, while the next will not be unless the previous was split.
2560 // The plugin will not submit new master jobs if there are more than 500 jobs in
// NOTE(review): the inner line numbers skip, so some source lines are missing
// from this listing, including the end of the sentence above, several braces
// and return statements, and the place where iscalled is set to true.
// State kept across calls in function-local statics:
//   iscalled    - guard that makes a nested or concurrent call return kTRUE;
//   firstmaster - job id of the first submitted master job;
//   lastmaster  - job id of the most recently submitted master job;
//   npermaster  - average number of tracked jobs per submitted master.
2562 static Bool_t iscalled = kFALSE;
2563 static Int_t firstmaster = 0;
2564 static Int_t lastmaster = 0;
2565 static Int_t npermaster = 0;
2566 if (iscalled) return kTRUE;
2568 Int_t nrunning=0, nwaiting=0, nerror=0, ndone=0;
2569 Int_t ntosubmit = 0;
2572 Int_t nmasterjobs = fInputFiles->GetEntries();
2575 if (!IsUseSubmitPolicy()) {
2577 Info("SubmitNext","### Warning submit policy not used ! Submitting too many jobs at a time may be prohibitted. \
2578 \n### You can use SetUseSubmitPolicy() to enable if you have problems.");
2579 ntosubmit = nmasterjobs;
2582 TString status = GetJobStatus(firstmaster, lastmaster, nrunning, nwaiting, nerror, ndone);
2583 printf("=== master %d: %s\n", lastmaster, status.Data());
2584 // If last master not split, just return
2585 if (status != "SPLIT") {iscalled = kFALSE; return kTRUE;}
2586 // No more than 500 waiting jobs
2587 if (nwaiting>500) {iscalled = kFALSE; return kTRUE;}
// Estimate how many more masters fit under the limit of 500 waiting jobs,
// always submitting at least one.
// NOTE(review): this divides by fNsubmitted; the guard that keeps it nonzero
// on this path is not visible in this listing - confirm.
2588 npermaster = (nrunning+nwaiting+nerror+ndone)/fNsubmitted;
2589 if (npermaster) ntosubmit = (500-nwaiting)/npermaster;
2590 if (!ntosubmit) ntosubmit = 1;
2591 printf("=== WAITING(%d) RUNNING(%d) DONE(%d) OTHER(%d) NperMaster=%d => to submit %d jobs\n",
2592 nwaiting, nrunning, ndone, nerror, npermaster, ntosubmit);
2594 for (Int_t i=0; i<ntosubmit; i++) {
2595 // Submit for a range of enumeration of runs.
2596 if (fNsubmitted>=nmasterjobs) {iscalled = kFALSE; return kTRUE;}
2598 TString runOutDir = gSystem->BaseName(fInputFiles->At(fNsubmitted)->GetName());
2599 runOutDir.ReplaceAll(".xml", "");
// Two query forms follow: the output directory argument is either the run
// name or the zero-padded master index. The condition selecting between
// them is not visible in this listing.
2601 query = Form("submit %s %s %s", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), runOutDir.Data());
2603 query = Form("submit %s %s %03d", fJDLName.Data(), fInputFiles->At(fNsubmitted)->GetName(), fNsubmitted);
2604 printf("********* %s\n",query.Data());
2605 res = gGrid->Command(query);
// An empty jobId key in the grid result is treated as a failed submission.
2607 TString cjobId1 = res->GetKey(0,"jobId");
2608 if (!cjobId1.Length()) {
2612 Error("StartAnalysis", "Your JDL %s could not be submitted. The message was:", fJDLName.Data());
2615 Info("StartAnalysis", "\n_______________________________________________________________________ \
2616 \n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
2617 \n_______________________________________________________________________",
2618 fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
// Remember the job id range used for later GetJobStatus() queries.
2621 lastmaster = cjobId1.Atoi();
2622 if (!firstmaster) firstmaster = lastmaster;
2627 Error("StartAnalysis", "No grid result after submission !!! Bailing out...");
2635 //______________________________________________________________________________
2636 void AliAnalysisAlien::WriteAnalysisFile()
2638 // Write current analysis manager into the file <analysisFile>
// The file name is the executable name with .sh replaced by .root.
// Unless the kSubmit bit is set, the analysis type bits (kUseMC, kUseESD,
// kUseAOD) are derived from the handlers of the analysis manager and the
// file is recreated. Outside offline and test mode the file is then copied
// to the grid working directory.
// NOTE(review): the inner line numbers skip, so some source lines are missing
// from this listing, including the statement that writes the manager into
// the file, the closing of the file and several braces and returns.
2639 TString analysisFile = fExecutable;
2640 analysisFile.ReplaceAll(".sh", ".root");
2641 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2642 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
2643 if (!mgr || !mgr->IsInitialized()) {
2644 Error("WriteAnalysisFile", "You need an initialized analysis manager for this");
2647 // Check analysis type
2649 if (mgr->GetMCtruthEventHandler()) TObject::SetBit(AliAnalysisGrid::kUseMC);
2650 handler = (TObject*)mgr->GetInputEventHandler();
// NOTE(review): a null check on handler is not visible in this listing before
// the InheritsFrom calls - confirm it exists in the full file.
2652 if (handler->InheritsFrom("AliESDInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseESD);
2653 if (handler->InheritsFrom("AliAODInputHandler")) TObject::SetBit(AliAnalysisGrid::kUseAOD);
// Remember the current directory so it can be restored after the file is
// opened.
2655 TDirectory *cdir = gDirectory;
2656 TFile *file = TFile::Open(analysisFile, "RECREATE");
2658 // Skip task Terminate calls for the grid job (but not in test mode, where we want to check also the terminate mode
2659 if (!TestBit(AliAnalysisGrid::kTest)) mgr->SetSkipTerminate(kTRUE);
2660 // Unless merging makes no sense
2661 if (IsSingleOutput()) mgr->SetSkipTerminate(kFALSE);
2664 // Enable termination for local jobs
2665 mgr->SetSkipTerminate(kFALSE);
2667 if (cdir) cdir->cd();
2668 Info("WriteAnalysisFile", "\n##### Analysis manager: %s wrote to file <%s>\n", mgr->GetName(),analysisFile.Data());
// The copy to the grid is skipped in offline and test mode.
2670 Bool_t copy = kTRUE;
2671 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// NOTE(review): the working directory is appended to the home directory
// without adding a separator here - presumably GetHomeDirectory() ends with a
// slash; confirm.
2674 TString workdir = gGrid->GetHomeDirectory();
2675 workdir += fGridWorkingDir;
// NOTE(review): the message below reports CreateJDL as its location.
2676 Info("CreateJDL", "\n##### Copying file <%s> containing your initialized analysis manager to your alien workspace", analysisFile.Data());
// Remove any previous copy on the grid before uploading the new file.
2677 if (FileExists(analysisFile)) gGrid->Rm(analysisFile);
2678 TFile::Cp(Form("file:%s",analysisFile.Data()), Form("alien://%s/%s", workdir.Data(),analysisFile.Data()));
2682 //______________________________________________________________________________
2683 void AliAnalysisAlien::WriteAnalysisMacro()
2685 // Write the analysis macro that will steer the analysis in grid mode.
2686 if (!TestBit(AliAnalysisGrid::kSubmit)) {
2688 out.open(fAnalysisMacro.Data(), ios::out);
2690 Error("WriteAnalysisMacro", "could not open file %s for writing", fAnalysisMacro.Data());
2693 Bool_t hasSTEERBase = kFALSE;
2694 Bool_t hasESD = kFALSE;
2695 Bool_t hasAOD = kFALSE;
2696 Bool_t hasANALYSIS = kFALSE;
2697 Bool_t hasANALYSISalice = kFALSE;
2698 Bool_t hasCORRFW = kFALSE;
2699 TString func = fAnalysisMacro;
2700 TString type = "ESD";
2701 TString comment = "// Analysis using ";
2702 if (TObject::TestBit(AliAnalysisGrid::kUseESD)) comment += "ESD";
2703 if (TObject::TestBit(AliAnalysisGrid::kUseAOD)) {
2707 if (type!="AOD" && fFriendChainName!="") {
2708 Error("WriteAnalysisMacro", "Friend chain can be attached only to AOD");
2711 if (TObject::TestBit(AliAnalysisGrid::kUseMC)) comment += "/MC";
2712 else comment += " data";
2713 out << "const char *anatype = \"" << type.Data() << "\";" << endl << endl;
2714 func.ReplaceAll(".C", "");
2715 out << "void " << func.Data() << "()" << endl;
2717 out << comment.Data() << endl;
2718 out << "// Automatically generated analysis steering macro executed in grid subjobs" << endl << endl;
2719 out << " TStopwatch timer;" << endl;
2720 out << " timer.Start();" << endl << endl;
2721 // Change temp directory to current one
2722 out << "// Set temporary merging directory to current one" << endl;
2723 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
2724 out << "// load base root libraries" << endl;
2725 out << " gSystem->Load(\"libTree\");" << endl;
2726 out << " gSystem->Load(\"libGeom\");" << endl;
2727 out << " gSystem->Load(\"libVMC\");" << endl;
2728 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
2729 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
2730 if (fAdditionalRootLibs.Length()) {
2731 // in principle libtree /lib geom libvmc etc. can go into this list, too
2732 out << "// Add aditional libraries" << endl;
2733 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
2736 while((str=(TObjString*)next())) {
2737 if (str->GetString().Contains(".so"))
2738 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2740 if (list) delete list;
2742 out << "// include path" << endl;
2743 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
2744 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
2745 out << "// Load analysis framework libraries" << endl;
2746 TString setupPar = "AliAnalysisAlien::SetupPar";
2748 out << " gSystem->Load(\"libSTEERBase\");" << endl;
2749 out << " gSystem->Load(\"libESD\");" << endl;
2750 out << " gSystem->Load(\"libAOD\");" << endl;
2751 out << " gSystem->Load(\"libANALYSIS\");" << endl;
2752 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2753 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2755 TIter next(fPackages);
2758 while ((obj=next())) {
2759 pkgname = obj->GetName();
2760 if (pkgname == "STEERBase" ||
2761 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
2762 if (pkgname == "ESD" ||
2763 pkgname == "ESD.par") hasESD = kTRUE;
2764 if (pkgname == "AOD" ||
2765 pkgname == "AOD.par") hasAOD = kTRUE;
2766 if (pkgname == "ANALYSIS" ||
2767 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
2768 if (pkgname == "ANALYSISalice" ||
2769 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
2770 if (pkgname == "CORRFW" ||
2771 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
2773 if (hasANALYSISalice) setupPar = "SetupPar";
2774 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
2775 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
2776 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
2777 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
2778 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
2779 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
2780 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
2781 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
2782 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
2783 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
2784 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
2785 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
2786 out << "// Compile other par packages" << endl;
2788 while ((obj=next())) {
2789 pkgname = obj->GetName();
2790 if (pkgname == "STEERBase" ||
2791 pkgname == "STEERBase.par" ||
2793 pkgname == "ESD.par" ||
2795 pkgname == "AOD.par" ||
2796 pkgname == "ANALYSIS" ||
2797 pkgname == "ANALYSIS.par" ||
2798 pkgname == "ANALYSISalice" ||
2799 pkgname == "ANALYSISalice.par" ||
2800 pkgname == "CORRFW" ||
2801 pkgname == "CORRFW.par") continue;
2802 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
2805 if (fAdditionalLibs.Length()) {
2806 out << "// Add aditional AliRoot libraries" << endl;
2807 TObjArray *list = fAdditionalLibs.Tokenize(" ");
2810 while((str=(TObjString*)next())) {
2811 if (str->GetString().Contains(".so"))
2812 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
2813 if (str->GetString().Contains(".par"))
2814 out << " if (!" << setupPar << "(\"" << str->GetString() << "\")) return;" << endl;
2816 if (list) delete list;
2819 out << "// analysis source to be compiled at runtime (if any)" << endl;
2820 if (fAnalysisSource.Length()) {
2821 TObjArray *list = fAnalysisSource.Tokenize(" ");
2824 while((str=(TObjString*)next())) {
2825 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
2827 if (list) delete list;
2830 if (fFastReadOption) {
2831 Warning("WriteAnalysisMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid jobs. This may skip some files that could be accessed !!! \
2832 \n+++ NOTE: To disable this option, use: plugin->SetFastReadOption(kFALSE)");
2833 out << "// fast xrootd reading enabled" << endl;
2834 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
2835 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
2836 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
2837 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
2838 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
2839 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
2841 out << "// connect to AliEn and make the chain" << endl;
2842 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
2843 if (IsUsingTags()) {
2844 out << " TChain *chain = CreateChainFromTags(\"wn.xml\", anatype);" << endl << endl;
2846 out << " TChain *chain = CreateChain(\"wn.xml\", anatype);" << endl << endl;
2848 out << "// read the analysis manager from file" << endl;
2849 TString analysisFile = fExecutable;
2850 analysisFile.ReplaceAll(".sh", ".root");
2851 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
2852 out << " if (!file) return;" << endl;
2853 out << " TIter nextkey(file->GetListOfKeys());" << endl;
2854 out << " AliAnalysisManager *mgr = 0;" << endl;
2855 out << " TKey *key;" << endl;
2856 out << " while ((key=(TKey*)nextkey())) {" << endl;
2857 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
2858 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
2859 out << " };" << endl;
2860 out << " if (!mgr) {" << endl;
2861 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file " << analysisFile <<"\");" << endl;
2862 out << " return;" << endl;
2863 out << " }" << endl << endl;
2864 out << " mgr->PrintStatus();" << endl;
2865 if (AliAnalysisManager::GetAnalysisManager()) {
2866 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
2867 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
2869 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
2872 out << " mgr->StartAnalysis(\"localfile\", chain);" << endl;
2873 out << " timer.Stop();" << endl;
2874 out << " timer.Print();" << endl;
2875 out << "}" << endl << endl;
2876 if (IsUsingTags()) {
2877 out << "TChain* CreateChainFromTags(const char *xmlfile, const char *type=\"ESD\")" << endl;
2879 out << "// Create a chain using tags from the xml file." << endl;
2880 out << " TAlienCollection* coll = TAlienCollection::Open(xmlfile);" << endl;
2881 out << " if (!coll) {" << endl;
2882 out << " ::Error(\"CreateChainFromTags\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2883 out << " return NULL;" << endl;
2884 out << " }" << endl;
2885 out << " TGridResult* tagResult = coll->GetGridResult(\"\",kFALSE,kFALSE);" << endl;
2886 out << " AliTagAnalysis *tagAna = new AliTagAnalysis(type);" << endl;
2887 out << " tagAna->ChainGridTags(tagResult);" << endl << endl;
2888 out << " AliRunTagCuts *runCuts = new AliRunTagCuts();" << endl;
2889 out << " AliLHCTagCuts *lhcCuts = new AliLHCTagCuts();" << endl;
2890 out << " AliDetectorTagCuts *detCuts = new AliDetectorTagCuts();" << endl;
2891 out << " AliEventTagCuts *evCuts = new AliEventTagCuts();" << endl;
2892 out << " // Check if the cuts configuration file was provided" << endl;
2893 out << " if (!gSystem->AccessPathName(\"ConfigureCuts.C\")) {" << endl;
2894 out << " gROOT->LoadMacro(\"ConfigureCuts.C\");" << endl;
2895 out << " ConfigureCuts(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2896 out << " }" << endl;
2897 if (fFriendChainName=="") {
2898 out << " TChain *chain = tagAna->QueryTags(runCuts, lhcCuts, detCuts, evCuts);" << endl;
2900 out << " TString tmpColl=\"tmpCollection.xml\";" << endl;
2901 out << " tagAna->CreateXMLCollection(tmpColl.Data(),runCuts, lhcCuts, detCuts, evCuts);" << endl;
2902 out << " TChain *chain = CreateChain(tmpColl.Data(),type);" << endl;
2904 out << " if (!chain || !chain->GetNtrees()) return NULL;" << endl;
2905 out << " chain->ls();" << endl;
2906 out << " return chain;" << endl;
2907 out << "}" << endl << endl;
2908 if (gSystem->AccessPathName("ConfigureCuts.C")) {
2909 TString msg = "\n##### You may want to provide a macro ConfigureCuts.C with a method:\n";
2910 msg += " void ConfigureCuts(AliRunTagCuts *runCuts,\n";
2911 msg += " AliLHCTagCuts *lhcCuts,\n";
2912 msg += " AliDetectorTagCuts *detCuts,\n";
2913 msg += " AliEventTagCuts *evCuts)";
2914 Info("WriteAnalysisMacro", "%s", msg.Data());
2917 if (!IsUsingTags() || fFriendChainName!="") {
2918 out <<"//________________________________________________________________________________" << endl;
2919 out << "TChain* CreateChain(const char *xmlfile, const char *type=\"ESD\")" << endl;
2921 out << "// Create a chain using url's from xml file" << endl;
2922 out << " TString treename = type;" << endl;
2923 out << " treename.ToLower();" << endl;
2924 out << " treename += \"Tree\";" << endl;
2925 out << " printf(\"***************************************\\n\");" << endl;
2926 out << " printf(\" Getting chain of trees %s\\n\", treename.Data());" << endl;
2927 out << " printf(\"***************************************\\n\");" << endl;
2928 out << " TAlienCollection *coll = TAlienCollection::Open(xmlfile);" << endl;
2929 out << " if (!coll) {" << endl;
2930 out << " ::Error(\"CreateChain\", \"Cannot create an AliEn collection from %s\", xmlfile);" << endl;
2931 out << " return NULL;" << endl;
2932 out << " }" << endl;
2933 out << " TChain *chain = new TChain(treename);" << endl;
2934 if(fFriendChainName!="") {
2935 out << " TChain *chainFriend = new TChain(treename);" << endl;
2937 out << " coll->Reset();" << endl;
2938 out << " while (coll->Next()) {" << endl;
2939 out << " chain->Add(coll->GetTURL(\"\"));" << endl;
2940 if(fFriendChainName!="") {
2941 out << " TString fileFriend=coll->GetTURL(\"\");" << endl;
2942 out << " fileFriend.ReplaceAll(\"AliAOD.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2943 out << " fileFriend.ReplaceAll(\"AliAODs.root\",\""<<fFriendChainName.Data()<<"\");" << endl;
2944 out << " chainFriend->Add(fileFriend.Data());" << endl;
2946 out << " }" << endl;
2947 out << " if (!chain->GetNtrees()) {" << endl;
2948 out << " ::Error(\"CreateChain\", \"No tree found from collection %s\", xmlfile);" << endl;
2949 out << " return NULL;" << endl;
2950 out << " }" << endl;
2951 if(fFriendChainName!="") {
2952 out << " chain->AddFriend(chainFriend);" << endl;
2954 out << " return chain;" << endl;
2955 out << "}" << endl << endl;
2957 if (hasANALYSISalice) {
2958 out <<"//________________________________________________________________________________" << endl;
2959 out << "Bool_t SetupPar(const char *package) {" << endl;
2960 out << "// Compile the package and set it up." << endl;
2961 out << " TString pkgdir = package;" << endl;
2962 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
2963 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
2964 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
2965 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
2966 out << " // Check for BUILD.sh and execute" << endl;
2967 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
2968 out << " printf(\"*******************************\\n\");" << endl;
2969 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
2970 out << " printf(\"*******************************\\n\");" << endl;
2971 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
2972 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
2973 out << " gSystem->ChangeDirectory(cdir);" << endl;
2974 out << " return kFALSE;" << endl;
2975 out << " }" << endl;
2976 out << " } else {" << endl;
2977 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
2978 out << " gSystem->ChangeDirectory(cdir);" << endl;
2979 out << " return kFALSE;" << endl;
2980 out << " }" << endl;
2981 out << " // Check for SETUP.C and execute" << endl;
2982 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
2983 out << " printf(\"*******************************\\n\");" << endl;
2984 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
2985 out << " printf(\"*******************************\\n\");" << endl;
2986 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
2987 out << " } else {" << endl;
2988 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
2989 out << " gSystem->ChangeDirectory(cdir);" << endl;
2990 out << " return kFALSE;" << endl;
2991 out << " }" << endl;
2992 out << " // Restore original workdir" << endl;
2993 out << " gSystem->ChangeDirectory(cdir);" << endl;
2994 out << " return kTRUE;" << endl;
2997 Info("WriteAnalysisMacro", "\n##### Analysis macro to run on worker nodes <%s> written",fAnalysisMacro.Data());
2999 Bool_t copy = kTRUE;
3000 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3003 TString workdir = gGrid->GetHomeDirectory();
3004 workdir += fGridWorkingDir;
3005 if (FileExists(fAnalysisMacro)) gGrid->Rm(fAnalysisMacro);
3006 if (IsUsingTags() && !gSystem->AccessPathName("ConfigureCuts.C")) {
3007 if (FileExists("ConfigureCuts.C")) gGrid->Rm("ConfigureCuts.C");
3008 Info("WriteAnalysisMacro", "\n##### Copying cuts configuration macro: <ConfigureCuts.C> to your alien workspace");
3009 TFile::Cp("file:ConfigureCuts.C", Form("alien://%s/ConfigureCuts.C", workdir.Data()));
3011 Info("WriteAnalysisMacro", "\n##### Copying analysis macro: <%s> to your alien workspace", fAnalysisMacro.Data());
3012 TFile::Cp(Form("file:%s",fAnalysisMacro.Data()), Form("alien://%s/%s", workdir.Data(), fAnalysisMacro.Data()));
3016 //______________________________________________________________________________
3017 void AliAnalysisAlien::WriteMergingMacro()
3019 // Write a macro to merge the outputs per master job.
3020 if (!fMergeViaJDL) return;
3021 if (!fOutputFiles.Length()) {
3022 Error("WriteMergingMacro", "No output file names defined. Are you running the right AliAnalysisAlien configuration ?");
3025 TString mergingMacro = fExecutable;
3026 mergingMacro.ReplaceAll(".sh","_merge.C");
3027 if (!fGridOutputDir.Contains("/")) fGridOutputDir = Form("/%s/%s/%s", gGrid->GetHomeDirectory(), fGridWorkingDir.Data(), fGridOutputDir.Data());
3028 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3030 out.open(mergingMacro.Data(), ios::out);
3032 Error("WriteMergingMacro", "could not open file %s for writing", fAnalysisMacro.Data());
3035 Bool_t hasSTEERBase = kFALSE;
3036 Bool_t hasESD = kFALSE;
3037 Bool_t hasAOD = kFALSE;
3038 Bool_t hasANALYSIS = kFALSE;
3039 Bool_t hasANALYSISalice = kFALSE;
3040 Bool_t hasCORRFW = kFALSE;
3041 TString func = mergingMacro;
3043 func.ReplaceAll(".C", "");
3044 out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
3046 out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
3047 out << " TStopwatch timer;" << endl;
3048 out << " timer.Start();" << endl << endl;
3049 if (!fExecutableCommand.Contains("aliroot")) {
3050 out << "// load base root libraries" << endl;
3051 out << " gSystem->Load(\"libTree\");" << endl;
3052 out << " gSystem->Load(\"libGeom\");" << endl;
3053 out << " gSystem->Load(\"libVMC\");" << endl;
3054 out << " gSystem->Load(\"libPhysics\");" << endl << endl;
3055 out << " gSystem->Load(\"libMinuit\");" << endl << endl;
3057 if (fAdditionalRootLibs.Length()) {
3058 // in principle libtree /lib geom libvmc etc. can go into this list, too
3059 out << "// Add aditional libraries" << endl;
3060 TObjArray *list = fAdditionalRootLibs.Tokenize(" ");
3063 while((str=(TObjString*)next())) {
3064 if (str->GetString().Contains(".so"))
3065 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3067 if (list) delete list;
3069 out << "// include path" << endl;
3070 if (fIncludePath.Length()) out << " gSystem->AddIncludePath(\"" << fIncludePath.Data() << "\");" << endl;
3071 out << " gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
3072 out << "// Load analysis framework libraries" << endl;
3074 if (!fExecutableCommand.Contains("aliroot")) {
3075 out << " gSystem->Load(\"libSTEERBase\");" << endl;
3076 out << " gSystem->Load(\"libESD\");" << endl;
3077 out << " gSystem->Load(\"libAOD\");" << endl;
3079 out << " gSystem->Load(\"libANALYSIS\");" << endl;
3080 out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3081 out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3083 TIter next(fPackages);
3086 TString setupPar = "AliAnalysisAlien::SetupPar";
3087 while ((obj=next())) {
3088 pkgname = obj->GetName();
3089 if (pkgname == "STEERBase" ||
3090 pkgname == "STEERBase.par") hasSTEERBase = kTRUE;
3091 if (pkgname == "ESD" ||
3092 pkgname == "ESD.par") hasESD = kTRUE;
3093 if (pkgname == "AOD" ||
3094 pkgname == "AOD.par") hasAOD = kTRUE;
3095 if (pkgname == "ANALYSIS" ||
3096 pkgname == "ANALYSIS.par") hasANALYSIS = kTRUE;
3097 if (pkgname == "ANALYSISalice" ||
3098 pkgname == "ANALYSISalice.par") hasANALYSISalice = kTRUE;
3099 if (pkgname == "CORRFW" ||
3100 pkgname == "CORRFW.par") hasCORRFW = kTRUE;
3102 if (hasANALYSISalice) setupPar = "SetupPar";
3103 if (!hasSTEERBase) out << " gSystem->Load(\"libSTEERBase\");" << endl;
3104 else out << " if (!" << setupPar << "(\"STEERBase\")) return;" << endl;
3105 if (!hasESD) out << " gSystem->Load(\"libESD\");" << endl;
3106 else out << " if (!" << setupPar << "(\"ESD\")) return;" << endl;
3107 if (!hasAOD) out << " gSystem->Load(\"libAOD\");" << endl;
3108 else out << " if (!" << setupPar << "(\"AOD\")) return;" << endl;
3109 if (!hasANALYSIS) out << " gSystem->Load(\"libANALYSIS\");" << endl;
3110 else out << " if (!" << setupPar << "(\"ANALYSIS\")) return;" << endl;
3111 if (!hasANALYSISalice) out << " gSystem->Load(\"libANALYSISalice\");" << endl;
3112 else out << " if (!" << setupPar << "(\"ANALYSISalice\")) return;" << endl;
3113 if (!hasCORRFW) out << " gSystem->Load(\"libCORRFW\");" << endl << endl;
3114 else out << " if (!" << setupPar << "(\"CORRFW\")) return;" << endl << endl;
3115 out << "// Compile other par packages" << endl;
3117 while ((obj=next())) {
3118 pkgname = obj->GetName();
3119 if (pkgname == "STEERBase" ||
3120 pkgname == "STEERBase.par" ||
3122 pkgname == "ESD.par" ||
3124 pkgname == "AOD.par" ||
3125 pkgname == "ANALYSIS" ||
3126 pkgname == "ANALYSIS.par" ||
3127 pkgname == "ANALYSISalice" ||
3128 pkgname == "ANALYSISalice.par" ||
3129 pkgname == "CORRFW" ||
3130 pkgname == "CORRFW.par") continue;
3131 out << " if (!" << setupPar << "(\"" << obj->GetName() << "\")) return;" << endl;
3134 if (fAdditionalLibs.Length()) {
3135 out << "// Add aditional AliRoot libraries" << endl;
3136 TObjArray *list = fAdditionalLibs.Tokenize(" ");
3139 while((str=(TObjString*)next())) {
3140 if (str->GetString().Contains(".so"))
3141 out << " gSystem->Load(\"" << str->GetString().Data() << "\");" << endl;
3143 if (list) delete list;
3146 out << "// Analysis source to be compiled at runtime (if any)" << endl;
3147 if (fAnalysisSource.Length()) {
3148 TObjArray *list = fAnalysisSource.Tokenize(" ");
3151 while((str=(TObjString*)next())) {
3152 out << " gROOT->ProcessLine(\".L " << str->GetString().Data() << "+g\");" << endl;
3154 if (list) delete list;
3158 if (fFastReadOption) {
3159 Warning("WriteMergingMacro", "!!! You requested FastRead option. Using xrootd flags to reduce timeouts in the grid merging jobs. Note that this may skip some files that could be accessed !!!");
3160 out << "// fast xrootd reading enabled" << endl;
3161 out << " printf(\"!!! You requested FastRead option. Using xrootd flags to reduce timeouts. Note that this may skip some files that could be accessed !!!\");" << endl;
3162 out << " gEnv->SetValue(\"XNet.ConnectTimeout\",10);" << endl;
3163 out << " gEnv->SetValue(\"XNet.RequestTimeout\",10);" << endl;
3164 out << " gEnv->SetValue(\"XNet.MaxRedirectCount\",2);" << endl;
3165 out << " gEnv->SetValue(\"XNet.ReconnectTimeout\",10);" << endl;
3166 out << " gEnv->SetValue(\"XNet.FirstConnectMaxCnt\",1);" << endl << endl;
3168 // Change temp directory to current one
3169 out << "// Set temporary merging directory to current one" << endl;
3170 out << " gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;
3171 out << "// Connect to AliEn" << endl;
3172 out << " if (!TGrid::Connect(\"alien://\")) return;" << endl;
3173 out << " Bool_t laststage = kFALSE;" << endl;
3174 out << " TString outputDir = dir;" << endl;
3175 out << " TString outputFiles = \"" << fOutputFiles << "\";" << endl;
3176 out << " TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
3177 out << " mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
3178 out << " TObjArray *list = outputFiles.Tokenize(\",\");" << endl;
3179 out << " TIter *iter = new TIter(list);" << endl;
3180 out << " TObjString *str;" << endl;
3181 out << " TString outputFile;" << endl;
3182 out << " Bool_t merged = kTRUE;" << endl;
3183 out << " while((str=(TObjString*)iter->Next())) {" << endl;
3184 out << " outputFile = str->GetString();" << endl;
3185 out << " if (outputFile.Contains(\"*\")) continue;" << endl;
3186 out << " Int_t index = outputFile.Index(\"@\");" << endl;
3187 out << " if (index > 0) outputFile.Remove(index);" << endl;
3188 out << " // Skip already merged outputs" << endl;
3189 out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
3190 out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
3191 out << " continue;" << endl;
3192 out << " }" << endl;
3193 out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
3194 out << " merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
3195 out << " if (!merged) {" << endl;
3196 out << " printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
3197 out << " return;" << endl;
3198 out << " }" << endl;
3199 out << " // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
3200 out << " if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
3201 out << " }" << endl;
3202 out << " // all outputs merged, validate" << endl;
3203 out << " ofstream out;" << endl;
3204 out << " out.open(\"outputs_valid\", ios::out);" << endl;
3205 out << " out.close();" << endl;
3206 out << " // read the analysis manager from file" << endl;
3207 TString analysisFile = fExecutable;
3208 analysisFile.ReplaceAll(".sh", ".root");
3209 out << " if (!laststage) return;" << endl;
3210 out << " TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
3211 out << " if (!file) return;" << endl;
3212 out << " TIter nextkey(file->GetListOfKeys());" << endl;
3213 out << " AliAnalysisManager *mgr = 0;" << endl;
3214 out << " TKey *key;" << endl;
3215 out << " while ((key=(TKey*)nextkey())) {" << endl;
3216 out << " if (!strcmp(key->GetClassName(), \"AliAnalysisManager\"))" << endl;
3217 out << " mgr = (AliAnalysisManager*)file->Get(key->GetName());" << endl;
3218 out << " };" << endl;
3219 out << " if (!mgr) {" << endl;
3220 out << " ::Error(\"" << func.Data() << "\", \"No analysis manager found in file" << analysisFile <<"\");" << endl;
3221 out << " return;" << endl;
3222 out << " }" << endl << endl;
3223 out << " mgr->SetSkipTerminate(kFALSE);" << endl;
3224 out << " mgr->PrintStatus();" << endl;
3225 if (AliAnalysisManager::GetAnalysisManager()) {
3226 if (AliAnalysisManager::GetAnalysisManager()->GetDebugLevel()>3) {
3227 out << " gEnv->SetValue(\"XNet.Debug\", \"1\");" << endl;
3229 out << " AliLog::SetGlobalLogLevel(AliLog::kError);" << endl;
3232 out << " mgr->StartAnalysis(\"gridterminate\");" << endl;
3233 out << "}" << endl << endl;
3234 if (hasANALYSISalice) {
3235 out <<"//________________________________________________________________________________" << endl;
3236 out << "Bool_t SetupPar(const char *package) {" << endl;
3237 out << "// Compile the package and set it up." << endl;
3238 out << " TString pkgdir = package;" << endl;
3239 out << " pkgdir.ReplaceAll(\".par\",\"\");" << endl;
3240 out << " gSystem->Exec(Form(\"tar xvzf %s.par\", pkgdir.Data()));" << endl;
3241 out << " TString cdir = gSystem->WorkingDirectory();" << endl;
3242 out << " gSystem->ChangeDirectory(pkgdir);" << endl;
3243 out << " // Check for BUILD.sh and execute" << endl;
3244 out << " if (!gSystem->AccessPathName(\"PROOF-INF/BUILD.sh\")) {" << endl;
3245 out << " printf(\"*******************************\\n\");" << endl;
3246 out << " printf(\"*** Building PAR archive ***\\n\");" << endl;
3247 out << " printf(\"*******************************\\n\");" << endl;
3248 out << " if (gSystem->Exec(\"PROOF-INF/BUILD.sh\")) {" << endl;
3249 out << " ::Error(\"SetupPar\", \"Cannot build par archive %s\", pkgdir.Data());" << endl;
3250 out << " gSystem->ChangeDirectory(cdir);" << endl;
3251 out << " return kFALSE;" << endl;
3252 out << " }" << endl;
3253 out << " } else {" << endl;
3254 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/BUILD.sh for package %s\", pkgdir.Data());" << endl;
3255 out << " gSystem->ChangeDirectory(cdir);" << endl;
3256 out << " return kFALSE;" << endl;
3257 out << " }" << endl;
3258 out << " // Check for SETUP.C and execute" << endl;
3259 out << " if (!gSystem->AccessPathName(\"PROOF-INF/SETUP.C\")) {" << endl;
3260 out << " printf(\"*******************************\\n\");" << endl;
3261 out << " printf(\"*** Setup PAR archive ***\\n\");" << endl;
3262 out << " printf(\"*******************************\\n\");" << endl;
3263 out << " gROOT->Macro(\"PROOF-INF/SETUP.C\");" << endl;
3264 out << " } else {" << endl;
3265 out << " ::Error(\"SetupPar\",\"Cannot access PROOF-INF/SETUP.C for package %s\", pkgdir.Data());" << endl;
3266 out << " gSystem->ChangeDirectory(cdir);" << endl;
3267 out << " return kFALSE;" << endl;
3268 out << " }" << endl;
3269 out << " // Restore original workdir" << endl;
3270 out << " gSystem->ChangeDirectory(cdir);" << endl;
3271 out << " return kTRUE;" << endl;
3275 Bool_t copy = kTRUE;
3276 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3279 TString workdir = gGrid->GetHomeDirectory();
3280 workdir += fGridWorkingDir;
3281 if (FileExists(mergingMacro)) gGrid->Rm(mergingMacro);
3282 Info("WriteMergingMacro", "\n##### Copying merging macro: <%s> to your alien workspace", mergingMacro.Data());
3283 TFile::Cp(Form("file:%s",mergingMacro.Data()), Form("alien://%s/%s", workdir.Data(), mergingMacro.Data()));
3287 //______________________________________________________________________________
3288 Bool_t AliAnalysisAlien::SetupPar(const char *package)
3290 // Compile the par file archive pointed by <package>. This must be present in the current directory.
3291 // Note that for loading the compiled library. The current directory should have precedence in
3293 TString pkgdir = package;
3294 pkgdir.ReplaceAll(".par","");
3295 gSystem->Exec(Form("tar xvzf %s.par", pkgdir.Data()));
3296 TString cdir = gSystem->WorkingDirectory();
3297 gSystem->ChangeDirectory(pkgdir);
3298 // Check for BUILD.sh and execute
3299 if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
3300 printf("**************************************************\n");
3301 printf("*** Building PAR archive %s\n", package);
3302 printf("**************************************************\n");
3303 if (gSystem->Exec("PROOF-INF/BUILD.sh")) {
3304 ::Error("SetupPar", "Cannot build par archive %s", pkgdir.Data());
3305 gSystem->ChangeDirectory(cdir);
3309 ::Error("SetupPar","Cannot access PROOF-INF/BUILD.sh for package %s", pkgdir.Data());
3310 gSystem->ChangeDirectory(cdir);
3313 // Check for SETUP.C and execute
3314 if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
3315 printf("**************************************************\n");
3316 printf("*** Setup PAR archive %s\n", package);
3317 printf("**************************************************\n");
3318 gROOT->Macro("PROOF-INF/SETUP.C");
3319 printf("*** Loaded library: %s\n", gSystem->GetLibraries(pkgdir,"",kFALSE));
3321 ::Error("SetupPar","Cannot access PROOF-INF/SETUP.C for package %s", pkgdir.Data());
3322 gSystem->ChangeDirectory(cdir);
3325 // Restore original workdir
3326 gSystem->ChangeDirectory(cdir);
3330 //______________________________________________________________________________
3331 void AliAnalysisAlien::WriteExecutable()
3333 // Generate the alien executable script.
3334 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3336 out.open(fExecutable.Data(), ios::out);
3338 Error("WriteExecutable", "Bad file name for executable: %s", fExecutable.Data());
3341 out << "#!/bin/bash" << endl;
3342 out << "echo \"=========================================\"" << endl;
3343 out << "echo \"############## PATH : ##############\"" << endl;
3344 out << "echo $PATH" << endl;
3345 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3346 out << "echo $LD_LIBRARY_PATH" << endl;
3347 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3348 out << "echo $ROOTSYS" << endl;
3349 out << "echo \"############## which root : ##############\"" << endl;
3350 out << "which root" << endl;
3351 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3352 out << "echo $ALICE_ROOT" << endl;
3353 out << "echo \"############## which aliroot : ##############\"" << endl;
3354 out << "which aliroot" << endl;
3355 out << "echo \"############## system limits : ##############\"" << endl;
3356 out << "ulimit -a" << endl;
3357 out << "echo \"############## memory : ##############\"" << endl;
3358 out << "free -m" << endl;
3359 out << "echo \"=========================================\"" << endl << endl;
3360 // Make sure we can properly compile par files
3361 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
3362 out << fExecutableCommand << " ";
3363 out << fAnalysisMacro.Data() << " " << fExecutableArgs.Data() << endl << endl;
3364 out << "echo \"======== " << fAnalysisMacro.Data() << " finished with exit code: $? ========\"" << endl;
3365 out << "echo \"############## memory after: ##############\"" << endl;
3366 out << "free -m" << endl;
3368 Bool_t copy = kTRUE;
3369 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
3372 TString workdir = gGrid->GetHomeDirectory();
3373 TString bindir = Form("%s/bin", workdir.Data());
3374 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
3375 workdir += fGridWorkingDir;
3376 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), fExecutable.Data());
3377 if (FileExists(executable)) gGrid->Rm(executable);
3378 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", fExecutable.Data());
3379 TFile::Cp(Form("file:%s",fExecutable.Data()), Form("alien://%s", executable.Data()));
3383 //______________________________________________________________________________
3384 void AliAnalysisAlien::WriteMergeExecutable()
3386 // Generate the alien executable script for the merging job.
// The method returns immediately unless fMergeViaJDL is set. The script name is
// derived from fExecutable by replacing ".sh" with "_merge.sh".
3387 if (!fMergeViaJDL) return;
3388 TString mergeExec = fExecutable;
3389 mergeExec.ReplaceAll(".sh", "_merge.sh");
// The local script file is written only when the kSubmit bit is not set.
3390 if (!TestBit(AliAnalysisGrid::kSubmit)) {
3392 out.open(mergeExec.Data(), ios::out);
// NOTE(review): the message tag reads "WriteMergingExecutable", which is not the
// name of this method (WriteMergeExecutable) - confirm whether that is intended.
3394 Error("WriteMergingExecutable", "Bad file name for executable: %s", mergeExec.Data());
// Script preamble: print PATH, LD_LIBRARY_PATH, ROOTSYS, ALICE_ROOT, the location
// of the root and aliroot binaries, the system limits and the free memory.
3397 out << "#!/bin/bash" << endl;
3398 out << "echo \"=========================================\"" << endl;
3399 out << "echo \"############## PATH : ##############\"" << endl;
3400 out << "echo $PATH" << endl;
3401 out << "echo \"############## LD_LIBRARY_PATH : ##############\"" << endl;
3402 out << "echo $LD_LIBRARY_PATH" << endl;
3403 out << "echo \"############## ROOTSYS : ##############\"" << endl;
3404 out << "echo $ROOTSYS" << endl;
3405 out << "echo \"############## which root : ##############\"" << endl;
3406 out << "which root" << endl;
3407 out << "echo \"############## ALICE_ROOT : ##############\"" << endl;
3408 out << "echo $ALICE_ROOT" << endl;
3409 out << "echo \"############## which aliroot : ##############\"" << endl;
3410 out << "which aliroot" << endl;
3411 out << "echo \"############## system limits : ##############\"" << endl;
3412 out << "ulimit -a" << endl;
3413 out << "echo \"############## memory : ##############\"" << endl;
3414 out << "free -m" << endl;
3415 out << "echo \"=========================================\"" << endl << endl;
3416 // Make sure we can properly compile par files
// When the kUsePars bit is set, the current directory is prepended to
// LD_LIBRARY_PATH in the generated script.
3417 if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
// The merge macro name is fExecutable with ".sh" replaced by "_merge.C". The
// script builds the macro call from its first three positional arguments
// ($1 wrapped in escaped quotes, $2 and $3 verbatim) and runs it through
// fExecutableCommand.
3418 TString mergeMacro = fExecutable;
3419 mergeMacro.ReplaceAll(".sh", "_merge.C");
3420 out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
3421 out << fExecutableCommand << " " << "$ARG" << endl;
3422 out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
3423 out << "echo \"############## memory after: ##############\"" << endl;
3424 out << "free -m" << endl;
// The copy flag is cleared when running in offline or test mode.
3426 Bool_t copy = kTRUE;
3427 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Upload: the destination is <AliEn home>/bin/<mergeExec>. The bin directory is
// created when it does not exist, and an existing remote file of the same name
// is removed before the local script is copied with TFile::Cp.
3430 TString workdir = gGrid->GetHomeDirectory();
3431 TString bindir = Form("%s/bin", workdir.Data());
3432 if (!DirectoryExists(bindir)) gGrid->Mkdir(bindir,"-p");
// NOTE(review): workdir is extended here but is not read again in the
// statements that follow - looks like a leftover; confirm before removing.
3433 workdir += fGridWorkingDir;
3434 TString executable = Form("%s/bin/%s", gGrid->GetHomeDirectory(), mergeExec.Data());
3435 if (FileExists(executable)) gGrid->Rm(executable);
// NOTE(review): the Info tag is "CreateJDL" although the message is emitted from
// WriteMergeExecutable - confirm whether the tag should name this method.
3436 Info("CreateJDL", "\n##### Copying executable file <%s> to your AliEn bin directory", mergeExec.Data());
3437 TFile::Cp(Form("file:%s",mergeExec.Data()), Form("alien://%s", executable.Data()));
3441 //______________________________________________________________________________
3442 void AliAnalysisAlien::WriteProductionFile(const char *filename) const
3444 // Write the production file to be submitted by LPM manager. The format is:
3445 // First line: full_path_to_jdl estimated_no_subjobs_per_master
3446 // Next lines: full_path_to_dataset XXX (XXX is a string)
3447 // To submit, one has to: submit jdl XXX for all lines
// filename is used both as the local file to write and as the name of the copy
// placed in the AliEn work directory.
3449 out.open(filename, ios::out);
3451 Error("WriteProductionFile", "Bad file name: %s", filename);
// The work directory is the AliEn home directory followed by fGridWorkingDir.
// NOTE(review): gGrid is dereferenced here without a visible null check -
// presumably callers guarantee an open AliEn connection; confirm.
3454 TString workdir = gGrid->GetHomeDirectory();
3455 workdir += fGridWorkingDir;
// Estimated number of sub-jobs per master job.
// NOTE(review): integer arithmetic, and fSplitMaxInputFileNumber is initialised
// to 0 by the default constructor - a zero value here divides by zero; confirm
// it is always positive when this method is reached.
3456 Int_t njobspermaster = 1000*fNrunsPerMaster/fSplitMaxInputFileNumber;
3457 TString locjdl = Form("%s/%s", workdir.Data(),fJDLName.Data());
3458 out << locjdl << " " << njobspermaster << endl;
// One line per entry of fInputFiles: the entry name followed by a tag.
3459 Int_t nmasterjobs = fInputFiles->GetEntries();
3460 for (Int_t i=0; i<nmasterjobs; i++) {
3461 TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
3462 runOutDir.ReplaceAll(".xml", "");
// Tag variant 1: base name of the entry with ".xml" removed.
3464 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << runOutDir << endl;
// Tag variant 2: the entry index, zero-padded to three digits.
3466 out << Form("%s", fInputFiles->At(i)->GetName()) << " " << Form("%03d", i) << endl;
// Upload the production file to <workdir>/<filename> on AliEn.
// NOTE(review): FileExists/Rm receive the bare filename while the copy target is
// workdir/filename - presumably this relies on the current AliEn directory
// being workdir; verify.
3468 Info("WriteProductionFile", "\n##### Copying production file <%s> to your work directory", filename);
3469 if (FileExists(filename)) gGrid->Rm(filename);
3470 TFile::Cp(Form("file:%s",filename), Form("alien://%s/%s", workdir.Data(),filename));
3473 //______________________________________________________________________________
3474 void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
3476 // Generate the alien validation script.
3477 // The script name is fExecutable with ".sh" replaced by "_mergevalidation.sh" when merge is true and by "_validation.sh" otherwise.
3479 TString validationScript = fExecutable;
3480 if (merge) validationScript.ReplaceAll(".sh", "_mergevalidation.sh");
3481 else validationScript.ReplaceAll(".sh", "_validation.sh");
3483 Error("WriteValidationScript", "Alien connection required");
// outStream is appended to most generated echo/ls commands: outside test mode
// their output is appended to a file named stdout, in test mode it is left
// unredirected.
3486 TString outStream = "";
3487 if (!TestBit(AliAnalysisGrid::kTest)) outStream = " >> stdout";
// The local script file is written only when the kSubmit bit is not set.
3488 if (!TestBit(AliAnalysisGrid::kSubmit)) {
// NOTE(review): the stream is written right after open() with no visible check
// that the file could be opened - confirm whether a check is wanted here.
3490 out.open(validationScript, ios::out);
// Script header: move to the directory holding the script (falling back to "."),
// initialise the error flag to 0 and print a banner with time and directories.
3491 out << "#!/bin/bash" << endl;
3492 out << "##################################################" << endl;
3493 out << "validateout=`dirname $0`" << endl;
3494 out << "validatetime=`date`" << endl;
3495 out << "validated=\"0\";" << endl;
3496 out << "error=0" << endl;
3497 out << "if [ -z $validateout ]" << endl;
3498 out << "then" << endl;
3499 out << " validateout=\".\"" << endl;
3500 out << "fi" << endl << endl;
3501 out << "cd $validateout;" << endl;
3502 out << "validateworkdir=`pwd`;" << endl << endl;
3503 out << "echo \"*******************************************************\"" << outStream << endl;
3504 out << "echo \"* Automatically generated validation script *\"" << outStream << endl;
3506 out << "echo \"* Time: $validatetime \"" << outStream << endl;
3507 out << "echo \"* Dir: $validateout\"" << outStream << endl;
3508 out << "echo \"* Workdir: $validateworkdir\"" << outStream << endl;
3509 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3510 out << "ls -la ./" << outStream << endl;
3511 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl << endl;
3512 out << "##################################################" << endl;
// Check 1: a file named stderr must exist, otherwise error is set to 1.
3515 out << "if [ ! -f stderr ] ; then" << endl;
3516 out << " error=1" << endl;
3517 out << " echo \"* ########## Job not validated - no stderr ###\" " << outStream << endl;
3518 out << " echo \"Error = $error\" " << outStream << endl;
3519 out << "fi" << endl;
// Check 2: search stderr (case-insensitively) for four known failure messages.
// NOTE(review): grep runs with -E, so the asterisks in the glibc pattern are
// regex operators rather than literal characters - confirm it matches as
// intended.
3521 out << "parArch=`grep -Ei \"Cannot Build the PAR Archive\" stderr`" << endl;
3522 out << "segViol=`grep -Ei \"Segmentation violation\" stderr`" << endl;
3523 out << "segFault=`grep -Ei \"Segmentation fault\" stderr`" << endl;
3524 out << "glibcErr=`grep -Ei \"*** glibc detected ***\" stderr`" << endl;
// Each non-empty grep result sets error to 1 and echoes the matching text.
3527 out << "if [ \"$parArch\" != \"\" ] ; then" << endl;
3528 out << " error=1" << endl;
3529 out << " echo \"* ########## Job not validated - PAR archive not built ###\" " << outStream << endl;
3530 out << " echo \"$parArch\" " << outStream << endl;
3531 out << " echo \"Error = $error\" " << outStream << endl;
3532 out << "fi" << endl;
3534 out << "if [ \"$segViol\" != \"\" ] ; then" << endl;
3535 out << " error=1" << endl;
3536 out << " echo \"* ########## Job not validated - Segment. violation ###\" " << outStream << endl;
3537 out << " echo \"$segViol\" " << outStream << endl;
3538 out << " echo \"Error = $error\" " << outStream << endl;
3539 out << "fi" << endl;
3541 out << "if [ \"$segFault\" != \"\" ] ; then" << endl;
3542 out << " error=1" << endl;
3543 out << " echo \"* ########## Job not validated - Segment. fault ###\" " << outStream << endl;
3544 out << " echo \"$segFault\" " << outStream << endl;
3545 out << " echo \"Error = $error\" " << outStream << endl;
3546 out << "fi" << endl;
3548 out << "if [ \"$glibcErr\" != \"\" ] ; then" << endl;
3549 out << " error=1" << endl;
3550 out << " echo \"* ########## Job not validated - *** glibc detected *** ###\" " << outStream << endl;
3551 out << " echo \"$glibcErr\" " << outStream << endl;
3552 out << " echo \"Error = $error\" " << outStream << endl;
3553 out << "fi" << endl;
3555 // Part dedicated to the specific analyses running into the train
// Check 3: for each comma-separated entry of fOutputFiles, emit a test that the
// file exists. Anything from an '@' onwards is dropped from the entry first.
// Entries are skipped when they are listed in fMergeExcludes (merge mode only),
// appear in the analysis manager's extra files, or contain a '*' wildcard.
3557 TObjArray *arr = fOutputFiles.Tokenize(",");
// NOTE(review): the manager pointer is dereferenced without a visible null
// check - presumably an analysis manager always exists at this point; confirm.
3560 AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
3561 TString extra = mgr->GetExtraFiles();
3562 while ((os=(TObjString*)next1())) {
3564 outputFile = os->GetString();
3565 Int_t index = outputFile.Index("@");
3566 if (index > 0) outputFile.Remove(index);
3567 if (merge && fMergeExcludes.Contains(outputFile)) continue;
3568 if (extra.Contains(outputFile)) continue;
3569 if (outputFile.Contains("*")) continue;
3570 out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
3571 out << " error=1" << endl;
3572 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\"" << outStream << endl;
3573 out << " echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
3574 out << "fi" << endl;
// Check 4: a file named outputs_valid must exist, otherwise error is set to 1.
// NOTE(review): these two messages hard-code ">> stdout" / ">> stderr" instead
// of using outStream, so they are redirected to files even in test mode -
// confirm this is intended.
3577 out << "if ! [ -f outputs_valid ] ; then" << endl;
3578 out << " error=1" << endl;
3579 out << " echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
3580 out << " echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
3581 out << "fi" << endl;
// On success the script reports validation; unless IsKeepLogs() is true it also
// removes every file matching std*.
3583 out << "if [ $error = 0 ] ; then" << endl;
3584 out << " echo \"* ---------------- Job Validated ------------------*\"" << outStream << endl;
3585 if (!IsKeepLogs()) {
3586 out << " echo \"* === Logs std* will be deleted === \"" << endl;
3588 out << " rm -f std*" << endl;
3590 out << "fi" << endl;
// Script footer: return to the previous directory and exit with the error flag.
3592 out << "echo \"* ----------------------------------------------------*\"" << outStream << endl;
3593 out << "echo \"*******************************************************\"" << outStream << endl;
3594 out << "cd -" << endl;
3595 out << "exit $error" << endl;
// The copy flag is cleared when running in offline or test mode.
3597 Bool_t copy = kTRUE;
3598 if (TestBit(AliAnalysisGrid::kOffline) || TestBit(AliAnalysisGrid::kTest)) copy = kFALSE;
// Upload the script to <AliEn home><fGridWorkingDir>/<validationScript>.
// NOTE(review): the Info tag is "CreateJDL" although the message is emitted from
// WriteValidationScript, and FileExists/Rm receive the bare script name while
// the copy target is workdir/validationScript - presumably this relies on the
// current AliEn directory being workdir; verify.
3601 TString workdir = gGrid->GetHomeDirectory();
3602 workdir += fGridWorkingDir;
3603 Info("CreateJDL", "\n##### Copying validation script <%s> to your AliEn working space", validationScript.Data());
3604 if (FileExists(validationScript)) gGrid->Rm(validationScript);
3605 TFile::Cp(Form("file:%s",validationScript.Data()), Form("alien://%s/%s", workdir.Data(),validationScript.Data()));