#include "Riostream.h"
#include "AliSysInfo.h"
#include "AliFileMerger.h"
+#include "AliLog.h"
+using std::cerr;
+using std::endl;
+using std::cout;
+using std::ifstream;
ClassImp(AliFileMerger)
+ProcInfo_t procInfo;//TMP
+
////////////////////////////////////////////////////////////////////////
AliFileMerger::AliFileMerger():
TNamed(),
fRejectMask(0),
fAcceptMask(0),
+ fMaxFilesOpen(800),
fNoTrees(kFALSE)
{
//
TNamed(name,name),
fRejectMask(0),
fAcceptMask(0),
+ fMaxFilesOpen(800),
fNoTrees(kFALSE)
{
//
//
// Merge the files coming out of the calibration job
//
- TString outputFile(outputFileName);
- gSystem->ExpandPathName(outputFile);
TString command;
// looking for files to be merged in the output directory
command = Form("find %s/ *%s", outputDir, pattern);
printf("looking for file %s\n",(objs->GetString()).Data());
AddFile(&sourcelist, (objs->GetString()).Data());;
}
- printf("List of files to be merged:\n");
- sourcelist.Print();
- //
+ //
+ IterList(&sourcelist, outputFileName, dontOverwrite);
+ delete res;
+}
+
+void AliFileMerger::IterList(const TList* namesList, const char* outputFileName, Bool_t dontOverwrite)
+{
+ // merge in steps or in one go
+ //
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+ //
+ TString outputFile(outputFileName);
+ gSystem->ExpandPathName(outputFile);
+ //
+ int nFiles = namesList->GetEntries();
+ int maxSrcOpen = fMaxFilesOpen - 1;
+ TList filesList;
+ filesList.SetOwner(kTRUE);
+ //
+ TString tmpDest[2] = {outputFile,outputFile}; // names for tmp files
+ int npl = outputFile.Last('.');
+ if (npl<0) npl = outputFile.Length();
+ for (int i=0;i<2;i++) tmpDest[i].Insert(npl,Form("_TMPMERGE%d_",i));
+ //
+ int nsteps = 0, currTmp = 0, start = 0;
+ for (int ifl=0;ifl<nFiles;ifl++) {
+ int st = ifl%maxSrcOpen;
+ if (st==0 && ifl) { // new chunk should be started, merge what was already accumulated
+ OpenNextChunks(namesList,&filesList,start,ifl-1);
+ start = ifl; // remember where to start next step
+ if (nsteps++) { // if not 1st one, merge the privous chunk with this one
+ filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
+ currTmp = (currTmp==0) ? 1:0; // swap tmp files
+ }
+ // open temp target
+ TFile* targetTmp = TFile::Open( tmpDest[currTmp].Data(), "RECREATE");
+ if (!targetTmp || targetTmp->IsZombie()) {
+ printf("Error opening temporary file %s\n",tmpDest[currTmp].Data());
+ return;
+ }
+ MergeRootfile(targetTmp, &filesList);
+ targetTmp->Close();
+ delete targetTmp;
+ filesList.Clear(); // close all open files
+ }
+ // nothing to do until needed amount of files is accumulated
+ }
+ // merge last step
TFile* target = TFile::Open( outputFile.Data(), (dontOverwrite ? "CREATE":"RECREATE") );
if (!target || target->IsZombie()) {
cerr << "Error opening target file (does " << outputFileName << " exist?)." << endl;
cerr << "Use force = kTRUE to re-creation of output file." << endl;
return;
- }
- MergeRootfile( target, &sourcelist);
- delete res;
+ }
+ OpenNextChunks(namesList,&filesList,start,nFiles-1);
+ // add result of previous merges
+ if (nsteps) filesList.AddFirst(TFile::Open(tmpDest[currTmp].Data()));
+ MergeRootfile( target, &filesList);
+ target->Close();
delete target;
+ filesList.Clear();
+ //
+ for (int i=0;i<2;i++) gSystem->Exec(Form("if [ -e %s ]; then \nrm %s\nfi",tmpDest[i].Data(),tmpDest[i].Data()));
+ //
+ printf("Merged %d files in %d steps\n",nFiles,++nsteps);
+ //
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
}
void AliFileMerger::IterTXT( const char * fileList, const char* outputFileName, Bool_t dontOverwrite){
AddFile(&sourcelist, objfile.Data());
}
//
- printf("List of files to be merged:\n");
- sourcelist.Print();
+ IterList(&sourcelist, outputFileName, dontOverwrite);
//
- TString outputFile(outputFileName);
- gSystem->ExpandPathName(outputFile);
- TFile* target = TFile::Open( outputFile.Data(), (dontOverwrite ? "CREATE":"RECREATE") );
- if (!target || target->IsZombie()) {
- cerr << "Error opening target file (does " << outputFileName << " exist?)." << endl;
- cerr << "Use force = kTRUE to re-creation of output file." << endl;
- return;
- }
-
- MergeRootfile( target, &sourcelist);
- delete target;
}
void AliFileMerger::StoreResults(TObjArray * array, const char* outputFileName){
return accept;
}
+Bool_t AliFileMerger::IsRejected(TString name){
+ //
+ // check is the name is explicitly in the rejection list
+ // if fRejectMask speciefied - entry with name speciief in the list are rejected
+ //
+ Bool_t reject=kFALSE;
+ if (fRejectMask){
+ //
+ for (Int_t ireject=0; ireject<fRejectMask->GetEntries(); ireject++){
+ if (name.Contains(fRejectMask->At(ireject)->GetName())) {reject=kTRUE; break;} // entry was rejected
+ }
+ }
+ return reject;
+}
}
//___________________________________________________________________________
-int AliFileMerger::MergeRootfile( TDirectory *target, TList *sourcelist)
+int AliFileMerger::MergeRootfile( TDirectory *target, TList *sourcelist, Bool_t nameFiltering)
{
// Merge all objects in a directory
// modified version of root's hadd.cxx
- Int_t counterF = -1;
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+ //
int status = 0;
cout << "Target path: " << target->GetPath() << endl;
TString path( (char*)strstr( target->GetPath(), ":" ) );
listHargs.Form("((TCollection*)0x%lx)", (ULong_t)&listH);
//
while(first_source) {
- counterF++;
+ //
TDirectory *current_sourcedir = first_source->GetDirectory(path);
if (!current_sourcedir) {
first_source = (TDirectory*)sourcelist->After(first_source);
TH1::AddDirectory(kFALSE);
//
int counterK = 0;
+ int counterF=0;
//
while ( (key = (TKey*)nextkey())) {
if (current_sourcedir == target) break;
//
// check if we don't reject this name
TString nameK(key->GetName());
- if (!IsAccepted(nameK)) {
+ if ((!IsAccepted(nameK) && nameFiltering) || (!nameFiltering && IsRejected(nameK))) {
if (!counterF) printf("Object %s is in rejection list, skipping...\n",nameK.Data());
continue;
}
<< key->GetName() << " title: " << key->GetTitle() << endl;
continue;
}
+ printf("Merging object %s, anchor directory: %s\n",key->GetName(),key->GetMotherDir()->GetPath());
allNames.Add(new TObjString(key->GetName()));
- AliSysInfo::AddStamp(nameK.Data(),1,counterK++,counterF-1);
+ AliSysInfo::AddStamp(nameK.Data(),1,++counterK,counterF++);
// read object from first source file
//current_sourcedir->cd();
+
+ TDirectory* currDir = gDirectory;
+ key->GetMotherDir()->cd();
TObject *obj = key->ReadObj();
- //printf("keyname=%s, obj=%x\n",key->GetName(),obj);
-
+ currDir->cd();
+ if (!obj) {
+ AliError(Form("Failed to get the object with key %s from %s",key->GetName(),current_sourcedir->GetFile()->GetName()));
+ continue;
+ }
+
if ( obj->IsA()->InheritsFrom( TTree::Class() ) ) {
// loop over all source files create a chain of Trees "globChain"
// newdir is now the starting point of another round of merging
// newdir still knows its depth within the target file via
// GetPath(), so we can still figure out where we are in the recursion
- status = MergeRootfile( newdir, sourcelist);
+ status = MergeRootfile( newdir, sourcelist, kFALSE);
if (status) return status;
} else if ( obj->InheritsFrom(TObject::Class())
TKey *key2 = (TKey*)gDirectory->GetListOfKeys()->FindObject(key->GetName());
if (key2) {
TObject *hobj = key2->ReadObj();
+ if (!hobj) {
+ cout << "Failed to get the object with key " << key2->GetName() << " from " <<
+ ndir->GetFile()->GetName() << "/" << ndir->GetName() << endl;
+ nextsource = (TFile*)sourcelist->After( nextsource );
+ continue;
+ }
+ //
hobj->ResetBit(kMustCleanup);
listH.Add(hobj);
Int_t error = 0;
- obj->Execute("Merge", listHargs.Data(), &error);
+ obj->Execute("Merge", listHargs.Data(), &error); // RS Probleme here
if (error) {
cerr << "Error calling Merge() on " << obj->GetName()
<< " with the corresponding object in " << nextsource->GetName() << endl;
}
listH.Delete();
+ // get the number of processed entries to be put in the syswatch.log
+ Double_t numberOfEntries = -1;
+ if (obj->IsA()->GetMethodAllAny("GetEntries"))
+ {
+ TMethodCall getEntries(obj->IsA(), "GetEntries", "");
+ getEntries.Execute(obj, numberOfEntries);
+ }
+ AliSysInfo::AddStamp(nameK.Data(),1,counterK,counterF++,numberOfEntries);
}
}
nextsource = (TFile*)sourcelist->After( nextsource );
THStack *hstack2 = (THStack*) key2->ReadObj();
l->Add(hstack2->GetHists()->Clone());
delete hstack2;
+ AliSysInfo::AddStamp(nameK.Data(),1,counterK,counterF++);
}
}
// save modifications to target file
target->SaveSelf(kTRUE);
//
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
return status;
}
//___________________________________________________________________________
-int AliFileMerger::AddFile(TList* sourcelist, std::string entry)
+int AliFileMerger::OpenNextChunks(const TList* namesList, TList* filesList, Int_t from, Int_t to)
+{
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form(">> memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+ filesList->Clear();
+ int nEnt = namesList->GetEntries();
+ from = from<nEnt ? from : nEnt;
+ to = to<nEnt ? to : nEnt;
+ int count = 0;
+ for (int i=from;i<=to;i++) {
+ TNamed* fnam = (TNamed*)namesList->At(i);
+ if (!fnam) continue;
+ TString fnamS(fnam->GetName());
+ gSystem->ExpandPathName(fnamS);
+ if (fnamS.BeginsWith("alien://") && !gGrid) TGrid::Connect("alien");
+ TFile* source = TFile::Open(fnam->GetName());
+ if( source==0 ) { printf("Failed to open file %s, will skip\n",fnam->GetName()); continue; }
+ filesList->Add(source);
+ printf("Opened file %s\n",fnam->GetName());
+ count++;
+ }
+ gSystem->GetProcInfo(&procInfo);
+ AliInfo(Form("<< memory usage %ld %ld", procInfo.fMemResident, procInfo.fMemVirtual));
+
+ return count;
+}
+
+
+//___________________________________________________________________________
+int AliFileMerger::AddFile(TList* namesList, std::string entry)
{
// add a new file to the list of files
// static int count(0);
while( indirect_file ){
std::string line;
std::getline(indirect_file, line);
- if( AddFile(sourcelist, line)!=0 )return 1;;
+ if( AddFile(namesList, line)!=0 ) return 1;;
}
return 0;
}
// cout << "Source file " << (++count) << ": " << entry << endl;
-
- TFile* source = TFile::Open( entry.c_str());
- if( source==0 ) {
- cout << "Failed to open " << entry << " will skip" << endl;
- return 0;
- }
- sourcelist->Add(source);
+ namesList->Add(new TNamed(entry,""));
return 0;
}