]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - ANALYSIS/AliAnalysisAlien.cxx
- adjusted error parameterization
[u/mrichter/AliRoot.git] / ANALYSIS / AliAnalysisAlien.cxx
index 03b09df1a9595f3a7c6b1bae07a9266f6caa4076..a74484b2d676bab414851cc3eb2f90537155e055 100644 (file)
@@ -49,6 +49,7 @@ AliAnalysisAlien::AliAnalysisAlien()
                   fMasterResubmitThreshold(0),
                   fNtestFiles(0),
                   fNMasterJobs(0),
+                  fMaxMergeFiles(0),
                   fRunNumbers(),
                   fExecutable(),
                   fArguments(),
@@ -91,6 +92,7 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name)
                   fMasterResubmitThreshold(0),
                   fNtestFiles(0),
                   fNMasterJobs(0),
+                  fMaxMergeFiles(0),
                   fRunNumbers(),
                   fExecutable(),
                   fArguments(),
@@ -133,6 +135,7 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
                   fMasterResubmitThreshold(other.fMasterResubmitThreshold),
                   fNtestFiles(other.fNtestFiles),
                   fNMasterJobs(other.fNMasterJobs),
+                  fMaxMergeFiles(other.fMaxMergeFiles),
                   fRunNumbers(other.fRunNumbers),
                   fExecutable(other.fExecutable),
                   fArguments(other.fArguments),
@@ -916,6 +919,73 @@ Bool_t AliAnalysisAlien::IsCollection(const char *lfn) const
    return kFALSE;
 }   
 
+//______________________________________________________________________________
+void AliAnalysisAlien::Print(Option_t *) const
+{
+// Print current plugin settings.
+   printf("### AliEn analysis plugin current settings ###\n");
+   printf("=   Version of API requested: ____________________ %s\n", fAPIVersion.Data());
+   printf("=   Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
+   printf("=   Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
+   if (fUser.Length()) 
+   printf("=   User running the plugin: _____________________ %s\n", fUser.Data());
+   printf("=   Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
+   printf("=   Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
+   printf("=   Data base directory path requested: __________ %s\n", fGridDataDir.Data());
+   printf("=   Data search pattern: _________________________ %s\n", fDataPattern.Data());
+   printf("=   Input data format: ___________________________ %s\n", fInputFormat.Data());
+   if (fRunNumbers.Length()) 
+   printf("=   Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
+   if (fRunRange[0])
+   printf("=   Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
+   if (!fRunRange[0] && !fRunNumbers.Length()) {
+      TIter next(fInputFiles);
+      TObject *obj;
+      TString list;
+      while ((obj=next())) list += obj->GetName();
+      printf("=   Input files to be processed: _________________ %s\n", list.Data());
+   }
+   if (TestBit(AliAnalysisGrid::kTest))
+   printf("=   Number of input files used in test mode: _____ %d\n", fNtestFiles);
+   printf("=   List of output files to be registered: _______ %s\n", fOutputFiles.Data());
+   printf("=   List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
+   printf("=   List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
+   printf("=====================================================================\n");
+   printf("=   Job price: ___________________________________ %d\n", fPrice);
+   printf("=   Time to live (TTL): __________________________ %d\n", fTTL);
+   printf("=   Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
+   if (fMaxInitFailed>0) 
+   printf("=   Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
+   if (fMasterResubmitThreshold>0) 
+   printf("=   Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
+   if (fNMasterJobs>0)
+   printf("=   Number of master jobs to be launched: ________ not used yet\n");
+   printf("=   Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
+   printf("=   Name of the generated execution script: ______ %s\n",fExecutable.Data());
+   if (fArguments.Length()) 
+   printf("=   Arguments for the execution script: __________ %s\n",fArguments.Data());
+   printf("=   Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
+   printf("=   User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
+   printf("=   Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
+   printf("=   Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
+   if (fDatasetName)
+   printf("=   Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
+   printf("=   Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
+   if (fIncludePath.Data())
+   printf("=   Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
+   if (fCloseSE.Length())
+   printf("=   Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
+   if (fFriendChainName.Length())
+   printf("=   Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
+   if (fPackages) {
+      TIter next(fPackages);
+      TObject *obj;
+      TString list;
+      while ((obj=next())) list += obj->GetName();
+      printf("=   Par files to be used: ________________________ %s\n", list.Data());
+   }   
+}
+
 //______________________________________________________________________________
 void AliAnalysisAlien::SetDefaults()
 {
@@ -931,6 +1001,7 @@ void AliAnalysisAlien::SetDefaults()
    fRunRange[0]                = 0;
    fRunRange[1]                = 0;
    fNMasterJobs                = 0;
+   fMaxMergeFiles              = 100;
    fRunNumbers                 = "";
    fExecutable                 = "analysis.sh";
    fArguments                  = "";
@@ -980,6 +1051,10 @@ Bool_t AliAnalysisAlien::MergeOutputs()
    TObjString *str;
    TString command;
    TString output_file;
+   TString output_chunk;
+   TString previous_chunk;
+   Int_t count_chunk = 0;
+   Int_t count_zero = fMaxMergeFiles;
    Bool_t merged = kTRUE;
    while((str=(TObjString*)next())) {
       output_file = str->GetString();
@@ -987,6 +1062,7 @@ Bool_t AliAnalysisAlien::MergeOutputs()
       if (index > 0) output_file.Remove(index);
       if (fMergeExcludes.Length() &&
           fMergeExcludes.Contains(output_file.Data())) continue;
+      // Perform a 'find' command in the output directory, looking for registered outputs    
       command = Form("find %s/ *%s", output.Data(), output_file.Data());
       printf("command: %s\n", command.Data());
       TGridResult *res = gGrid->Command(command);
@@ -994,33 +1070,60 @@ Bool_t AliAnalysisAlien::MergeOutputs()
       TFileMerger *fm = 0;
       TIter nextmap(res);
       TMap *map;
+      previous_chunk = "";
+      count_chunk = 0;
+      count_zero = fMaxMergeFiles;
       while ((map=(TMap*)nextmap())) {
+      // Loop 'find' results and get next LFN
+         if (count_zero == fMaxMergeFiles) {
+            // First file in chunk - create file merger and add previous chunk if any.
+            fm = new TFileMerger(kFALSE);
+            fm->SetFastMethod(kTRUE);
+            if (previous_chunk.Length()) fm->AddFile(previous_chunk.Data());
+            output_chunk = output_file;
+            output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk));
+         }
+         // If last file found, put merged results in the output file
+         if (map == res->Last()) output_chunk = output_file;
          TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
          if (!objs || !objs->GetString().Length()) {
+            // Nothing found - skip this output
             delete res;
-            continue;
-         }   
-         if (!fm) {
-            fm = new TFileMerger(kFALSE);
-            fm->SetFastMethod(kTRUE);
-            fm->OutputFile(output_file);
+            delete fm;
+            break;
+         } 
+         // Add file to be merged and decrement chunk counter.
+         fm->AddFile(objs->GetString());
+         count_zero--;
+         if (count_zero==0 || map == res->Last()) {            
+            fm->OutputFile(output_chunk);
+            if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
+            // Nothing found - skip this output
+               Warning("MergeOutputs", "No <%s> files found.", output_file.Data());
+               delete res;
+               delete fm;
+               break;
+            }
+            // Merge the outputs, then go to next chunk      
+            if (!fm->Merge()) {
+               Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data());
+               delete res;
+               delete fm;
+               merged = kFALSE;
+               break;
+            } else {
+               Info("MergeOutputs", "\n#####   Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), output_chunk.Data());
+            }
+            if (map == res->Last()) {
+               delete res;
+               delete fm;
+               break;
+            }      
+            count_chunk++;
+            count_zero = fMaxMergeFiles;
+            previous_chunk = output_chunk;
          }
-         fm->AddFile(objs->GetString());   
       }
-      if (!fm || !fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
-         Warning("MergeOutputs", "No <%s> files found.", output_file.Data());
-         merged = kFALSE;
-         delete res;
-         continue;
-      }
-      if (!fm->Merge()) {
-         Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data());
-         merged = kFALSE;
-      } else {
-         Info("MergeOutputs", "\n#####   Merged %d output files <%s>", fm->GetMergeList()->GetSize(), output_file.Data());
-      }   
-      delete fm;
-      delete res;
    } 
    if (!merged) {
       Error("MergeOutputs", "Terminate() will  NOT be executed");
@@ -1090,7 +1193,8 @@ void AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntr
    if (!Connect()) {
       Error("StartAnalysis", "Cannot start grid analysis without grid connection");
       return;
-   }   
+   }
+   Print();   
    if (!CheckInputData()) {
       Error("StartAnalysis", "There was an error in preprocessing your requested input data");
       return;
@@ -1127,7 +1231,8 @@ void AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntr
    CdWork();
    TGridResult *res;
    TString jobID = "";
-   if (fRunNumbers.Length()) {
+   if (!fRunRange[0]) {
+      // Submit a given xml or a set of runs
       res = gGrid->Command(Form("submit %s", fJDLName.Data()));
       printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
       if (res) {
@@ -1145,10 +1250,7 @@ void AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntr
          delete res;
       }   
    } else {
-      if (!fRunRange[0]) {
-         Error("StartAnalysis", "No runs defined. Exiting.");
-         return;
-      }
+      // Submit for a range of runs.
       TString sjdl;
       for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
          if (!fInputFiles->At(irun-fRunRange[0])) break;
@@ -1177,7 +1279,6 @@ void AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEntr
    Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
    \n You may exit at any time and terminate the job later using the option <terminate> \
    \n ##################################################################################", jobID.Data());
-   //gGrid->Shell();
    gSystem->Exec("aliensh");
 }