]> git.uio.no Git - u/mrichter/AliRoot.git/blobdiff - ANALYSIS/AliFileMerger.cxx
Fix
[u/mrichter/AliRoot.git] / ANALYSIS / AliFileMerger.cxx
index 11b76b60b68840f32c2b9f2d925e568f8f048f96..d46e4e819cad8774fde3499fa08475f6bee75c3c 100644 (file)
@@ -28,7 +28,9 @@
 //  2. syswatch.log is created diring mergin procedure. 
 //     Memeory consumption - for reading and for merging can be monitored
 
-
+//  RS: Changed merger to respect the structure of files being merged (directories, collections...)
+//      Additional option: SetNoTrees (default false) to not merge any tree
+//      The code mostly taken from root's hadd.cxx
 /*
   Usage:
   // Libraries for all classes to be merged should be loaded before using the class
  
 
 #include <fstream>
+#include <THashList.h>
+#include <TChain.h>
+#include <TKey.h>
+#include <TH1.h>
+#include <THStack.h>
 #include "TSystem.h"
 #include "TFile.h"
 #include "TGrid.h"
 #include "TObjString.h"
 #include "TObjArray.h"
 #include "TMethodCall.h"
-
+#include "Riostream.h"
 #include "AliSysInfo.h"
 #include "AliFileMerger.h"
+#include "AliLog.h"
 
 ClassImp(AliFileMerger)
 
+ProcInfo_t procInfo;//TMP
+
 ////////////////////////////////////////////////////////////////////////
 
 AliFileMerger::AliFileMerger():
   TNamed(),
   fRejectMask(0),
-  fAcceptMask(0)
+  fAcceptMask(0),
+  fMaxFilesOpen(500),
+  fNoTrees(kFALSE)
 {
   //
   // Default constructor
@@ -79,7 +91,9 @@ AliFileMerger::AliFileMerger():
 AliFileMerger::AliFileMerger(const char* name):
   TNamed(name,name),
   fRejectMask(0),
-  fAcceptMask(0)
+  fAcceptMask(0),
+  fMaxFilesOpen(500),
+  fNoTrees(kFALSE)
 {
   //
   // 
@@ -87,15 +101,11 @@ AliFileMerger::AliFileMerger(const char* name):
 }
 
 
-void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName, const char* pattern){
+void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName, const char* pattern, Bool_t dontOverwrite){
 
   //
   // Merge the files coming out of the calibration job
   // 
-  
-  TObjArray * mergeArray= new TObjArray;
-  
-  TString outputFile(outputFileName);
   TString command;
   // looking for files to be merged in the output directory
   command = Form("find %s/ *%s", outputDir, pattern);
@@ -106,6 +116,9 @@ void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName,
   TIter nextmap(res);
   TMap *map = 0;
   // loop over the results
+  TList sourcelist;  
+  sourcelist.SetOwner(kTRUE);
+  //
   while((map=(TMap*)nextmap())) {
     // getting the turl
     TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
@@ -115,56 +128,103 @@ void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName,
       break;
     } 
     printf("looking for file %s\n",(objs->GetString()).Data());
-    TFile* currentFile=TFile::Open((objs->GetString()).Data());
-    if(!currentFile) continue; // protection
-    Merge(currentFile, mergeArray);
-
-    if(currentFile) delete currentFile;
+    AddFile(&sourcelist, (objs->GetString()).Data());;
   }
-  Bool_t separate = kFALSE;
-  if (separate) {
-    StoreSeparateResults(mergeArray,outputFileName);
-  }
-  else {
-    StoreResults(mergeArray,outputFileName);
-  }
-  delete mergeArray;
+  //
+  IterList(&sourcelist, outputFileName, dontOverwrite);
   delete res;
-
 }
 
+void AliFileMerger::IterList(const TList* namesList, const char* outputFileName, Bool_t dontOverwrite)
+{
+  // merge in steps or in one go
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+  //
+  TString outputFile(outputFileName);
+  gSystem->ExpandPathName(outputFile);
+  //
+  int nFiles = namesList->GetEntries();
+  int maxSrcOpen = fMaxFilesOpen - 1;
+  TList filesList;
+  filesList.SetOwner(kTRUE);
+  //
+  TString tmpDest[2] = {outputFile,outputFile}; // names for tmp files
+  int npl = outputFile.Last('.');
+  if (npl<0) npl  = outputFile.Length();
+  for (int i=0;i<2;i++) tmpDest[i].Insert(npl,Form("_TMPMERGE%d_",i));
+  //
+  int nsteps = 0, currTmp = 0, start = 0;
+  for (int ifl=0;ifl<nFiles;ifl++) {
+    int st = ifl%maxSrcOpen;
+    if (st==0 && ifl) { // new chunk should be started, merge what was already accumulated
+      OpenNextChunks(namesList,&filesList,start,ifl-1);
+      start = ifl; // remember where to start next step
+      if (nsteps++) { // if not 1st one, merge the privous chunk with this one
+       filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
+       currTmp = (currTmp==0) ? 1:0;         // swap tmp files
+      }
+      // open temp target
+      TFile* targetTmp = TFile::Open( tmpDest[currTmp].Data(), "RECREATE");
+      if (!targetTmp || targetTmp->IsZombie()) {
+       printf("Error opening temporary file %s\n",tmpDest[currTmp].Data());
+       return;
+      }
+      MergeRootfile(targetTmp, &filesList);
+      targetTmp->Close();
+      delete targetTmp;
+      filesList.Clear(); // close all open files
+    }
+    // nothing to do until needed amount of files is accumulated
+  }
+  // merge last step
+  TFile* target = TFile::Open( outputFile.Data(), (dontOverwrite ? "CREATE":"RECREATE") );
+  if (!target || target->IsZombie()) {
+    cerr << "Error opening target file (does " << outputFileName << " exist?)." << endl;
+    cerr << "Use force = kTRUE to re-creation of output file." << endl;
+    return;
+  }
+  OpenNextChunks(namesList,&filesList,start,nFiles-1);
+  // add result of previous merges
+  if (nsteps) filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
+  MergeRootfile( target, &filesList);
+  target->Close();
+  delete target;
+  filesList.Clear();
+  // 
+  for (int i=0;i<2;i++) gSystem->Exec(Form("if [ -e %s ]; then \nrm %s\nfi",tmpDest[i].Data(),tmpDest[i].Data()));
+  //
+  printf("Merged %d files in %d steps\n",nFiles,++nsteps);
+  //  
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+}
 
-
-void AliFileMerger::IterTXT( const char * fileList,  const char* outputFileName, Bool_t separate){
+void AliFileMerger::IterTXT( const char * fileList,  const char* outputFileName, Bool_t dontOverwrite){
   
   // Merge the files indicated in the list - fileList
   // ASCII file option example: 
   // find `pwd`/ | grep AliESDfriends_v1.root > calib.list
   
-  TObjArray * mergeArray= new TObjArray;
-  
   // Open the input stream
-  
   ifstream in;
   in.open(fileList);
   // Read the input list of files 
   TString objfile;
   Int_t counter=0;
+  TList sourcelist;  
+  sourcelist.SetOwner(kTRUE);
   while(in.good()) {
     in >> objfile;
-    if (!objfile.Contains("root")) continue; // protection    
-    printf("Open file:Counter\t%d\tMerging file %s\n",counter++,objfile.Data());
-    TFile currentFile(objfile.Data());
-    Merge(&currentFile, mergeArray);
-  }
-  if (separate) { 
-    StoreSeparateResults(mergeArray, outputFileName);
+    if (!objfile.Contains(".root")) continue; // protection
+    gSystem->ExpandPathName(objfile);
+    printf("Add file:Counter\t%d\tMerging file %s\n",counter++,objfile.Data());
+    AddFile(&sourcelist, objfile.Data());
   }
-  else {
-    StoreResults(mergeArray, outputFileName);
-  }
-
-  delete mergeArray;
+  //
+  IterList(&sourcelist, outputFileName, dontOverwrite);
+  //
 }
 
 void AliFileMerger::StoreResults(TObjArray * array, const char* outputFileName){
@@ -196,12 +256,11 @@ void AliFileMerger::StoreSeparateResults(TObjArray * array, const char* outputFi
   }
 }
 
-
-
 void AliFileMerger::Merge(TFile* fileIn, TObjArray * array){
   //
   // Merging procedure
   //
+  if (!array) return;
   static Int_t counter=-1;
   counter++;
   TObjArray *carray = new TObjArray;   //array of the objects inside current file
@@ -228,8 +287,8 @@ void AliFileMerger::Merge(TFile* fileIn, TObjArray * array){
     return;
   }
   TMethodCall callEnv;
-  
-  for (Int_t i=0; i<carray->GetEntries(); i++){
+  Int_t entries =carray->GetEntriesFast();
+  for (Int_t i=0; i<entries; i++){
     
     TObjArray *templist = new TObjArray(1);
     templist->SetOwner(kFALSE);
@@ -258,6 +317,7 @@ void AliFileMerger::Merge(TFile* fileIn, TObjArray * array){
     AliSysInfo::AddStamp(currentObject->GetName(),2,i,counter);  
     delete templist;
   }
+  carray->Delete();
   delete carray;
 }
 
@@ -308,3 +368,285 @@ void AliFileMerger::AddAccept(const char *accept){
 
 }
 
+//___________________________________________________________________________
+int AliFileMerger::MergeRootfile( TDirectory *target, TList *sourcelist)
+{
+  // Merge all objects in a directory
+  // modified version of root's hadd.cxx
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+
+  Int_t counterF = -1;
+  int status = 0;
+  cout << "Target path: " << target->GetPath() << endl;
+  TString path( (char*)strstr( target->GetPath(), ":" ) );
+  path.Remove( 0, 2 );
+  //
+  // find 1st valid file
+  TDirectory *first_source = (TDirectory*)sourcelist->First();
+  //
+  Int_t nguess = sourcelist->GetSize()+1000;
+  THashList allNames(nguess);
+  ((THashList*)target->GetList())->Rehash(nguess);
+  ((THashList*)target->GetListOfKeys())->Rehash(nguess);
+  TList listH;
+  TString listHargs;
+  listHargs.Form("((TCollection*)0x%lx)", (ULong_t)&listH);
+  //
+  while(first_source) {
+    counterF++;
+    TDirectory *current_sourcedir = first_source->GetDirectory(path);
+    if (!current_sourcedir) {
+      first_source = (TDirectory*)sourcelist->After(first_source);
+      continue;
+    }
+    // loop over all keys in this directory
+    TChain *globChain = 0;
+    TIter nextkey( current_sourcedir->GetListOfKeys() );
+    TKey *key, *oldkey=0;
+    //gain time, do not add the objects in the list in memory
+    TH1::AddDirectory(kFALSE);
+    //
+    int counterK = 0;
+    //
+    while ( (key = (TKey*)nextkey())) {
+      if (current_sourcedir == target) break;
+      //
+      // check if we don't reject this name
+      TString nameK(key->GetName());
+      if (!IsAccepted(nameK)) {
+       if (!counterF) printf("Object %s is in rejection list, skipping...\n",nameK.Data());
+       continue;
+      }
+      //
+      //keep only the highest cycle number for each key
+      if (oldkey && !strcmp(oldkey->GetName(),key->GetName())) continue;
+      if (!strcmp(key->GetClassName(),"TProcessID")) {key->ReadObj(); continue;}
+      if (allNames.FindObject(key->GetName())) continue;
+      TClass *cl = TClass::GetClass(key->GetClassName());
+      if (!cl || !cl->InheritsFrom(TObject::Class())) {
+       cout << "Cannot merge object type, name: "
+            << key->GetName() << " title: " << key->GetTitle() << endl;
+       continue;
+      }
+      allNames.Add(new TObjString(key->GetName()));
+      AliSysInfo::AddStamp(nameK.Data(),1,counterK++,counterF-1); 
+      // read object from first source file
+      //current_sourcedir->cd();
+
+      TObject *obj = key->ReadObj();
+      
+      if ( obj->IsA()->InheritsFrom( TTree::Class() ) ) {
+       
+       // loop over all source files create a chain of Trees "globChain"
+       if (!fNoTrees) { // 
+         TString obj_name;
+         if (path.Length()) {
+           obj_name = path + "/" + obj->GetName();
+         } else {
+           obj_name = obj->GetName();
+         }
+         globChain = new TChain(obj_name);
+         globChain->Add(first_source->GetName());
+         TFile *nextsource = (TFile*)sourcelist->After( first_source );
+         while ( nextsource ) {
+           //do not add to the list a file that does not contain this Tree
+           TFile *curf = TFile::Open(nextsource->GetName());
+           if (curf) {
+             Bool_t mustAdd = kFALSE;
+             if (curf->FindKey(obj_name)) {
+               mustAdd = kTRUE;
+             } else {
+               //we could be more clever here. No need to import the object
+               //we are missing a function in TDirectory
+               TObject *aobj = curf->Get(obj_name);
+               if (aobj) { mustAdd = kTRUE; delete aobj;}
+             }
+             if (mustAdd) {
+               globChain->Add(nextsource->GetName());
+             }
+           }
+           delete curf;
+           nextsource = (TFile*)sourcelist->After( nextsource );
+         }
+       }
+      } else if ( obj->IsA()->InheritsFrom( TDirectory::Class() ) ) {
+       // it's a subdirectory
+       
+       cout << "Found subdirectory " << obj->GetName() << endl;
+       // create a new subdir of same name and title in the target file
+       target->cd();
+       TDirectory *newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
+       
+       // newdir is now the starting point of another round of merging
+       // newdir still knows its depth within the target file via
+       // GetPath(), so we can still figure out where we are in the recursion
+       status = MergeRootfile( newdir, sourcelist);
+       if (status) return status;
+       
+      } else if ( obj->InheritsFrom(TObject::Class())
+                 && obj->IsA()->GetMethodWithPrototype("Merge", "TCollection*") ) {
+       // object implements Merge(TCollection*)
+       
+       // loop over all source files and merge same-name object
+       TFile *nextsource = (TFile*)sourcelist->After( first_source );
+       while ( nextsource ) {
+         // make sure we are at the correct directory level by cd'ing to path
+         TDirectory *ndir = nextsource->GetDirectory(path);
+         if (ndir) {
+           ndir->cd();
+           TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
+           if (key2) {
+             TObject *hobj = key2->ReadObj();
+             hobj->ResetBit(kMustCleanup);
+             listH.Add(hobj);
+             Int_t error = 0;
+             obj->Execute("Merge", listHargs.Data(), &error); // RS Probleme here
+             if (error) {
+               cerr << "Error calling Merge() on " << obj->GetName()
+                    << " with the corresponding object in " << nextsource->GetName() << endl;
+             }
+             listH.Delete();
+           }
+         }
+         nextsource = (TFile*)sourcelist->After( nextsource );
+       }
+      } else if ( obj->IsA()->InheritsFrom( THStack::Class() ) ) {
+       THStack *hstack1 = (THStack*) obj;
+       TList* l = new TList();
+       
+       // loop over all source files and merge the histos of the
+       // corresponding THStacks with the one pointed to by "hstack1"
+       TFile *nextsource = (TFile*)sourcelist->After( first_source );
+       while ( nextsource ) {
+         // make sure we are at the correct directory level by cd'ing to path
+         TDirectory *ndir = nextsource->GetDirectory(path);
+         if (ndir) {
+           ndir->cd();
+           TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(hstack1->GetName());
+           if (key2) {
+             THStack *hstack2 = (THStack*) key2->ReadObj();
+             l->Add(hstack2->GetHists()->Clone());
+             delete hstack2;
+           }
+         }
+         
+         nextsource = (TFile*)sourcelist->After( nextsource );
+       }
+       hstack1->GetHists()->Merge(l);
+       l->Delete();
+      } else {
+       // object is of no type that we can merge
+       cout << "Cannot merge object type, name: "
+            << obj->GetName() << " title: " << obj->GetTitle() << endl;
+       
+       // loop over all source files and write similar objects directly to the output file
+       TFile *nextsource = (TFile*)sourcelist->After( first_source );
+       while ( nextsource ) {
+         // make sure we are at the correct directory level by cd'ing to path
+         TDirectory *ndir = nextsource->GetDirectory(path);
+         if (ndir) {
+           ndir->cd();
+           TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
+           if (key2) {
+             TObject *nobj = key2->ReadObj();
+             nobj->ResetBit(kMustCleanup);
+             int nbytes1 = target->WriteTObject(nobj, key2->GetName(), "SingleKey" );
+             if (nbytes1 <= 0) status = -1;
+             delete nobj;
+           }
+         }
+         nextsource = (TFile*)sourcelist->After( nextsource );
+       }
+      }
+      
+      // now write the merged histogram (which is "in" obj) to the target file
+      // note that this will just store obj in the current directory level,
+      // which is not persistent until the complete directory itself is stored
+      // by "target->Write()" below
+      target->cd();
+      
+      //!!if the object is a tree, it is stored in globChain...
+      if(obj->IsA()->InheritsFrom( TDirectory::Class() )) {
+       //printf("cas d'une directory\n");
+      } else if(obj->IsA()->InheritsFrom( TTree::Class() )) {
+       if (!fNoTrees) {
+         globChain->ls();
+         globChain->Merge(target->GetFile(),0,"keep fast");
+         delete globChain;
+       }
+      } else {
+       int nbytes2 = obj->Write( key->GetName(), TObject::kSingleKey );
+       if (nbytes2 <= 0) status = -1;
+      }
+      oldkey = key;
+      delete obj;
+    } // while ( ( TKey *key = (TKey*)nextkey() ) )
+    first_source = (TDirectory*)sourcelist->After(first_source);
+  }
+  // save modifications to target file
+  target->SaveSelf(kTRUE);
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  return status;
+}
+
+//___________________________________________________________________________
+int AliFileMerger::OpenNextChunks(const TList* namesList, TList* filesList, Int_t from, Int_t to)
+{
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  filesList->Clear();
+  int nEnt = namesList->GetEntries();
+  from = from<nEnt ? from : nEnt;
+  to   = to<nEnt ? to : nEnt;
+  int count = 0;
+  for (int i=from;i<=to;i++) {
+    TNamed* fnam = (TNamed*)namesList->At(i);
+    if (!fnam) continue;
+    TString fnamS(fnam->GetName());
+    gSystem->ExpandPathName(fnamS);
+    if (fnamS.BeginsWith("alien://") && !gGrid) TGrid::Connect("alien");
+    TFile* source = TFile::Open(fnam->GetName());
+    if( source==0 ) { printf("Failed to open file %s, will skip\n",fnam->GetName()); continue; }
+    filesList->Add(source);
+    printf("Opened file %s\n",fnam->GetName());
+    count++;
+  }
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  return count;
+}
+
+
+//___________________________________________________________________________
+int AliFileMerger::AddFile(TList* namesList, std::string entry)
+{
+  // add a new file to the list of files
+  //  static int count(0);
+  if( entry.empty() ) return 0;
+  size_t j =entry.find_first_not_of(' ');
+  if( j==std::string::npos ) return 0;
+  entry = entry.substr(j);
+  if( entry.substr(0,1)=="@") {
+    std::ifstream indirect_file(entry.substr(1).c_str() );
+    if( ! indirect_file.is_open() ) {
+      std::cerr<< "Could not open indirect file " << entry.substr(1) << std::endl;
+      return 1;
+    }
+    while( indirect_file ){
+      std::string line;
+      std::getline(indirect_file, line);
+      if( AddFile(namesList, line)!=0 ) return 1;;
+    }
+    return 0;
+  }
+  //  cout << "Source file " << (++count) << ": " << entry << endl;
+  namesList->Add(new TNamed(entry,""));
+  return 0;
+}