Implemented merging in stages. Instead of submitting one merging job per master,...
authoragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Thu, 29 Jul 2010 13:24:29 +0000 (13:24 +0000)
committeragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Thu, 29 Jul 2010 13:24:29 +0000 (13:24 +0000)
The number of files merged in a chunk is the one set via: plugin->SetMaxMergeFiles(nperchunk)
This works only when having the plugin in SetMergeViaJDL() mode. Running the analysis with the plugin in in "terminate" mode will submit merging at stage 1. When the submitted merging jobs are in a final state, the user can rerun in "terminate" mode. This will either resubmit the jobs that failed in the previous stage or just submit jobs for the next stage.
When the current stage fulfills nfiles_for_stage_N <= nperchunk, the final merging job will also run the analysis in "terminate" mode.
The intermediate files are calles: output-Stage%02d_%04d and are NOT cleaned up in the current implementation - the user should do it.
Note: Do NOT run "terminate" mode if existing merging jobs are not yet in a final state, as this will resubmit ALL jobs that have not yet registered their output, even if these are successful.

Run scenario:
1. Analysis via plugin using:
  plugin->SetMergeViaJDL();
  plugin->SetMaxMergeFiles(10);
  plugin->SetRunMode("full")
...
  mgr->StartAnalysis("grid")
This will submit a master job which is split say in 1000 jobs, each producing output1.root, ..., outputN.root

2. All jobs finished (maybe after resubmission) even if some failed (due to corrupted inputs or whatever)
We are left with 980 successful jobs.
  plugin->SetRunMode("terminate")
...
  mgr->StartAnalysis("grid")
This will submit 980/10 meging jobs. The K-th job will produce for the output N the file: output-Stage01_000K.root
Wait until jobs are done, resubmit failing ones -> ALL merging jobs must succeed

3. We have now 98 Stage01 merged files. Redoing step 2 will submit 98/10+1=10 jobs stage 2.
These will produce 10 files outputN-Stage02_000K.root

4. Redoing stage 2 when these jobs finished, the plugin will submit finally a single merging job.
After merging, the job will run the analysis "terminate" phase.

ANALYSIS/AliAnalysisAlien.cxx
ANALYSIS/AliAnalysisAlien.h

index f32c7c3..bae04c3 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "Riostream.h"
 #include "TEnv.h"
+#include "TBits.h"
 #include "TError.h"
 #include "TROOT.h"
 #include "TSystem.h"
@@ -940,7 +941,7 @@ Bool_t AliAnalysisAlien::CreateJDL()
       fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(),mergeExec.Data()), "List of input files to be uploaded to workers");
       if (!fArguments.IsNull())
          fGridJDL->SetArguments(fArguments, "Arguments for the executable command");
-      fMergingJDL->SetArguments("$1"); 
+      fMergingJDL->SetArguments("$1 $2 $3"); 
       fGridJDL->SetValue("TTL", Form("\"%d\"",fTTL));
       fGridJDL->SetDescription("TTL", Form("Time after which the job is killed (%d min.)", fTTL/60));
       fMergingJDL->SetValue("TTL", Form("\"%d\"",fTTL));
@@ -1231,7 +1232,8 @@ Bool_t AliAnalysisAlien::WriteJDL(Bool_t copy)
    index = sjdl.Index("JDLVariables");
    if (index >= 0) sjdl.Insert(index, "\n# JDL variables\n");
    sjdl1 += "JDLVariables = \n{\n   \"Packages\",\n   \"OutputDir\"\n};\n";
-   sjdl1.Prepend(Form("Jobtag = {\n   \"comment:Merging_%s\"\n};\n", fJobTag.Data()));
+   sjdl1.Prepend(Form("Jobtag = {\n   \"comment:%s_Merging\"\n};\n", fJobTag.Data()));
+   sjdl1.Prepend("# Generated merging jdl\n# $1 = full alien path to output directory to be merged\n# $2 = merging stage\n# $3 = merged chunk\n");
    index = sjdl1.Index("JDLVariables");
    if (index >= 0) sjdl1.Insert(index, "\n# JDL variables\n");
    // Write jdl to file
@@ -1615,9 +1617,143 @@ void AliAnalysisAlien::SetDefaults()
 }   
 
 //______________________________________________________________________________
-Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge)
+Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit, const char *jdl)
 {
-// Merge all registered outputs from basedir.
+// Static method that checks the status of merging. This can submit merging jobs that did not produced the expected
+// output. If <submit> is false (checking) returns true only when the final merged file was found. If submit is true returns
+// true if the jobs were successfully submitted.
+   Int_t countOrig = 0;
+   Int_t countStage = 0;
+   Int_t stage = 0;
+   Int_t i;
+   Bool_t doneFinal = kFALSE;
+   TBits chunksDone;
+   TString saliendir(aliendir);
+   TString sfilename, stmp;
+   saliendir.ReplaceAll("//","/");
+   saliendir = saliendir.Strip(TString::kTrailing, '/');
+   if (!gGrid) {
+      ::Error("GetNregisteredFiles", "You need to be connected to AliEn.");
+      return kFALSE;
+   }
+   printf("Checking directory <%s> for merged files <%s*> ...\n", aliendir, filename);
+   TString command = Form("find %s/ *%s", saliendir.Data(), filename);
+   TGridResult *res = gGrid->Command(command);
+   if (!res) {
+      ::Error("GetNregisteredFiles","Error: No result for the find command\n");
+      return kFALSE;
+   }     
+   TIter nextmap(res);
+   TMap *map = 0;   
+   while ((map=(TMap*)nextmap())) {
+      TString turl = map->GetValue("turl")->GetName();
+      if (!turl.Length()) {
+         // Nothing found
+         delete res;
+         return kFALSE;
+      }
+      turl.ReplaceAll("alien://", "");
+      turl.ReplaceAll(saliendir, "");
+      sfilename = gSystem->BaseName(turl);
+      // Now check to what the file corresponds to: 
+      //    original output           - aliendir/%03d/filename
+      //    merged file (which stage) - aliendir/filename-Stage%02d_%04d
+      //    final merged file         - aliendir/filename
+      if (sfilename == turl) {
+         if (sfilename == filename) {
+            doneFinal = kTRUE;
+         } else {   
+            // check stage
+            Int_t index = sfilename.Index("Stage");
+            if (index<0) continue;
+            stmp = sfilename(index+5,2);
+            Int_t istage = atoi(stmp);
+            stmp = sfilename(index+8,4);
+            Int_t ijob = atoi(stmp);
+            if (istage<stage) continue; // Ignore lower stages
+            if (istage>stage) {
+               countStage = 0;
+               chunksDone.ResetAllBits();
+               stage = istage;
+            }
+            countStage++;
+            chunksDone.SetBitNumber(ijob);
+         }     
+      } else {
+         countOrig++;
+      }
+      if (doneFinal) {
+         delete res;
+         return kTRUE;
+      }               
+   }
+   delete res;
+   // Compute number of jobs that were submitted for the current stage
+   Int_t ntotstage = countOrig;
+   for (i=1; i<=stage; i++) ntotstage = (ntotstage/nperchunk)+1;
+   // Now compare with the number of set bits in the chunksDone array
+   Int_t nmissing = (stage>0)?(ntotstage - countStage):0;
+   // Print the info
+   printf("*** Found %d original files\n", countOrig);
+   if (stage==0) printf("*** No merging completed so far.\n");
+   else          printf("*** Found %d out of %d files merged for stage %d\n", countStage, ntotstage, stage);
+   if (nmissing) printf("*** Number of merged files missing for this stage: %d -> check merging job completion\n", nmissing);
+
+   if (!submit) return doneFinal;
+   // Sumbit merging jobs for all missing chunks for the current stage.
+   TString query = Form("submit %s %s", jdl, aliendir);
+   Int_t ichunk = -1;
+   if (nmissing) {
+      for (i=0; i<nmissing; i++) {
+         ichunk = chunksDone.FirstNullBit(ichunk+1);
+         Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage, ichunk));
+         if (!jobId) return kFALSE;
+      }
+      return kTRUE;
+   }
+   // Submit next stage of merging
+   if (stage==0) countStage = countOrig;
+   Int_t nchunks = (countStage/nperchunk)+1;
+   for (i=0; i<nchunks; i++) {
+      Int_t jobId = SubmitSingleJob(Form("%s %d %d", query.Data(), stage+1, i));
+      if (!jobId) return kFALSE;
+   }        
+   return kTRUE;
+}      
+
+//______________________________________________________________________________
+Int_t AliAnalysisAlien::SubmitSingleJob(const char *query)
+{
+// Submits a single job corresponding to the query and returns job id. If 0 submission failed.
+   if (!gGrid) return 0;
+   printf("=> %s ------> ",query);
+   TGridResult *res = gGrid->Command(query);
+   if (!res) return 0;
+   TString jobId = res->GetKey(0,"jobId");
+   delete res;
+   if (jobId.IsNull()) {
+      printf("submission failed. Reason:\n");
+      gGrid->Stdout();
+      gGrid->Stderr();
+      ::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
+      return 0;
+   }
+   printf(" Job id: %s\n", jobId.Data());
+   return atoi(jobId);
+}  
+
+//______________________________________________________________________________
+Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage, Int_t ichunk)
+{
+// Merge given output files from basedir. The file merger will merge nmaxmerge
+// files in a group. Merging can be done in stages:
+// stage=0 : will merge all existing files in a single stage
+// stage=1 : does a find command for all files that do NOT contain the string "Stage". 
+//           If their number is bigger that nmaxmerge, only the files from 
+//           ichunk*nmaxmerge to ichunk*(nmaxmerge+1)-1 will get merged as output_stage_<ichunk>
+// stage=n : does a find command for files named <output>Stage<stage-1>_*. If their number is bigger than
+//           nmaxmerge, merge just the chunk ichunk, otherwise write the merged output to the file 
+//           named <output>.
    TString outputFile = output;
    TString command;
    TString outputChunk;
@@ -1627,55 +1763,117 @@ Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, In
    Bool_t merged = kTRUE;
    Int_t index = outputFile.Index("@");
    if (index > 0) outputFile.Remove(index);
-   command = Form("find %s/ *%s", basedir, outputFile.Data());
+   TString inputFile = outputFile;
+   if (stage>1) inputFile.ReplaceAll(".root", Form("-Stage%02d_*.root", stage-1));
+   command = Form("find %s/ *%s", basedir, inputFile.Data());
    printf("command: %s\n", command.Data());
    TGridResult *res = gGrid->Command(command);
    if (!res) {
-      printf("Error: No result for the find command\n");
+      ::Error("MergeOutput","No result for the find command\n");
       return kFALSE;
    }     
 
    TFileMerger *fm = 0;
    TIter nextmap(res);
    TMap *map = 0;
-   // Check if there is a merge operation to resume
+   // Check if there is a merge operation to resume. Works only for stage 0 or 1.
    outputChunk = outputFile;
    outputChunk.ReplaceAll(".root", "_*.root");
    // Check for existent temporary merge files
    // Check overwrite mode and remove previous partial results if needed
-   if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
-      while (1) {
-         // Skip as many input files as in a chunk
-         for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
-         if (!map) {
-            ::Error("MergeOutputs", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
+   // Preserve old merging functionality for stage 0.
+   if (stage==0) {
+      if (!gSystem->Exec(Form("ls %s 2>/dev/null", outputChunk.Data()))) {
+         while (1) {
+            // Skip as many input files as in a chunk
+            for (Int_t counter=0; counter<nmaxmerge; counter++) map = (TMap*)nextmap();
+            if (!map) {
+               ::Error("MergeOutput", "Cannot resume merging for <%s>, nentries=%d", outputFile.Data(), res->GetSize());
+               delete res;
+               return kFALSE;
+            }
+            outputChunk = outputFile;
+            outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
+            countChunk++;
+            if (gSystem->AccessPathName(outputChunk)) continue;
+            // Merged file with chunks up to <countChunk> found
+            ::Info("MergeOutput", "Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
+            previousChunk = outputChunk;
+            break;
+         }
+      }   
+      countZero = nmaxmerge;
+   
+      while ((map=(TMap*)nextmap())) {
+      // Loop 'find' results and get next LFN
+         if (countZero == nmaxmerge) {
+            // First file in chunk - create file merger and add previous chunk if any.
+            fm = new TFileMerger(kFALSE);
+            fm->SetFastMethod(kTRUE);
+            if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
+            outputChunk = outputFile;
+            outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
+         }
+         // If last file found, put merged results in the output file
+         if (map == res->Last()) outputChunk = outputFile;
+         TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
+         if (!objs || !objs->GetString().Length()) {
+            // Nothing found - skip this output
             delete res;
+            delete fm;
             return kFALSE;
+         } 
+         // Add file to be merged and decrement chunk counter.
+         fm->AddFile(objs->GetString());
+         countZero--;
+         if (countZero==0 || map == res->Last()) {            
+            if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
+            // Nothing found - skip this output
+               ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
+               delete res;
+               delete fm;
+               return kFALSE;
+            }
+            fm->OutputFile(outputChunk);
+            // Merge the outputs, then go to next chunk      
+            if (!fm->Merge()) {
+               ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
+               delete res;
+               delete fm;
+               return kFALSE;
+            } else {
+               ::Info("MergeOutputs", "\n#####   Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
+               gSystem->Unlink(previousChunk);
+            }
+            if (map == res->Last()) {
+               delete res;
+               delete fm;
+               break;
+            }      
+            countChunk++;
+            countZero = nmaxmerge;
+            previousChunk = outputChunk;
          }
-         outputChunk = outputFile;
-         outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
-         countChunk++;
-         if (gSystem->AccessPathName(outputChunk)) continue;
-         // Merged file with chunks up to <countChunk> found
-         printf("Resume merging of <%s> from <%s>\n", outputFile.Data(), outputChunk.Data());
-         previousChunk = outputChunk;
-         break;
       }
-   }   
-   countZero = nmaxmerge;
-   
+      return merged;
+   }
+   // Merging stage different than 0.
+   // Move to the begining of the requested chunk.
+   outputChunk = outputFile;
+   if (nmaxmerge < res->GetSize()) {
+      if (ichunk*nmaxmerge >= res->GetSize()) {
+         ::Error("MergeOutput", "Cannot merge merge chunk %d grouping %d files from %d total.", ichunk, nmaxmerge, res->GetSize());
+         delete res;
+         return kFALSE;
+      }   
+      for (Int_t counter=0; counter<ichunk*nmaxmerge; counter++) map = (TMap*)nextmap();
+      outputChunk.ReplaceAll(".root", Form("-Stage%02d_%04d.root", stage, ichunk));
+   }
+   countZero = nmaxmerge;  
+   fm = new TFileMerger(kFALSE);
+   fm->SetFastMethod(kTRUE);
    while ((map=(TMap*)nextmap())) {
-   // Loop 'find' results and get next LFN
-      if (countZero == nmaxmerge) {
-         // First file in chunk - create file merger and add previous chunk if any.
-         fm = new TFileMerger(kFALSE);
-         fm->SetFastMethod(kTRUE);
-         if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
-         outputChunk = outputFile;
-         outputChunk.ReplaceAll(".root", Form("_%04d.root", countChunk));
-      }
-      // If last file found, put merged results in the output file
-      if (map == res->Last()) outputChunk = outputFile;
+      // Loop 'find' results and get next LFN
       TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
       if (!objs || !objs->GetString().Length()) {
          // Nothing found - skip this output
@@ -1686,37 +1884,26 @@ Bool_t AliAnalysisAlien::MergeOutput(const char *output, const char *basedir, In
       // Add file to be merged and decrement chunk counter.
       fm->AddFile(objs->GetString());
       countZero--;
-      if (countZero==0 || map == res->Last()) {            
-         fm->OutputFile(outputChunk);
-         if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
-         // Nothing found - skip this output
-            ::Warning("MergeOutputs", "No <%s> files found.", outputFile.Data());
-            delete res;
-            delete fm;
-            return kFALSE;
-         }
-         // Merge the outputs, then go to next chunk      
-         if (!fm->Merge()) {
-            ::Error("MergeOutputs", "Could not merge all <%s> files", outputFile.Data());
-            delete res;
-            delete fm;
-            merged = kFALSE;
-            return kFALSE;
-         } else {
-            ::Info("MergeOutputs", "\n#####   Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
-            gSystem->Unlink(previousChunk);
-         }
-         if (map == res->Last()) {
-            delete res;
-            delete fm;
-            break;
-         }      
-         countChunk++;
-         countZero = nmaxmerge;
-         previousChunk = outputChunk;
-      }
+      if (countZero==0) break;
+   }
+   delete res;
+   if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
+      // Nothing found - skip this output
+      ::Warning("MergeOutput", "No <%s> files found.", inputFile.Data());
+      delete fm;
+      return kFALSE;
+   }
+   fm->OutputFile(outputChunk);
+   // Merge the outputs
+   if (!fm->Merge()) {
+      ::Error("MergeOutput", "Could not merge all <%s> files", outputFile.Data());
+      delete fm;
+      return kFALSE;
+   } else {
+      ::Info("MergeOutput", "\n#####   Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), outputChunk.Data());
    }
-   return merged;
+   delete fm;
+   return kTRUE;
 } 
 
 //______________________________________________________________________________
@@ -2054,35 +2241,24 @@ Bool_t AliAnalysisAlien::SubmitMerging()
    TString mergeJDLName = fExecutable;
    mergeJDLName.ReplaceAll(".sh", "_merge.jdl");
    Int_t ntosubmit = fInputFiles->GetEntries();
-   printf("### Submitting %d merging jobs...\n", ntosubmit);
    for (Int_t i=0; i<ntosubmit; i++) {
-      TString query;
       TString runOutDir = gSystem->BaseName(fInputFiles->At(i)->GetName());
       runOutDir.ReplaceAll(".xml", "");
-      if (fOutputToRunNo)
-         query = Form("submit %s %s", mergeJDLName.Data(), runOutDir.Data());
-      else
-         query = Form("submit %s %03d", mergeJDLName.Data(), i);
-      printf("********* %s\n",query.Data());
-      TGridResult *res = gGrid->Command(query);
-      if (res) {
-         const char *cjobId = res->GetKey(0,"jobId");
-         if (!cjobId) {
-            gGrid->Stdout();
-            gGrid->Stderr();
-            Error("StartAnalysis", "Your JDL %s could not be submitted", mergeJDLName.Data());
-            return kFALSE;
-         } else {
-            Info("StartAnalysis", "\n_______________________________________________________________________ \
-            \n#####   Your JDL %s was successfully submitted. \nTHE JOB ID IS: %s \
-            \n_______________________________________________________________________",
-                   mergeJDLName.Data(), cjobId);
-         }
-         delete res;
-      } else {     
-         Error("SubmitMerging", "No grid result after submission !!! Bailing out...");
-         return kFALSE;
-      }             
+      if (fOutputToRunNo) {
+         // The output directory is the run number
+         printf("### Submitting merging job for run <%s>\n", runOutDir.Data());
+         runOutDir = Form("%s/%s", fGridOutputDir.Data(), runOutDir.Data());
+      } else {
+         // The output directory is the master number in 3 digits format
+         printf("### Submitting merging job for master <%03d>\n", i);
+         runOutDir = Form("%s/%03d",fGridOutputDir.Data(), i);
+      }
+      // Check now the number of merging stages.
+      TString outputFile = fOutputFiles;
+      Int_t index = outputFile.Index(",");
+      if (index>0) outputFile.Remove(index);
+      Bool_t done = CheckMergedFiles(outputFile, runOutDir, fMaxMergeFiles, kTRUE, mergeJDLName);
+      if (!done) return kFALSE;
    }
    if (!ntosubmit) return kTRUE;
    Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR MERGING JOBS HAVE FINISHED. #### \
@@ -2569,17 +2745,19 @@ void AliAnalysisAlien::WriteMergingMacro()
       TString func = mergingMacro;
       TString comment;
       func.ReplaceAll(".C", "");
-      out << "void " << func.Data() << "(const char *dir)" << endl;
+      out << "void " << func.Data() << "(const char *dir, Int_t stage=0, Int_t ichunk=0)" << endl;
       out << "{" << endl;
       out << "// Automatically generated merging macro executed in grid subjobs" << endl << endl;
       out << "   TStopwatch timer;" << endl;
       out << "   timer.Start();" << endl << endl;
-      out << "// load base root libraries" << endl;
-      out << "   gSystem->Load(\"libTree\");" << endl;
-      out << "   gSystem->Load(\"libGeom\");" << endl;
-      out << "   gSystem->Load(\"libVMC\");" << endl;
-      out << "   gSystem->Load(\"libPhysics\");" << endl << endl;
-      out << "   gSystem->Load(\"libMinuit\");" << endl << endl;
+      if (!fExecutableCommand.Contains("aliroot")) {
+         out << "// load base root libraries" << endl;
+         out << "   gSystem->Load(\"libTree\");" << endl;
+         out << "   gSystem->Load(\"libGeom\");" << endl;
+         out << "   gSystem->Load(\"libVMC\");" << endl;
+         out << "   gSystem->Load(\"libPhysics\");" << endl << endl;
+         out << "   gSystem->Load(\"libMinuit\");" << endl << endl;
+      }   
       if (fAdditionalRootLibs.Length()) {
          // in principle libtree /lib geom libvmc etc. can go into this list, too
          out << "// Add aditional libraries" << endl;
@@ -2597,9 +2775,11 @@ void AliAnalysisAlien::WriteMergingMacro()
       out << "   gSystem->AddIncludePath(\"-I$ALICE_ROOT/include\");" << endl << endl;
       out << "// Load analysis framework libraries" << endl;
       if (!fPackages) {
-         out << "   gSystem->Load(\"libSTEERBase\");" << endl;
-         out << "   gSystem->Load(\"libESD\");" << endl;
-         out << "   gSystem->Load(\"libAOD\");" << endl;
+         if (!fExecutableCommand.Contains("aliroot")) {
+            out << "   gSystem->Load(\"libSTEERBase\");" << endl;
+            out << "   gSystem->Load(\"libESD\");" << endl;
+            out << "   gSystem->Load(\"libAOD\");" << endl;
+         }
          out << "   gSystem->Load(\"libANALYSIS\");" << endl;
          out << "   gSystem->Load(\"libANALYSISalice\");" << endl;
          out << "   gSystem->Load(\"libCORRFW\");" << endl << endl;
@@ -2694,8 +2874,8 @@ void AliAnalysisAlien::WriteMergingMacro()
       out << "   gSystem->Setenv(\"TMPDIR\", gSystem->pwd());" << endl << endl;   
       out << "// Connect to AliEn" << endl;
       out << "   if (!TGrid::Connect(\"alien://\")) return;" << endl;
-      out << "   TString outputDir = \"" << fGridOutputDir << "/\";" << endl;  
-      out << "   outputDir += dir;" << endl;    
+      out << "   Bool_t laststage = kTRUE;" << endl;
+      out << "   TString outputDir = dir;" << endl;  
       out << "   TString outputFiles = \"" << fOutputFiles << "\";" << endl;
       out << "   TString mergeExcludes = \"" << fMergeExcludes << "\";" << endl;
       out << "   mergeExcludes += \"" << AliAnalysisManager::GetAnalysisManager()->GetExtraFiles() << "\";" << endl;
@@ -2706,6 +2886,7 @@ void AliAnalysisAlien::WriteMergingMacro()
       out << "   Bool_t merged = kTRUE;" << endl;
       out << "   while((str=(TObjString*)iter->Next())) {" << endl;
       out << "      outputFile = str->GetString();" << endl;
+      out << "      if (outputFile.Contains(\"*\")) continue;" << endl;
       out << "      Int_t index = outputFile.Index(\"@\");" << endl;
       out << "      if (index > 0) outputFile.Remove(index);" << endl;
       out << "      // Skip already merged outputs" << endl;
@@ -2714,16 +2895,24 @@ void AliAnalysisAlien::WriteMergingMacro()
       out << "         continue;" << endl;
       out << "      }" << endl;
       out << "      if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
-      out << "      merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ");" << endl;
+      out << "      merged = AliAnalysisAlien::MergeOutput(outputFile, outputDir, " << fMaxMergeFiles << ", stage, ichunk);" << endl;
       out << "      if (!merged) {" << endl;
       out << "         printf(\"ERROR: Cannot merge %s\\n\", outputFile.Data());" << endl;
+      out << "         return;" << endl;
       out << "      }" << endl;
+      out << "      // Check if this was the last stage. If yes, run terminate for the tasks." << endl;
+      out << "      if (!gSystem->AccessPathName(outputFile)) laststage = kTRUE;" << endl;
       out << "   }" << endl;
-      out << "// read the analysis manager from file" << endl;
+      out << "   // all outputs merged, validate" << endl;
+      out << "   ofstream out;" << endl;
+      out << "   out.open(\"outputs_valid\", ios::out);" << endl;
+      out << "   out.close();" << endl;
+      out << "   // read the analysis manager from file" << endl;
       TString analysisFile = fExecutable;
       analysisFile.ReplaceAll(".sh", ".root");
+      out << "   if (!laststage) return;" << endl;
       out << "   TFile *file = TFile::Open(\"" << analysisFile << "\");" << endl;
-      out << "   if (!file) return;" << endl; 
+      out << "   if (!file) return;" << endl;
       out << "   TIter nextkey(file->GetListOfKeys());" << endl;
       out << "   AliAnalysisManager *mgr = 0;" << endl;
       out << "   TKey *key;" << endl;
@@ -2932,7 +3121,7 @@ void AliAnalysisAlien::WriteMergeExecutable()
       if (TObject::TestBit(AliAnalysisGrid::kUsePars)) out << "export LD_LIBRARY_PATH=.:$LD_LIBRARY_PATH" << endl;
       TString mergeMacro = fExecutable;
       mergeMacro.ReplaceAll(".sh", "_merge.C");
-      out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\")\"" << endl;
+      out << "export ARG=\"" << mergeMacro << "(\\\"$1\\\",$2,$3)\"" << endl;
       out << fExecutableCommand << " " << "$ARG" << endl; 
       out << "echo \"======== " << mergeMacro.Data() << " finished with exit code: $? ========\"" << endl;
       out << "echo \"############## memory after: ##############\"" << endl;
@@ -3075,6 +3264,7 @@ void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
       AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
       TString extra = mgr->GetExtraFiles();
       while ((os=(TObjString*)next1())) { 
+         if (merge) break;
          outputFile = os->GetString();
          Int_t index = outputFile.Index("@");
          if (index > 0) outputFile.Remove(index);
@@ -3083,18 +3273,16 @@ void AliAnalysisAlien::WriteValidationScript(Bool_t merge)
          if (outputFile.Contains("*")) continue;
          out << "if ! [ -f " << outputFile.Data() << " ] ; then" << endl;
          out << "   error=1" << endl;
-         out << "   echo \"Output file(s) not found. Job FAILED !\""  << outStream << endl;
-         out << "   echo \"Output file(s) not found. Job FAILED !\" >> stderr" << endl;
+         out << "   echo \"Output file " << outputFile << " not found. Job FAILED !\""  << outStream << endl;
+         out << "   echo \"Output file " << outputFile << " not found. Job FAILED !\" >> stderr" << endl;
          out << "fi" << endl;
       }   
       delete arr;
-      if (!merge) {
-        out << "if ! [ -f outputs_valid ] ; then" << endl;
-        out << "   error=1" << endl;
-        out << "   echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
-        out << "   echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
-        out << "fi" << endl;
-      }  
+      out << "if ! [ -f outputs_valid ] ; then" << endl;
+      out << "   error=1" << endl;
+      out << "   echo \"Output files were not validated by the analysis manager\" >> stdout" << endl;
+      out << "   echo \"Output files were not validated by the analysis manager\" >> stderr" << endl;
+      out << "fi" << endl;
       
       out << "if [ $error = 0 ] ; then" << endl;
       out << "   echo \"* ----------------   Job Validated  ------------------*\""  << outStream << endl;
index 0e7ab11..8b14f83 100644 (file)
@@ -94,13 +94,15 @@ public:
    static Bool_t       DirectoryExists(const char *lfn);
    static Bool_t       FileExists(const char *lfn);
    static const char  *GetJobStatus(Int_t jobidstart, Int_t lastid, Int_t &nrunning, Int_t &nwaiting, Int_t &nerror, Int_t &ndone);
-   static Bool_t       MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge);
+   static Bool_t       CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, Bool_t submit=kTRUE, const char *jdl="");
+   static Bool_t       MergeOutput(const char *output, const char *basedir, Int_t nmaxmerge, Int_t stage=0, Int_t ichunk=0);
    virtual Bool_t      MergeOutputs();
    virtual void        Print(Option_t *option="") const;
    virtual Bool_t      StartAnalysis(Long64_t nentries=123456789, Long64_t firstentry=0);
    static Bool_t       SetupPar(const char *package);
    virtual Bool_t      Submit();
    virtual Bool_t      SubmitMerging();
+   static Int_t        SubmitSingleJob(const char *query);
    virtual void        WriteAnalysisFile();
    virtual void        WriteAnalysisMacro();
    virtual void        WriteMergingMacro();