// 2. syswatch.log is created during the merging procedure.
//    Memory consumption - for reading and for merging - can be monitored
-
+// RS: Changed merger to respect the structure of files being merged (directories, collections...)
+// Additional option: SetNoTrees (default false) to not merge any tree
+// The code mostly taken from root's hadd.cxx
/*
Usage:
// Libraries for all classes to be merged should be loaded before using the class
#include <fstream>
+#include <THashList.h>
+#include <TChain.h>
+#include <TKey.h>
+#include <TH1.h>
+#include <THStack.h>
#include "TSystem.h"
#include "TFile.h"
#include "TGrid.h"
#include "TObjString.h"
#include "TObjArray.h"
#include "TMethodCall.h"
-
+#include "Riostream.h"
#include "AliSysInfo.h"
#include "AliFileMerger.h"
+#include "AliLog.h"
ClassImp(AliFileMerger)
+ProcInfo_t procInfo;//TMP
+
////////////////////////////////////////////////////////////////////////
AliFileMerger::AliFileMerger():
TNamed(),
fRejectMask(0),
- fAcceptMask(0)
+ fAcceptMask(0),
+ fMaxFilesOpen(800),
+ fNoTrees(kFALSE)
{
//
// Default constructor
AliFileMerger::AliFileMerger(const char* name):
TNamed(name,name),
fRejectMask(0),
- fAcceptMask(0)
+ fAcceptMask(0),
+ fMaxFilesOpen(800),
+ fNoTrees(kFALSE)
{
//
//
}
-void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName, const char* pattern){
+void AliFileMerger::IterAlien(const char* outputDir, const char* outputFileName, const char* pattern, Bool_t dontOverwrite){
//
// Merge the files coming out of the calibration job
//
-
- TObjArray * mergeArray= new TObjArray;
-
- TString outputFile(outputFileName);
TString command;
// looking for files to be merged in the output directory
command = Form("find %s/ *%s", outputDir, pattern);
TIter nextmap(res);
TMap *map = 0;
// loop over the results
+ TList sourcelist;
+ sourcelist.SetOwner(kTRUE);
+ //
while((map=(TMap*)nextmap())) {
// getting the turl
TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
break;
}
printf("looking for file %s\n",(objs->GetString()).Data());
- TFile* currentFile=TFile::Open((objs->GetString()).Data());
- if(!currentFile) continue; // protection
- Merge(currentFile, mergeArray);
-
- if(currentFile) delete currentFile;
+ AddFile(&sourcelist, (objs->GetString()).Data());;
}
-
- // StoreSeparateResults(mergeArray,outputFileName);
- StoreResults(mergeArray,outputFileName);
-
- delete mergeArray;
+ //
+ IterList(&sourcelist, outputFileName, dontOverwrite);
delete res;
-
}
+void AliFileMerger::IterList(const TList* namesList, const char* outputFileName, Bool_t dontOverwrite)
+{
+  // Merge the files whose names are stored in namesList into outputFileName,
+  // in steps or in one go.
+  //
+  // namesList      : list of named objects, the name of each being the path of a source file
+  // outputFileName : name of the output file (the path is expanded)
+  // dontOverwrite  : if kTRUE the output is opened in "CREATE" mode, otherwise in "RECREATE" mode
+  //
+  // The sources are opened in chunks of at most fMaxFilesOpen-1 files. Every complete
+  // chunk is merged, together with the result of the previous step, into one of two
+  // alternating temporary files (the output name with _TMPMERGE0_/_TMPMERGE1_ inserted
+  // before the last '.'). The last chunk and the last temporary result are merged into
+  // the output file. The temporary files are removed at the end, also in case of failure.
+  //
+  if (!namesList) {
+    AliError("No list of files to merge was provided");
+    return;
+  }
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+  //
+  TString outputFile(outputFileName);
+  gSystem->ExpandPathName(outputFile);
+  //
+  int nFiles = namesList->GetEntries();
+  int maxSrcOpen = fMaxFilesOpen - 1;
+  if (maxSrcOpen<1) maxSrcOpen = 1; // protection: the chunk size is used as a modulo divisor below
+  TList filesList;
+  filesList.SetOwner(kTRUE); // clearing the list deletes (i.e. closes) the files
+  //
+  TString tmpDest[2] = {outputFile,outputFile}; // names for tmp files
+  int npl = outputFile.Last('.');
+  if (npl<0) npl = outputFile.Length();
+  for (int i=0;i<2;i++) tmpDest[i].Insert(npl,Form("_TMPMERGE%d_",i));
+  //
+  Bool_t ok = kTRUE;
+  int nsteps = 0, currTmp = 0, start = 0;
+  for (int ifl=1;ifl<nFiles;ifl++) {
+    if (ifl%maxSrcOpen) continue; // nothing to do until needed amount of files is accumulated
+    //
+    // new chunk should be started, merge what was already accumulated
+    OpenNextChunks(namesList,&filesList,start,ifl-1);
+    start = ifl; // remember where to start next step
+    if (nsteps++) { // if not 1st one, merge the previous chunk with this one
+      TFile* prevTmp = TFile::Open(tmpDest[currTmp].Data());
+      if (!prevTmp || prevTmp->IsZombie()) {
+        // going on would silently drop everything merged so far
+        AliError(Form("Failed to reopen temporary file %s, merging is aborted",tmpDest[currTmp].Data()));
+        delete prevTmp;
+        ok = kFALSE;
+        break;
+      }
+      filesList.AddFirst(prevTmp);
+      currTmp = (currTmp==0) ? 1:0; // swap tmp files
+    }
+    // open temp target
+    TFile* targetTmp = TFile::Open( tmpDest[currTmp].Data(), "RECREATE");
+    if (!targetTmp || targetTmp->IsZombie()) {
+      printf("Error opening temporary file %s\n",tmpDest[currTmp].Data());
+      delete targetTmp;
+      ok = kFALSE;
+      break;
+    }
+    if (MergeRootfile(targetTmp, &filesList)) {
+      AliError(Form("Problems were reported while merging into %s",tmpDest[currTmp].Data()));
+    }
+    targetTmp->Close();
+    delete targetTmp;
+    filesList.Clear(); // close all open files
+  }
+  //
+  // merge last step
+  if (ok) {
+    TFile* target = TFile::Open( outputFile.Data(), (dontOverwrite ? "CREATE":"RECREATE") );
+    if (!target || target->IsZombie()) {
+      cerr << "Error opening target file (does " << outputFileName << " exist?)." << endl;
+      cerr << "Use force = kTRUE to re-creation of output file." << endl;
+      delete target;
+      ok = kFALSE;
+    }
+    else {
+      OpenNextChunks(namesList,&filesList,start,nFiles-1);
+      // add result of previous merges
+      if (nsteps) {
+        TFile* prevTmp = TFile::Open(tmpDest[currTmp].Data());
+        if (!prevTmp || prevTmp->IsZombie()) {
+          AliError(Form("Failed to reopen temporary file %s, nothing is merged to %s",
+                        tmpDest[currTmp].Data(),outputFile.Data()));
+          delete prevTmp;
+          ok = kFALSE;
+        }
+        else filesList.AddFirst(prevTmp);
+      }
+      if (ok && MergeRootfile( target, &filesList)) {
+        AliError(Form("Problems were reported while merging into %s",outputFile.Data()));
+      }
+      target->Close();
+      delete target;
+    }
+  }
+  filesList.Clear(); // make sure that all files are closed before removing the temporary ones
+  //
+  // remove temporary files; NB: TSystem::AccessPathName returns kFALSE if the path DOES exist
+  for (int i=0;i<2;i++) {
+    if (!gSystem->AccessPathName(tmpDest[i].Data())) gSystem->Unlink(tmpDest[i].Data());
+  }
+  if (!ok) return;
+  //
+  printf("Merged %d files in %d steps\n",nFiles,++nsteps);
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+}
-
-void AliFileMerger::IterTXT( const char * fileList, const char* outputFileName, Bool_t separate){
+void AliFileMerger::IterTXT( const char * fileList, const char* outputFileName, Bool_t dontOverwrite){
// Merge the files indicated in the list - fileList
// ASCII file option example:
// find `pwd`/ | grep AliESDfriends_v1.root > calib.list
- TObjArray * mergeArray= new TObjArray;
-
// Open the input stream
-
ifstream in;
in.open(fileList);
// Read the input list of files
TString objfile;
Int_t counter=0;
+ TList sourcelist;
+ sourcelist.SetOwner(kTRUE);
while(in.good()) {
in >> objfile;
- if (!objfile.Contains("root")) continue; // protection
- printf("Open file:Counter\t%d\tMerging file %s\n",counter++,objfile.Data());
- TFile currentFile(objfile.Data());
- Merge(¤tFile, mergeArray);
- }
- if (separate) {
- StoreSeparateResults(mergeArray, outputFileName);
- }
- else {
- StoreResults(mergeArray, outputFileName);
+ if (!objfile.Contains(".root")) continue; // protection
+ gSystem->ExpandPathName(objfile);
+ printf("Add file:Counter\t%d\tMerging file %s\n",counter++,objfile.Data());
+ AddFile(&sourcelist, objfile.Data());
}
-
- delete mergeArray;
+ //
+ IterList(&sourcelist, outputFileName, dontOverwrite);
+ //
}
void AliFileMerger::StoreResults(TObjArray * array, const char* outputFileName){
}
}
-
-
void AliFileMerger::Merge(TFile* fileIn, TObjArray * array){
//
// Merging procedure
}
+//___________________________________________________________________________
+int AliFileMerger::MergeRootfile( TDirectory *target, TList *sourcelist)
+{
+  // Merge all objects in a directory
+  // modified version of root's hadd.cxx
+  //
+  // target     : directory of the output file receiving the merged objects; its path
+  //              inside the file defines the directory which is read from every source
+  // sourcelist : list of the opened source files
+  //
+  // Every source is used in turn as the "first" one: each of its accepted keys whose name
+  // was not treated yet is read and combined with the same-name objects of the sources
+  // following it in the list:
+  //  - trees are chained and merged by TChain::Merge (skipped if fNoTrees is set)
+  //  - subdirectories are created in the target and merged recursively
+  //  - objects with a Merge(TCollection*) method are merged one by one
+  //  - THStack: the lists of histograms are merged
+  //  - anything else is written object by object to the target
+  //
+  // Returns 0 on success and -1 if writing of some object or creation of a subdirectory
+  // failed; a non-zero status of a recursive call stops the merging and is returned as is.
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+
+  Int_t counterF = -1;
+  int status = 0;
+  cout << "Target path: " << target->GetPath() << endl;
+  // path of the target inside its file: strip everything up to and including ":/"
+  TString path( (char*)strstr( target->GetPath(), ":" ) );
+  path.Remove( 0, 2 );
+  //
+  // find 1st valid file
+  TDirectory *first_source = (TDirectory*)sourcelist->First();
+  //
+  Int_t nguess = sourcelist->GetSize()+1000;
+  THashList allNames(nguess); // names of the objects which were already treated
+  ((THashList*)target->GetList())->Rehash(nguess);
+  ((THashList*)target->GetListOfKeys())->Rehash(nguess);
+  TList listH;
+  // the address of listH is passed as a string argument to TObject::Execute("Merge",...)
+  TString listHargs;
+  listHargs.Form("((TCollection*)0x%lx)", (ULong_t)&listH);
+  //
+  while(first_source) {
+    counterF++;
+    TDirectory *current_sourcedir = first_source->GetDirectory(path);
+    if (!current_sourcedir) {
+      first_source = (TDirectory*)sourcelist->After(first_source);
+      continue;
+    }
+    // loop over all keys in this directory
+    TChain *globChain = 0;
+    TIter nextkey( current_sourcedir->GetListOfKeys() );
+    TKey *key, *oldkey=0;
+    //gain time, do not add the objects in the list in memory
+    TH1::AddDirectory(kFALSE);
+    //
+    int counterK = 0;
+    //
+    while ( (key = (TKey*)nextkey())) {
+      if (current_sourcedir == target) break;
+      //
+      // check if we don't reject this name
+      TString nameK(key->GetName());
+      if (!IsAccepted(nameK)) {
+        if (!counterF) printf("Object %s is in rejection list, skipping...\n",nameK.Data());
+        continue;
+      }
+      //
+      //keep only the highest cycle number for each key
+      if (oldkey && !strcmp(oldkey->GetName(),key->GetName())) continue;
+      if (!strcmp(key->GetClassName(),"TProcessID")) {key->ReadObj(); continue;}
+      if (allNames.FindObject(key->GetName())) continue;
+      TClass *cl = TClass::GetClass(key->GetClassName());
+      if (!cl || !cl->InheritsFrom(TObject::Class())) {
+        cout << "Cannot merge object type, name: "
+             << key->GetName() << " title: " << key->GetTitle() << endl;
+        continue;
+      }
+      allNames.Add(new TObjString(key->GetName()));
+      AliSysInfo::AddStamp(nameK.Data(),1,counterK++,counterF-1);
+      // read object from first source file
+      //current_sourcedir->cd();
+
+      TObject *obj = key->ReadObj();
+      if (!obj) {
+        AliError(Form("Failed to get the object with key %s from %s",key->GetName(),current_sourcedir->GetFile()->GetName()));
+        continue;
+      }
+
+      if ( obj->IsA()->InheritsFrom( TTree::Class() ) ) {
+
+        // loop over all source files create a chain of Trees "globChain"
+        if (!fNoTrees) { //
+          TString obj_name;
+          if (path.Length()) {
+            obj_name = path + "/" + obj->GetName();
+          } else {
+            obj_name = obj->GetName();
+          }
+          globChain = new TChain(obj_name);
+          globChain->Add(first_source->GetName());
+          TFile *nextsource = (TFile*)sourcelist->After( first_source );
+          while ( nextsource ) {
+            //do not add to the list a file that does not contain this Tree
+            TFile *curf = TFile::Open(nextsource->GetName());
+            if (curf) {
+              Bool_t mustAdd = kFALSE;
+              if (curf->FindKey(obj_name)) {
+                mustAdd = kTRUE;
+              } else {
+                //we could be more clever here. No need to import the object
+                //we are missing a function in TDirectory
+                TObject *aobj = curf->Get(obj_name);
+                if (aobj) { mustAdd = kTRUE; delete aobj;}
+              }
+              if (mustAdd) {
+                globChain->Add(nextsource->GetName());
+              }
+            }
+            delete curf;
+            nextsource = (TFile*)sourcelist->After( nextsource );
+          }
+        }
+      } else if ( obj->IsA()->InheritsFrom( TDirectory::Class() ) ) {
+        // it's a subdirectory
+
+        cout << "Found subdirectory " << obj->GetName() << endl;
+        // create a new subdir of same name and title in the target file
+        target->cd();
+        TDirectory *newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
+        if (!newdir) {
+          // protection: the recursive call would dereference the null pointer
+          AliError(Form("Failed to create subdirectory %s in %s",obj->GetName(),target->GetPath()));
+          status = -1;
+          oldkey = key;
+          delete obj;
+          continue;
+        }
+
+        // newdir is now the starting point of another round of merging
+        // newdir still knows its depth within the target file via
+        // GetPath(), so we can still figure out where we are in the recursion
+        status = MergeRootfile( newdir, sourcelist);
+        if (status) {
+          delete obj; // do not leak the object on the early exit
+          return status;
+        }
+
+      } else if ( obj->InheritsFrom(TObject::Class())
+                  && obj->IsA()->GetMethodWithPrototype("Merge", "TCollection*") ) {
+        // object implements Merge(TCollection*)
+
+        // loop over all source files and merge same-name object
+        TFile *nextsource = (TFile*)sourcelist->After( first_source );
+        while ( nextsource ) {
+          // make sure we are at the correct directory level by cd'ing to path
+          TDirectory *ndir = nextsource->GetDirectory(path);
+          if (ndir) {
+            ndir->cd();
+            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
+            if (key2) {
+              TObject *hobj = key2->ReadObj();
+              if (!hobj) {
+                cout << "Failed to get the object with key " << key2->GetName() << " from " <<
+                  ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
+                nextsource = (TFile*)sourcelist->After( nextsource );
+                continue;
+              }
+              //
+              hobj->ResetBit(kMustCleanup);
+              listH.Add(hobj);
+              Int_t error = 0;
+              obj->Execute("Merge", listHargs.Data(), &error); // RS Probleme here
+              if (error) {
+                cerr << "Error calling Merge() on " << obj->GetName()
+                     << " with the corresponding object in " << nextsource->GetName() << endl;
+              }
+              listH.Delete(); // the merged-in object is not needed anymore
+            }
+          }
+          nextsource = (TFile*)sourcelist->After( nextsource );
+        }
+      } else if ( obj->IsA()->InheritsFrom( THStack::Class() ) ) {
+        THStack *hstack1 = (THStack*) obj;
+        TList l; // clones of the histogram lists of the other stacks (on the stack: no leak)
+
+        // loop over all source files and merge the histos of the
+        // corresponding THStacks with the one pointed to by "hstack1"
+        TFile *nextsource = (TFile*)sourcelist->After( first_source );
+        while ( nextsource ) {
+          // make sure we are at the correct directory level by cd'ing to path
+          TDirectory *ndir = nextsource->GetDirectory(path);
+          if (ndir) {
+            ndir->cd();
+            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(hstack1->GetName());
+            if (key2) {
+              THStack *hstack2 = (THStack*) key2->ReadObj();
+              if (hstack2) {
+                // an empty stack has no list of histograms
+                if (hstack2->GetHists()) l.Add(hstack2->GetHists()->Clone());
+                delete hstack2;
+              }
+              else {
+                cout << "Failed to get the object with key " << key2->GetName() << " from " <<
+                  ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
+              }
+            }
+          }
+
+          nextsource = (TFile*)sourcelist->After( nextsource );
+        }
+        if (hstack1->GetHists()) hstack1->GetHists()->Merge(&l);
+        l.Delete();
+      } else {
+        // object is of no type that we can merge
+        cout << "Cannot merge object type, name: "
+             << obj->GetName() << " title: " << obj->GetTitle() << endl;
+
+        // loop over all source files and write similar objects directly to the output file
+        TFile *nextsource = (TFile*)sourcelist->After( first_source );
+        while ( nextsource ) {
+          // make sure we are at the correct directory level by cd'ing to path
+          TDirectory *ndir = nextsource->GetDirectory(path);
+          if (ndir) {
+            ndir->cd();
+            TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
+            if (key2) {
+              TObject *nobj = key2->ReadObj();
+              if (nobj) {
+                nobj->ResetBit(kMustCleanup);
+                int nbytes1 = target->WriteTObject(nobj, key2->GetName(), "SingleKey" );
+                if (nbytes1 <= 0) status = -1;
+                delete nobj;
+              }
+              else {
+                cout << "Failed to get the object with key " << key2->GetName() << " from " <<
+                  ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
+              }
+            }
+          }
+          nextsource = (TFile*)sourcelist->After( nextsource );
+        }
+      }
+
+      // now write the merged histogram (which is "in" obj) to the target file
+      // note that this will just store obj in the current directory level,
+      // which is not persistent until the complete directory itself is stored
+      // by "target->SaveSelf()" below
+      target->cd();
+
+      //!!if the object is a tree, it is stored in globChain...
+      if(obj->IsA()->InheritsFrom( TDirectory::Class() )) {
+        // a directory: everything was already done in the recursive call
+      } else if(obj->IsA()->InheritsFrom( TTree::Class() )) {
+        if (!fNoTrees) {
+          globChain->ls();
+          globChain->Merge(target->GetFile(),0,"keep fast");
+          delete globChain;
+        }
+      } else {
+        int nbytes2 = obj->Write( key->GetName(), TObject::kSingleKey );
+        if (nbytes2 <= 0) status = -1;
+      }
+      oldkey = key;
+      delete obj;
+    } // while ( ( TKey *key = (TKey*)nextkey() ) )
+    first_source = (TDirectory*)sourcelist->After(first_source);
+  }
+  // save modifications to target file
+  target->SaveSelf(kTRUE);
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  return status;
+}
+
+//___________________________________________________________________________
+int AliFileMerger::OpenNextChunks(const TList* namesList, TList* filesList, Int_t from, Int_t to)
+{
+  // Open the files named by the entries [from,to] (inclusive) of namesList and
+  // add them to filesList, which is cleared beforehand.
+  // Entries which are missing or cannot be opened are skipped.
+  // A connection to alien is requested when an alien:// name is met and gGrid is not set.
+  // Returns the number of files opened.
+  //
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  filesList->Clear();
+  const int nNames = namesList->GetEntries();
+  if (from>nNames) from = nNames;
+  if (to>nNames)   to   = nNames;
+  //
+  int nOpened = 0;
+  for (int idx=from; idx<=to; ++idx) {
+    TNamed* entry = (TNamed*)namesList->At(idx);
+    if (entry) {
+      const char* fileName = entry->GetName();
+      // the expanded name is only used to recognize the grid files
+      TString expanded(fileName);
+      gSystem->ExpandPathName(expanded);
+      if (!gGrid && expanded.BeginsWith("alien://")) TGrid::Connect("alien");
+      //
+      TFile* opened = TFile::Open(fileName);
+      if (opened) {
+        filesList->Add(opened);
+        printf("Opened file %s\n",fileName);
+        ++nOpened;
+      }
+      else {
+        printf("Failed to open file %s, will skip\n",fileName);
+      }
+    }
+  }
+  gSystem->GetProcInfo(&procInfo);
+  AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+  return nOpened;
+}
+
+
+//___________________________________________________________________________
+int AliFileMerger::AddFile(TList* namesList, std::string entry)
+{
+  // Add a new file to the list of files.
+  //
+  // namesList : list receiving one TNamed per file, the name being the file path
+  // entry     : file name or, if it starts with '@', the name of an "indirect" text
+  //             file with one entry per line (each line is treated recursively)
+  //
+  // Leading and trailing blanks (space, tab, CR, LF) are stripped, so that the entries
+  // of lists with trailing spaces or with Windows line endings remain usable.
+  // Empty or blank-only entries are silently ignored.
+  // Returns 0 on success, 1 if an indirect file could not be opened.
+  // NOTE(review): an indirect file referring to itself leads to an endless recursion.
+  //
+  const char* blanks = " \t\r\n";
+  const size_t first = entry.find_first_not_of(blanks);
+  if( first==std::string::npos ) return 0; // nothing but blanks (or empty)
+  const size_t last = entry.find_last_not_of(blanks);
+  entry = entry.substr(first, last-first+1);
+  //
+  if( entry[0]=='@' ) {
+    const std::string listName = entry.substr(1);
+    std::ifstream indirect_file( listName.c_str() );
+    if( ! indirect_file.is_open() ) {
+      std::cerr<< "Could not open indirect file " << listName << std::endl;
+      return 1;
+    }
+    // testing the result of getline avoids treating a spurious line after the end of file
+    std::string line;
+    while( std::getline(indirect_file, line) ){
+      if( AddFile(namesList, line)!=0 ) return 1;
+    }
+    return 0;
+  }
+  //
+  namesList->Add(new TNamed(entry.c_str(),""));
+  return 0;
+}