Added possibility to resume merging (automatic).
authoragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Mon, 8 Jun 2009 15:28:43 +0000 (15:28 +0000)
committeragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Mon, 8 Jun 2009 15:28:43 +0000 (15:28 +0000)
Added possibility to specify the number of runs to be processed per master job via SetNrunsPerMaster().

ANALYSIS/AliAnalysisAlien.cxx
ANALYSIS/AliAnalysisAlien.h

index a74484b..ddb380c 100644 (file)
@@ -48,7 +48,7 @@ AliAnalysisAlien::AliAnalysisAlien()
                   fMaxInitFailed(0),
                   fMasterResubmitThreshold(0),
                   fNtestFiles(0),
-                  fNMasterJobs(0),
+                  fNrunsPerMaster(0),
                   fMaxMergeFiles(0),
                   fRunNumbers(),
                   fExecutable(),
@@ -91,7 +91,7 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name)
                   fMaxInitFailed(0),
                   fMasterResubmitThreshold(0),
                   fNtestFiles(0),
-                  fNMasterJobs(0),
+                  fNrunsPerMaster(0),
                   fMaxMergeFiles(0),
                   fRunNumbers(),
                   fExecutable(),
@@ -134,7 +134,7 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
                   fMaxInitFailed(other.fMaxInitFailed),
                   fMasterResubmitThreshold(other.fMasterResubmitThreshold),
                   fNtestFiles(other.fNtestFiles),
-                  fNMasterJobs(other.fNMasterJobs),
+                  fNrunsPerMaster(other.fNrunsPerMaster),
                   fMaxMergeFiles(other.fMaxMergeFiles),
                   fRunNumbers(other.fRunNumbers),
                   fExecutable(other.fExecutable),
@@ -392,6 +392,8 @@ Bool_t AliAnalysisAlien::CheckInputData()
    // Check validity of run number(s)
    TObjArray *arr;
    TObjString *os;
+   Int_t nruns = 0;
+   TString schunk;
    TString path;
    if (!checked) {
       checked = kTRUE;
@@ -419,7 +421,18 @@ Bool_t AliAnalysisAlien::CheckInputData()
          if (use_tags) msg += " using_tags: Yes";
          else          msg += " using_tags: No";
          Info("CheckDataType", msg.Data());
-         AddDataFile(path);
+         if (fNrunsPerMaster<2) {
+            AddDataFile(path);
+         } else {
+            nruns++;
+            if (((nruns-1)%fNrunsPerMaster) == 0) {
+               schunk = os->GetString();
+            }   
+            if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) continue;
+            schunk += Form("_%s.xml", os->GetString().Data());
+            path = Form("%s/%s", workdir.Data(), schunk.Data());
+            AddDataFile(path);
+         }   
       }
       delete arr;   
    } else {
@@ -437,7 +450,18 @@ Bool_t AliAnalysisAlien::CheckInputData()
          if (use_tags) msg += " using_tags: Yes";
          else          msg += " using_tags: No";
          Info("CheckDataType", msg.Data());
-         AddDataFile(path);
+         if (fNrunsPerMaster<2) {
+            AddDataFile(path);
+         } else {
+            nruns++;
+            if (((nruns-1)%fNrunsPerMaster) == 0) {
+               schunk = Form("%d", irun);
+            }
+            if ((nruns%fNrunsPerMaster)!=0 && irun != fRunRange[1]) continue;
+            schunk += Form("_%d.xml",  irun);
+            path = Form("%s/%s", workdir.Data(), schunk.Data());
+            AddDataFile(path);
+         }   
       }
    }
    return kTRUE;      
@@ -466,6 +490,9 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
    
    TString file;
    TString path;
+   Int_t nruns = 0;
+   TString schunk;
+   TGridCollection *cbase, *cadd;
    if (!fRunNumbers.Length() && !fRunRange[0]) return kTRUE;
    // Several runs
    if (fRunNumbers.Length()) {
@@ -476,29 +503,60 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
          path = Form("%s/%s ", fGridDataDir.Data(), os->GetString().Data());
          if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
          else file = Form("%s.xml", os->GetString().Data());
-         if (FileExists(file) && !TestBit(AliAnalysisGrid::kTest)) {
-            Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", file.Data());
-//            gGrid->Rm(file); 
-            continue;
-         }
-         command = "find ";
-         command += options;
-         command += path;
-         command += pattern;
-//         conditions = Form(" > %s", file.Data());
-         command += conditions;
-         TGridResult *res = gGrid->Command(command);
-         if (res) delete res;
-         // Write standard output to file
-         gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
+         // If local collection file does not exist, create it via 'find' command.
+         if (gSystem->AccessPathName(file)) {
+            command = "find ";
+            command += options;
+            command += path;
+            command += pattern;
+            command += conditions;
+            TGridResult *res = gGrid->Command(command);
+            if (res) delete res;
+            // Write standard output to file
+            gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
+         }   
          if (TestBit(AliAnalysisGrid::kTest)) break;
-         // Copy xml file to alien space
-         TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
-         if (!FileExists(file)) {
-            Error("CreateDataset", "Command %s did NOT succeed", command.Data());
-            delete arr;
-            return kFALSE;
-         }
+         // Check if there is one run per master job.
+         if (fNrunsPerMaster<2) {
+            if (FileExists(file)) {
+               Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", file.Data());
+               continue;
+            }        
+            // Copy xml file to alien space
+            TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
+            if (!FileExists(file)) {
+               Error("CreateDataset", "Command %s did NOT succeed", command.Data());
+               delete arr;
+               return kFALSE;
+            }
+         } else {
+            nruns++;
+            if (((nruns-1)%fNrunsPerMaster) == 0) {
+               schunk = os->GetString();
+               cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
+            } else {
+               cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
+               printf("   Merging collection <%s> into masterjob input...\n", file.Data());
+               cbase->Add(cadd);
+               delete cadd;
+            }
+            if ((nruns%fNrunsPerMaster)!=0 && os!=arr->Last()) {
+               continue;
+            }   
+            schunk += Form("_%s.xml", os->GetString().Data());
+            if (FileExists(schunk)) {
+               Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", schunk.Data());
+               continue;
+            }        
+            printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
+            cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
+            TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
+            if (!FileExists(schunk)) {
+               Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
+               delete arr;
+               return kFALSE;
+            }
+         }   
       }   
       delete arr;
    } else {
@@ -507,28 +565,63 @@ Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
          path = Form("%s/%d ", fGridDataDir.Data(), irun);
          if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
          else file = Form("%d.xml", irun);
-         if (FileExists(file) && !TestBit(AliAnalysisGrid::kTest)) {
+         if (FileExists(file) && fNrunsPerMaster<2 && !TestBit(AliAnalysisGrid::kTest)) {
             Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", file.Data());
 //            gGrid->Rm(file); 
             continue;
          }
-         command = "find ";
-         command += options;
-         command += path;
-         command += pattern;
-//         conditions = Form(" > %s", file.Data());
-         command += conditions;
-         TGridResult *res = gGrid->Command(command);
-         if (res) delete res;
-         // Write standard output to file
-         gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
+         // If local collection file does not exist, create it via 'find' command.
+         if (gSystem->AccessPathName(file)) {
+            command = "find ";
+            command += options;
+            command += path;
+            command += pattern;
+            command += conditions;
+            TGridResult *res = gGrid->Command(command);
+            if (res) delete res;
+            // Write standard output to file
+            gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
+         }   
          if (TestBit(AliAnalysisGrid::kTest)) break;
-         // Copy xml file to alien space
-         TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
-         if (!FileExists(file)) {
-            Error("CreateDataset", "Command %s did NOT succeed", command.Data());
-            return kFALSE;
-         }
+         // Check if there is one run per master job.
+         if (fNrunsPerMaster<2) {
+            if (FileExists(file)) {
+               Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", file.Data());
+               continue;
+            }        
+            // Copy xml file to alien space
+            TFile::Cp(Form("file:%s",file.Data()), Form("alien://%s/%s",workdir.Data(), file.Data()));
+            if (!FileExists(file)) {
+               Error("CreateDataset", "Command %s did NOT succeed", command.Data());
+               return kFALSE;
+            }
+         } else {
+            nruns++;
+            printf("   Merging collection <%s> into %d runs chunk...\n",file.Data(),fNrunsPerMaster);
+            if (((nruns-1)%fNrunsPerMaster) == 0) {
+               schunk = Form("%d", irun);
+               cbase = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
+            } else {
+               cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"%s\", 1000000);",file.Data()));
+               cbase->Add(cadd);
+               delete cadd;
+            }
+            if ((nruns%fNrunsPerMaster)!=0 && irun!=fRunRange[1]) {
+               continue;
+            }   
+            schunk += Form("_%d.xml", irun);
+            if (FileExists(schunk)) {
+               Info("CreateDataset", "\n#####   Dataset %s exist. Skipping creation...", schunk.Data());
+               continue;
+            }        
+            printf("Exporting merged collection <%s> and copying to AliEn.\n", schunk.Data());
+            cbase->ExportXML(Form("file://%s", schunk.Data()),kFALSE,kFALSE, schunk, "Merged runs");
+            TFile::Cp(Form("file:%s",schunk.Data()), Form("alien://%s/%s",workdir.Data(), schunk.Data()));
+            if (!FileExists(schunk)) {
+               Error("CreateDataset", "Copy command did NOT succeed for %s", schunk.Data());
+               return kFALSE;
+            }
+         }   
       }
    }      
    return kTRUE;
@@ -958,8 +1051,8 @@ void AliAnalysisAlien::Print(Option_t *) const
    printf("=   Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
    if (fMasterResubmitThreshold>0) 
    printf("=   Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
-   if (fNMasterJobs>0)
-   printf("=   Number of master jobs to be launched: ________ not used yet\n");
+   if (fNrunsPerMaster>0)
+   printf("=   Number of runs per master job: _______________ %d\n", fNrunsPerMaster);
    printf("=   Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
    printf("=   Name of the generated execution script: ______ %s\n",fExecutable.Data());
    if (fArguments.Length()) 
@@ -1000,7 +1093,7 @@ void AliAnalysisAlien::SetDefaults()
    fNtestFiles                 = 10;
    fRunRange[0]                = 0;
    fRunRange[1]                = 0;
-   fNMasterJobs                = 0;
+   fNrunsPerMaster             = 1;
    fMaxMergeFiles              = 100;
    fRunNumbers                 = "";
    fExecutable                 = "analysis.sh";
@@ -1060,6 +1153,11 @@ Bool_t AliAnalysisAlien::MergeOutputs()
       output_file = str->GetString();
       Int_t index = output_file.Index("@");
       if (index > 0) output_file.Remove(index);
+      // Skip already merged outputs
+      if (!gSystem->AccessPathName(output_file)) {
+         Info("MergeOutputs", "Output file <%s> found. Not merging again.", output_file.Data());
+         continue;
+      }   
       if (fMergeExcludes.Length() &&
           fMergeExcludes.Contains(output_file.Data())) continue;
       // Perform a 'find' command in the output directory, looking for registered outputs    
@@ -1072,6 +1170,28 @@ Bool_t AliAnalysisAlien::MergeOutputs()
       TMap *map;
       previous_chunk = "";
       count_chunk = 0;
+      // Check if there is a merge operation to resume
+      output_chunk = output_file;
+      output_chunk.ReplaceAll(".root", "_*.root");
+      if (!gSystem->Exec(Form("ls %s", output_chunk.Data()))) {
+         while (1) {
+            for (Int_t counter=0; counter<fMaxMergeFiles; counter++) map = (TMap*)nextmap();
+            if (!map) {
+               Error("MergeOutputs", "Cannot resume merging for <%s>, nentries=%d", output_file.Data(), res->GetSize());
+               delete res;
+               return kFALSE;
+            }
+            output_chunk = output_file;
+            output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk));
+            printf("%s\n", output_chunk.Data());
+            count_chunk++;
+            if (gSystem->AccessPathName(output_chunk)) continue;
+            // Merged file with chunks up to <count_chunk> found
+            printf("Resume merging of <%s> from <%s>\n", output_file.Data(), output_chunk.Data());
+            previous_chunk = output_chunk;
+            break;
+         }
+      }
       count_zero = fMaxMergeFiles;
       while ((map=(TMap*)nextmap())) {
       // Loop 'find' results and get next LFN
@@ -1113,6 +1233,7 @@ Bool_t AliAnalysisAlien::MergeOutputs()
                break;
             } else {
                Info("MergeOutputs", "\n#####   Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), output_chunk.Data());
+               gSystem->Unlink(previous_chunk);
             }
             if (map == res->Last()) {
                delete res;
index 164acdd..28ea780 100644 (file)
@@ -40,7 +40,7 @@ public:
    virtual void        SetAdditionalLibs(const char *list)               {fAdditionalLibs = list;}
    virtual void        SetPrice(Int_t price=1)                           {fPrice = price;}
    virtual void        SetRunRange(Int_t min, Int_t max)                 {fRunRange[0] = min; fRunRange[1] = max;}
-   virtual void        SetNMasterJobs(Int_t njobs)                       {fNMasterJobs = njobs;}
+   virtual void        SetNrunsPerMaster(Int_t nruns=1)                  {fNrunsPerMaster = nruns;}
    virtual void        SetMaxMergeFiles(Int_t nfiles)                    {fMaxMergeFiles = nfiles;}
    virtual void        SetSplitMode(const char *type="se")               {fSplitMode = type;}
    virtual void        SetSplitMaxInputFileNumber(Int_t nfiles=100)      {fSplitMaxInputFileNumber = nfiles;}
@@ -99,7 +99,7 @@ private:
    Int_t            fMasterResubmitThreshold; // Failed jobs will be resubmitted until this DONE ratio
    Int_t            fNtestFiles;      // Number of files used in the testing case
    Int_t            fRunRange[2];     // Run range
-   Int_t            fNMasterJobs;     // Number of masterjobs to be launched
+   Int_t            fNrunsPerMaster;  // Number of runs per masterjob
    Int_t            fMaxMergeFiles;   // Maximum number of files to be merged in one chunk
    TString          fRunNumbers;      // List of runs to be processed
    TString          fExecutable;      // Executable script for AliEn job