X-Git-Url: http://git.uio.no/git/?a=blobdiff_plain;f=ANALYSIS%2FAliAnalysisManager.cxx;h=3e0264907010dbf3524ea67e100a04b9184a8ba6;hb=ce972512b6b999f1a06251fd325d5ec2154d6b25;hp=5a7f70bdac0ded856d8571a18ceb3cb57e7e8c41;hpb=aee5ee4424a87ea1086e015211c1cdf042acc22b;p=u%2Fmrichter%2FAliRoot.git diff --git a/ANALYSIS/AliAnalysisManager.cxx b/ANALYSIS/AliAnalysisManager.cxx index 5a7f70bdac0..3e026490701 100644 --- a/ANALYSIS/AliAnalysisManager.cxx +++ b/ANALYSIS/AliAnalysisManager.cxx @@ -27,15 +27,21 @@ #include +#include #include #include +//#include +#include +#include #include #include #include #include #include +#include #include "AliAnalysisSelector.h" +#include "AliAnalysisGrid.h" #include "AliAnalysisTask.h" #include "AliAnalysisDataContainer.h" #include "AliAnalysisDataSlot.h" @@ -47,6 +53,7 @@ ClassImp(AliAnalysisManager) AliAnalysisManager *AliAnalysisManager::fgAnalysisManager = NULL; +TString AliAnalysisManager::fgCommonFileName = ""; //______________________________________________________________________________ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title) @@ -55,7 +62,7 @@ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title) fInputEventHandler(NULL), fOutputEventHandler(NULL), fMCtruthEventHandler(NULL), - fEventPool(NULL), + fEventPool(NULL), fCurrentEntry(-1), fNSysInfo(0), fMode(kLocalAnalysis), @@ -68,17 +75,25 @@ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title) fContainers(NULL), fInputs(NULL), fOutputs(NULL), - fSelector(NULL) + fParamCont(NULL), + fCommonInput(NULL), + fCommonOutput(NULL), + fSelector(NULL), + fGridHandler(NULL), + fExtraFiles("") { // Default constructor. fgAnalysisManager = this; + fgCommonFileName = "AnalysisResults.root"; fTasks = new TObjArray(); fTopTasks = new TObjArray(); fZombies = new TObjArray(); fContainers = new TObjArray(); fInputs = new TObjArray(); fOutputs = new TObjArray(); + fParamCont = new TObjArray(); SetEventLoop(kTRUE); + TObject::SetObjectStat(kFALSE); } //______________________________________________________________________________ @@ -88,7 +103,7 @@ AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other) fInputEventHandler(NULL), fOutputEventHandler(NULL), fMCtruthEventHandler(NULL), - fEventPool(NULL), + fEventPool(NULL), fCurrentEntry(-1), fNSysInfo(0), fMode(other.fMode), @@ -101,7 +116,12 @@ AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other) fContainers(NULL), fInputs(NULL), fOutputs(NULL), - fSelector(NULL) + fParamCont(NULL), + fCommonInput(NULL), + fCommonOutput(NULL), + fSelector(NULL), + fGridHandler(NULL), + fExtraFiles() { // Copy constructor. fTasks = new TObjArray(*other.fTasks); @@ -110,7 +130,10 @@ AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other) fContainers = new TObjArray(*other.fContainers); fInputs = new TObjArray(*other.fInputs); fOutputs = new TObjArray(*other.fOutputs); + fParamCont = new TObjArray(*other.fParamCont); + fgCommonFileName = "AnalysisResults.root"; fgAnalysisManager = this; + TObject::SetObjectStat(kFALSE); } //______________________________________________________________________________ @@ -135,7 +158,13 @@ AliAnalysisManager& AliAnalysisManager::operator=(const AliAnalysisManager& othe fContainers = new TObjArray(*other.fContainers); fInputs = new TObjArray(*other.fInputs); fOutputs = new TObjArray(*other.fOutputs); + fParamCont = new TObjArray(*other.fParamCont); + fCommonInput = NULL; + fCommonOutput = NULL; fSelector = NULL; + fGridHandler = NULL; + fExtraFiles = other.fExtraFiles; + fgCommonFileName = "AnalysisResults.root"; fgAnalysisManager = this; } return *this; @@ -151,73 +180,94 @@ AliAnalysisManager::~AliAnalysisManager() if (fContainers) {fContainers->Delete(); delete fContainers;} if (fInputs) delete fInputs; if (fOutputs) delete fOutputs; + if (fParamCont) delete fParamCont; + if (fGridHandler) delete fGridHandler; if (fgAnalysisManager==this) fgAnalysisManager = NULL; + TObject::SetObjectStat(kTRUE); } //______________________________________________________________________________ Int_t AliAnalysisManager::GetEntry(Long64_t entry, Int_t getall) { // Read one entry of the tree or a whole branch. - if (fDebug > 0) printf("== AliAnalysisManager::GetEntry(%lld)\n", entry); fCurrentEntry = entry; return fTree ? fTree->GetTree()->GetEntry(entry, getall) : 0; } //______________________________________________________________________________ -void AliAnalysisManager::Init(TTree *tree) +Bool_t AliAnalysisManager::Init(TTree *tree) { // The Init() function is called when the selector needs to initialize // a new tree or chain. Typically here the branch addresses of the tree // will be set. It is normaly not necessary to make changes to the // generated code, but the routine can be extended by the user if needed. // Init() will be called many times when running with PROOF. - if (!tree) return; + Bool_t init = kFALSE; + if (!tree) return kFALSE; // Should not happen - protected in selector caller if (fDebug > 0) { printf("->AliAnalysisManager::Init(%s)\n", tree->GetName()); } - // Call InitTree of EventHandler if (fOutputEventHandler) { if (fMode == kProofAnalysis) { - fOutputEventHandler->Init(0x0, "proof"); + init = fOutputEventHandler->Init(0x0, "proof"); } else { - fOutputEventHandler->Init(0x0, "local"); + init = fOutputEventHandler->Init(0x0, "local"); } + if (!init) { + Error("Init", "Output event handler failed to initialize"); + return kFALSE; + } } - + if (fInputEventHandler) { if (fMode == kProofAnalysis) { - fInputEventHandler->Init(tree, "proof"); + init = fInputEventHandler->Init(tree, "proof"); } else { - fInputEventHandler->Init(tree, "local"); + init = fInputEventHandler->Init(tree, "local"); } + if (!init) { + Error("Init", "Input event handler failed to initialize tree"); + return kFALSE; + } } else { // If no input event handler we need to get the tree once // for the chain - if(!tree->GetTree()) tree->LoadTree(0); + if(!tree->GetTree()) { + Long64_t readEntry = tree->LoadTree(0); + if (readEntry == -2) { + Error("Init", "Input tree has no entry. Exiting"); + return kFALSE; + } + } } - if (fMCtruthEventHandler) { if (fMode == kProofAnalysis) { - fMCtruthEventHandler->Init(0x0, "proof"); + init = fMCtruthEventHandler->Init(0x0, "proof"); } else { - fMCtruthEventHandler->Init(0x0, "local"); + init = fMCtruthEventHandler->Init(0x0, "local"); } + if (!init) { + Error("Init", "MC event handler failed to initialize"); + return kFALSE; + } } if (!fInitOK) InitAnalysis(); - if (!fInitOK) return; + if (!fInitOK) return kFALSE; fTree = tree; - AliAnalysisDataContainer *top = (AliAnalysisDataContainer*)fInputs->At(0); + AliAnalysisDataContainer *top = fCommonInput; + if (!top) top = (AliAnalysisDataContainer*)fInputs->At(0); if (!top) { Error("Init","No top input container !"); - return; + return kFALSE; } top->SetData(tree); if (fDebug > 0) { printf("<-AliAnalysisManager::Init(%s)\n", tree->GetName()); } + return kTRUE; } //______________________________________________________________________________ @@ -228,44 +278,69 @@ void AliAnalysisManager::SlaveBegin(TTree *tree) // The tree argument is deprecated (on PROOF 0 is passed). if (fDebug > 0) printf("->AliAnalysisManager::SlaveBegin()\n"); static Bool_t isCalled = kFALSE; + Bool_t init = kFALSE; + Bool_t initOK = kTRUE; + TString msg; + TDirectory *curdir = gDirectory; // Call SlaveBegin only once in case of mixing if (isCalled && fMode==kMixingAnalysis) return; // Call Init of EventHandler if (fOutputEventHandler) { if (fMode == kProofAnalysis) { - fOutputEventHandler->Init("proof"); + // Merging AOD's in PROOF via TProofOutputFile + if (fDebug > 1) printf(" Initializing AOD output file %s...\n", fOutputEventHandler->GetOutputFileName()); + init = fOutputEventHandler->Init("proof"); + if (!init) msg = "Failed to initialize output handler on worker"; } else { - fOutputEventHandler->Init("local"); + init = fOutputEventHandler->Init("local"); + if (!init) msg = "Failed to initialize output handler"; } + initOK &= init; + if (!fSelector) Error("SlaveBegin", "Selector not set"); + else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);} } if (fInputEventHandler) { fInputEventHandler->SetInputTree(tree); if (fMode == kProofAnalysis) { - fInputEventHandler->Init("proof"); + init = fInputEventHandler->Init("proof"); + if (!init) msg = "Failed to initialize input handler on worker"; } else { - fInputEventHandler->Init("local"); + init = fInputEventHandler->Init("local"); + if (!init) msg = "Failed to initialize input handler"; } + initOK &= init; + if (!fSelector) Error("SlaveBegin", "Selector not set"); + else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);} } if (fMCtruthEventHandler) { if (fMode == kProofAnalysis) { - fMCtruthEventHandler->Init("proof"); + init = fMCtruthEventHandler->Init("proof"); + if (!init) msg = "Failed to initialize MC handler on worker"; } else { - fMCtruthEventHandler->Init("local"); + init = fMCtruthEventHandler->Init("local"); + if (!init) msg = "Failed to initialize MC handler"; } + initOK &= init; + if (!fSelector) Error("SlaveBegin", "Selector not set"); + else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);} } - + if (curdir) curdir->cd(); + isCalled = kTRUE; + if (!initOK) return; TIter next(fTasks); AliAnalysisTask *task; // Call CreateOutputObjects for all tasks + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; + Int_t itask = 0; while ((task=(AliAnalysisTask*)next())) { - TDirectory *curdir = gDirectory; + curdir = gDirectory; task->CreateOutputObjects(); + if (getsysInfo) AliSysInfo::AddStamp(Form("%s_CREATEOUTOBJ",task->ClassName()), 0, itask, 0); + itask++; if (curdir) curdir->cd(); } - isCalled = kTRUE; - if (fDebug > 0) printf("<-AliAnalysisManager::SlaveBegin()\n"); } @@ -277,10 +352,8 @@ Bool_t AliAnalysisManager::Notify() // is started when using PROOF. It is normaly not necessary to make changes // to the generated code, but the routine can be extended by the // user if needed. The return value is currently not used. - if (!fTree) { - Error("Notify","No current tree."); - return kFALSE; - } + if (!fTree) return kFALSE; + TFile *curfile = fTree->GetCurrentFile(); if (!curfile) { Error("Notify","No current file"); @@ -349,7 +422,7 @@ void AliAnalysisManager::PackOutput(TList *target) // stage in PROOF case for each slave. if (fDebug > 0) printf("->AliAnalysisManager::PackOutput()\n"); if (!target) { - Error("PackOutput", "No target. Aborting."); + Error("PackOutput", "No target. Exiting."); return; } if (fInputEventHandler) fInputEventHandler ->Terminate(); @@ -371,33 +444,32 @@ void AliAnalysisManager::PackOutput(TList *target) if (fMode == kProofAnalysis) { TIter next(fOutputs); AliAnalysisDataContainer *output; + Bool_t isManagedByHandler = kFALSE; while ((output=(AliAnalysisDataContainer*)next())) { // Do not consider outputs of post event loop tasks + isManagedByHandler = kFALSE; if (output->GetProducer()->IsPostEventLoop()) continue; + const char *filename = output->GetFileName(); + if (!(strcmp(filename, "default")) && fOutputEventHandler) { + isManagedByHandler = kTRUE; + printf("#### Handler output. Extra: %s\n", fExtraFiles.Data()); + filename = fOutputEventHandler->GetOutputFileName(); + } // Check if data was posted to this container. If not, issue an error. - if (!output->GetData() ) { - Error("PackOutput", "No data for output container %s. Forgot to PostData ?\n", output->GetName()); + if (!output->GetData() && !isManagedByHandler) { + Error("PackOutput", "No data for output container %s. Forgot to PostData ?", output->GetName()); continue; } if (!output->IsSpecialOutput()) { // Normal outputs - const char *filename = output->GetFileName(); - if (!(strcmp(filename, "default"))) { - if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName(); - } - if (strlen(filename)) { - // File resident outputs - TFile *file = output->GetFile(); + if (strlen(filename) && !isManagedByHandler) { // Backup current folder TDirectory *opwd = gDirectory; - // Create file if not existing and register to container. - if (file) file->cd(); - else file = new TFile(filename, "RECREATE"); - if (file->IsZombie()) { - Fatal("PackOutput", "Could not recreate file %s\n", filename); - return; - } - output->SetFile(file); + // File resident outputs. + // Check first if the file exists. + TString openoption = "RECREATE"; + if (!gSystem->AccessPathName(output->GetFileName())) openoption = "UPDATE"; + TFile *file = AliAnalysisManager::OpenFile(output, openoption, kTRUE); // Clear file list to release object ownership to user. file->Clear(); // Save data to file, then close. @@ -408,7 +480,13 @@ void AliAnalysisManager::PackOutput(TList *target) coll->SetName(output->GetName()); coll->Write(output->GetName(), TObject::kSingleKey); } else { - output->GetData()->Write(); + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + // tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } } if (fDebug > 1) printf("PackOutput %s: memory merge, file resident output\n", output->GetName()); if (fDebug > 2) { @@ -416,45 +494,70 @@ void AliAnalysisManager::PackOutput(TList *target) file->ls(); } file->Close(); + output->SetFile(NULL); // Restore current directory if (opwd) opwd->cd(); } else { // Memory-resident outputs - if (fDebug > 1) printf("PackOutput %s: memory merge memory resident output\n", output->GetName()); + if (fDebug > 1) printf("PackOutput %s: memory merge memory resident output\n", filename); + } + AliAnalysisDataWrapper *wrap = 0; + if (isManagedByHandler) { + wrap = new AliAnalysisDataWrapper(fOutputEventHandler->GetTree()); + wrap->SetName(output->GetName()); } - AliAnalysisDataWrapper *wrap = output->ExportData(); - // Output wrappers must delete data after merging (AG 13/11/07) - wrap->SetDeleteData(kTRUE); + else wrap =output->ExportData(); + // Output wrappers must NOT delete data after merging - the user owns them + wrap->SetDeleteData(kFALSE); target->Add(wrap); - } - // Special outputs - if (output->IsSpecialOutput()) { + } else { + // Special outputs. The file must be opened and connected to the container. TDirectory *opwd = gDirectory; TFile *file = output->GetFile(); if (!file) { AliAnalysisTask *producer = output->GetProducer(); - Error("PackOutput", + Fatal("PackOutput", "File %s for special container %s was NOT opened in %s::CreateOutputObjects !!!", output->GetFileName(), output->GetName(), producer->ClassName()); continue; } - file->cd(); - // Release object ownership to users after writing data to file - if (output->GetData()->InheritsFrom(TCollection::Class())) { - // If data is a collection, we set the name of the collection - // as the one of the container and we save as a single key. - TCollection *coll = (TCollection*)output->GetData(); - coll->SetName(output->GetName()); - coll->Write(output->GetName(), TObject::kSingleKey); - } else { - output->GetData()->Write(); - } - file->Clear(); - if (fDebug > 2) { - printf(" file %s listing content:\n", output->GetFileName()); - file->ls(); - } - file->Close(); + TString outFilename = file->GetName(); + if (fDebug > 1) printf("PackOutput %s: special output\n", output->GetName()); + if (isManagedByHandler) { + // Terminate IO for files managed by the output handler + // file->Write() moved to AOD handler (A.G. 11.01.10) +// if (file) file->Write(); + if (file && fDebug > 2) { + printf(" handled file %s listing content:\n", file->GetName()); + file->ls(); + } + fOutputEventHandler->TerminateIO(); + } else { + file->cd(); + // Release object ownership to users after writing data to file + if (output->GetData()->InheritsFrom(TCollection::Class())) { + // If data is a collection, we set the name of the collection + // as the one of the container and we save as a single key. + TCollection *coll = (TCollection*)output->GetData(); + coll->SetName(output->GetName()); + coll->Write(output->GetName(), TObject::kSingleKey); + } else { + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } + } + file->Clear(); + if (fDebug > 2) { + printf(" file %s listing content:\n", output->GetFileName()); + file->ls(); + } + file->Close(); + output->SetFile(NULL); + } // Restore current directory if (opwd) opwd->cd(); // Check if a special output location was provided or the output files have to be merged @@ -462,13 +565,45 @@ void AliAnalysisManager::PackOutput(TList *target) TString remote = fSpecialOutputLocation; remote += "/"; Int_t gid = gROOT->ProcessLine("gProofServ->GetGroupId();"); - remote += Form("%s_%d_", gSystem->HostName(), gid); - remote += output->GetFileName(); - TFile::Cp(output->GetFileName(), remote.Data()); + if (remote.BeginsWith("alien://")) { + gROOT->ProcessLine("TGrid::Connect(\"alien://pcapiserv01.cern.ch:10000\", gProofServ->GetUser());"); + remote += outFilename; + remote.ReplaceAll(".root", Form("_%d.root", gid)); + } else { + remote += Form("%s_%d_", gSystem->HostName(), gid); + remote += outFilename; + } + if (fDebug > 1) + Info("PackOutput", "Output file for container %s to be copied \n at: %s. No merging.", + output->GetName(), remote.Data()); + TFile::Cp ( outFilename.Data(), remote.Data() ); + // Copy extra outputs + if (fExtraFiles.Length() && isManagedByHandler) { + TObjArray *arr = fExtraFiles.Tokenize(" "); + TObjString *os; + TIter nextfilename(arr); + while ((os=(TObjString*)nextfilename())) { + outFilename = os->GetString(); + remote = fSpecialOutputLocation; + remote += "/"; + if (remote.BeginsWith("alien://")) { + remote += outFilename; + remote.ReplaceAll(".root", Form("_%d.root", gid)); + } else { + remote += Form("%s_%d_", gSystem->HostName(), gid); + remote += outFilename; + } + if (fDebug > 1) + Info("PackOutput", "Extra AOD file %s to be copied \n at: %s. No merging.", + outFilename.Data(), remote.Data()); + TFile::Cp ( outFilename.Data(), remote.Data() ); + } + delete arr; + } } else { // No special location specified-> use TProofOutputFile as merging utility // The file at this output slot must be opened in CreateOutputObjects - if (fDebug > 1) printf(" File %s to be merged...\n", output->GetFileName()); + if (fDebug > 1) printf(" File for container %s to be merged via file merger...\n", output->GetName()); } } } @@ -485,36 +620,70 @@ void AliAnalysisManager::ImportWrappers(TList *source) AliAnalysisDataContainer *cont; AliAnalysisDataWrapper *wrap; Int_t icont = 0; + Bool_t inGrid = (fMode == kGridAnalysis)?kTRUE:kFALSE; + TDirectory *cdir = gDirectory; while ((cont=(AliAnalysisDataContainer*)next())) { - if (cont->GetProducer()->IsPostEventLoop()) continue; - if (cont->IsSpecialOutput()) { + wrap = 0; + if (cont->GetProducer()->IsPostEventLoop() && !inGrid) continue; + if (cont->IsRegisterDataset()) continue; + const char *filename = cont->GetFileName(); + Bool_t isManagedByHandler = kFALSE; + if (!(strcmp(filename, "default")) && fOutputEventHandler) { + isManagedByHandler = kTRUE; + filename = fOutputEventHandler->GetOutputFileName(); + } + if (cont->IsSpecialOutput() || inGrid) { if (strlen(fSpecialOutputLocation.Data())) continue; - // Copy merged file from PROOF scratch space - if (fDebug > 1) - printf(" Copying file %s from PROOF scratch space\n", cont->GetFileName()); - Bool_t gotit = TFile::Cp(Form("root://lxb6045.cern.ch:11094//pool/scratch/%s",cont->GetFileName()), - cont->GetFileName()); - if (!gotit) { - Error("ImportWrappers", "Could not get file %s from proof scratch space", cont->GetFileName()); - } + // Copy merged file from PROOF scratch space. + // In case of grid the files are already in the current directory. + if (!inGrid) { + if (isManagedByHandler && fExtraFiles.Length()) { + // Copy extra registered dAOD files. + TObjArray *arr = fExtraFiles.Tokenize(" "); + TObjString *os; + TIter nextfilename(arr); + while ((os=(TObjString*)nextfilename())) GetFileFromWrapper(os->GetString(), source); + delete arr; + } + if (!GetFileFromWrapper(filename, source)) continue; + } // Normally we should connect data from the copied file to the // corresponding output container, but it is not obvious how to do this // automatically if several objects in file... - continue; + TFile *f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (!f) f = TFile::Open(filename, "READ"); + if (!f) { + Error("ImportWrappers", "Cannot open file %s in read-only mode", filename); + continue; + } + TObject *obj = 0; + // Cd to the directory pointed by the container + TString folder = cont->GetFolderName(); + if (!folder.IsNull()) f->cd(folder); + // Try to fetch first an object having the container name. + obj = gDirectory->Get(cont->GetName()); + if (!obj) { + Warning("ImportWrappers", "Could not import object for container %s in file %s:%s.\n Object will not be available in Terminate()", + cont->GetName(), filename, cont->GetFolderName()); + continue; + } + wrap = new AliAnalysisDataWrapper(obj); + wrap->SetDeleteData(kFALSE); } - wrap = (AliAnalysisDataWrapper*)source->FindObject(cont->GetName()); + if (!wrap) wrap = (AliAnalysisDataWrapper*)source->FindObject(cont->GetName()); if (!wrap) { Error("ImportWrappers","Container %s not found in analysis output !", cont->GetName()); continue; } icont++; if (fDebug > 1) { - printf(" Importing data for container %s", cont->GetName()); - if (strlen(cont->GetFileName())) printf(" -> file %s\n", cont->GetFileName()); + printf(" Importing data for container %s\n", cont->GetName()); + if (strlen(filename)) printf(" -> file %s\n", filename); else printf("\n"); } cont->ImportData(wrap); - } + } + if (cdir) cdir->cd(); if (fDebug > 0) printf("<-AliAnalysisManager::ImportWrappers(): %d containers imported\n", icont); } @@ -524,7 +693,7 @@ void AliAnalysisManager::UnpackOutput(TList *source) // Called by AliAnalysisSelector::Terminate only on the client. if (fDebug > 0) printf("->AliAnalysisManager::UnpackOutput()\n"); if (!source) { - Error("UnpackOutput", "No target. Aborting."); + Error("UnpackOutput", "No target. Exiting."); return; } if (fDebug > 1) printf(" Source list contains %d containers\n", source->GetSize()); @@ -563,38 +732,88 @@ void AliAnalysisManager::Terminate() // the results graphically. if (fDebug > 0) printf("->AliAnalysisManager::Terminate()\n"); AliAnalysisTask *task; + AliAnalysisDataContainer *output; TIter next(fTasks); + TStopwatch timer; + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; // Call Terminate() for tasks - while ((task=(AliAnalysisTask*)next())) task->Terminate(); + Int_t itask = 0; + while (!IsSkipTerminate() && (task=(AliAnalysisTask*)next())) { + // Save all the canvases produced by the Terminate + TString pictname = Form("%s_%s", task->GetName(), task->ClassName()); + task->Terminate(); + if (getsysInfo) + AliSysInfo::AddStamp(Form("%s_TERMINATE",task->ClassName()),0, itask, 2); + itask++; + if (TObject::TestBit(kSaveCanvases)) { + if (!gROOT->IsBatch()) { + if (fDebug>0) printf("Waiting 5 sec for %s::Terminate() to finish drawing ...", task->ClassName()); + timer.Start(); + while (timer.CpuTime()<5) { + timer.Continue(); + gSystem->ProcessEvents(); + } + } + Int_t iend = gROOT->GetListOfCanvases()->GetEntries(); + if (iend==0) continue; + TCanvas *canvas; + for (Int_t ipict=0; ipictGetListOfCanvases()->At(ipict); + if (!canvas) continue; + canvas->SaveAs(Form("%s_%02d.gif", pictname.Data(),ipict)); + } + gROOT->GetListOfCanvases()->Delete(); + } + } // - TIter next1(fOutputs); - AliAnalysisDataContainer *output; + if (fInputEventHandler) fInputEventHandler ->TerminateIO(); + if (fOutputEventHandler) fOutputEventHandler ->TerminateIO(); + if (fMCtruthEventHandler) fMCtruthEventHandler->TerminateIO(); + TObjArray *allOutputs = new TObjArray(); + Int_t icont; + for (icont=0; icontGetEntriesFast(); icont++) allOutputs->Add(fOutputs->At(icont)); + if (!IsSkipTerminate()) + for (icont=0; icontGetEntriesFast(); icont++) allOutputs->Add(fParamCont->At(icont)); + TIter next1(allOutputs); + TString handlerFile = ""; + if (fOutputEventHandler) { + handlerFile = fOutputEventHandler->GetOutputFileName(); + } while ((output=(AliAnalysisDataContainer*)next1())) { - // Close all files at output - // Special outputs have the files already closed and written. - if (output->IsSpecialOutput()) continue; + // Special outputs or grid files have the files already closed and written. + if (fMode == kGridAnalysis) continue; + if (fMode == kProofAnalysis) { + if (output->IsSpecialOutput() || output->IsRegisterDataset()) continue; + } const char *filename = output->GetFileName(); - if (!(strcmp(filename, "default"))) { - if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName(); - TFile *aodfile = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); - if (aodfile) { - if (fDebug > 1) printf("Writing output handler file: %s\n", filename); - aodfile->Write(); - continue; - } - } + TString openoption = "RECREATE"; + if (!(strcmp(filename, "default"))) continue; if (!strlen(filename)) continue; if (!output->GetData()) continue; - TFile *file = output->GetFile(); TDirectory *opwd = gDirectory; - if (file) { - file->cd(); + TFile *file = output->GetFile(); + if (!file) file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (!file) { + //if (handlerFile == filename && !gSystem->AccessPathName(filename)) openoption = "UPDATE"; + if (!gSystem->AccessPathName(filename)) openoption = "UPDATE"; + if (fDebug>0) printf("Opening file: %s option=%s\n",filename, openoption.Data()); + file = new TFile(filename, openoption); } else { - file = new TFile(filename, "RECREATE"); - if (file->IsZombie()) continue; - output->SetFile(file); + if (fDebug>0) printf("File already opened: %s\n", filename); } - if (fDebug > 1) printf(" writing output data %s to file %s\n", output->GetData()->GetName(), file->GetName()); + if (file->IsZombie()) { + Error("Terminate", "Cannot open output file %s", filename); + continue; + } + output->SetFile(file); + file->cd(); + // Check for a folder request + TString dir = output->GetFolderName(); + if (!dir.IsNull()) { + if (!file->GetDirectory(dir)) file->mkdir(dir); + file->cd(dir); + } + if (fDebug > 0) printf("...writing container %s to file %s:%s\n", output->GetName(), file->GetName(), output->GetFolderName()); if (output->GetData()->InheritsFrom(TCollection::Class())) { // If data is a collection, we set the name of the collection // as the one of the container and we save as a single key. @@ -602,44 +821,210 @@ void AliAnalysisManager::Terminate() coll->SetName(output->GetName()); coll->Write(output->GetName(), TObject::kSingleKey); } else { - output->GetData()->Write(); + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } } - file->Close(); if (opwd) opwd->cd(); } + next1.Reset(); + while ((output=(AliAnalysisDataContainer*)next1())) { + // Close all files at output + TDirectory *opwd = gDirectory; + if (output->GetFile()) { + output->GetFile()->Close(); + output->SetFile(NULL); + // Copy merged outputs in alien if requested + if (fSpecialOutputLocation.Length() && + fSpecialOutputLocation.BeginsWith("alien://")) { + Info("Terminate", "Copy file %s to %s", output->GetFile()->GetName(),fSpecialOutputLocation.Data()); + TFile::Cp(output->GetFile()->GetName(), + Form("%s/%s", fSpecialOutputLocation.Data(), output->GetFile()->GetName())); + } + } + if (opwd) opwd->cd(); + } + delete allOutputs; - if (fInputEventHandler) fInputEventHandler ->TerminateIO(); - if (fOutputEventHandler) fOutputEventHandler ->TerminateIO(); - if (fMCtruthEventHandler) fMCtruthEventHandler->TerminateIO(); - - Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; if (getsysInfo) { TDirectory *cdir = gDirectory; TFile f("syswatch.root", "RECREATE"); + TH1 *hist; + TString cut; if (!f.IsZombie()) { TTree *tree = AliSysInfo::MakeTree("syswatch.log"); + tree->SetName("syswatch"); tree->SetMarkerStyle(kCircle); tree->SetMarkerColor(kBlue); tree->SetMarkerSize(0.5); if (!gROOT->IsBatch()) { tree->SetAlias("event", "id0"); - tree->SetAlias("memUSED", "pI.fMemVirtual"); - tree->SetAlias("userCPU", "pI.fCpuUser"); - TCanvas *c = new TCanvas("SysInfo","SysInfo",10,10,800,600); - c->Divide(2,1,0.01,0.01); - c->cd(1); - tree->Draw("memUSED:event","","", 1234567890, 0); - c->cd(2); - tree->Draw("userCPU:event","","", 1234567890, 0); + tree->SetAlias("task", "id1"); + tree->SetAlias("stage", "id2"); + // Already defined aliases + // tree->SetAlias("deltaT","stampSec-stampOldSec"); + // tree->SetAlias("T","stampSec-first"); + // tree->SetAlias("deltaVM","(pI.fMemVirtual-pIOld.fMemVirtual)"); + // tree->SetAlias("VM","pI.fMemVirtual"); + TCanvas *canvas = new TCanvas("SysInfo","SysInfo",10,10,1200,800); + Int_t npads = 1 /*COO plot for all tasks*/ + + fTopTasks->GetEntries() /*Exec plot per task*/ + + 1 /*Terminate plot for all tasks*/ + + 1; /*vm plot*/ + + Int_t iopt = (Int_t)TMath::Sqrt((Double_t)npads); + if (npadsDivide(iopt, iopt+1, 0.01, 0.01); + else + canvas->Divide(iopt+1, iopt+1, 0.01, 0.01); + Int_t ipad = 1; + // draw the plot of deltaVM for Exec for each task + for (itask=0; itaskGetEntriesFast(); itask++) { + task = (AliAnalysisTask*)fTopTasks->At(itask); + canvas->cd(ipad++); + cut = Form("task==%d && stage==1", itask); + tree->Draw("deltaVM:event",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle(Form("%s: Exec dVM[kB]/event", task->GetName())); + hist->GetYaxis()->SetTitle("deltaVM [kB]"); + } + } + // Draw the plot of deltaVM for CreateOutputObjects for all tasks + canvas->cd(ipad++); + tree->SetMarkerStyle(kFullTriangleUp); + tree->SetMarkerColor(kRed); + tree->SetMarkerSize(0.8); + cut = "task>=0 && task<1000 && stage==0"; + tree->Draw("deltaVM:sname",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Memory in CreateOutputObjects()"); + hist->GetYaxis()->SetTitle("deltaVM [kB]"); + hist->GetXaxis()->SetTitle("task"); + } + // draw the plot of deltaVM for Terminate for all tasks + canvas->cd(ipad++); + tree->SetMarkerStyle(kOpenSquare); + tree->SetMarkerColor(kMagenta); + cut = "task>=0 && task<1000 && stage==2"; + tree->Draw("deltaVM:sname",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Memory in Terminate()"); + hist->GetYaxis()->SetTitle("deltaVM [kB]"); + hist->GetXaxis()->SetTitle("task"); + } + // Full VM profile + canvas->cd(ipad++); + tree->SetMarkerStyle(kFullCircle); + tree->SetMarkerColor(kGreen); + cut = Form("task==%d && stage==1",fTopTasks->GetEntriesFast()-1); + tree->Draw("VM:event",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Virtual memory"); + hist->GetYaxis()->SetTitle("VM [kB]"); + } + canvas->Modified(); } + tree->SetMarkerStyle(kCircle); + tree->SetMarkerColor(kBlue); + tree->SetMarkerSize(0.5); tree->Write(); f.Close(); delete tree; } if (cdir) cdir->cd(); + } + // Validate the output files + if (ValidateOutputFiles()) { + ofstream out; + out.open("outputs_valid", ios::out); + out.close(); } if (fDebug > 0) printf("<-AliAnalysisManager::Terminate()\n"); } +//______________________________________________________________________________ +void AliAnalysisManager::ProfileTask(Int_t itop, const char *option) const +{ +// Profiles the task having the itop index in the list of top (first level) tasks. + AliAnalysisTask *task = (AliAnalysisTask*)fTopTasks->At(itop); + if (!task) { + Error("ProfileTask", "There are only %d top tasks in the manager", fTopTasks->GetEntries()); + return; + } + ProfileTask(task->GetName(), option); +} + +//______________________________________________________________________________ +void AliAnalysisManager::ProfileTask(const char *name, const char */*option*/) const +{ +// Profile a managed task after the execution of the analysis in case NSysInfo +// was used. + if (gSystem->AccessPathName("syswatch.root")) { + Error("ProfileTask", "No file syswatch.root found in the current directory"); + return; + } + if (gROOT->IsBatch()) return; + AliAnalysisTask *task = (AliAnalysisTask*)fTopTasks->FindObject(name); + if (!task) { + Error("ProfileTask", "No top task named %s known by the manager.", name); + return; + } + Int_t itop = fTopTasks->IndexOf(task); + Int_t itask = fTasks->IndexOf(task); + // Create canvas with 2 pads: first draw COO + Terminate, second Exec + TDirectory *cdir = gDirectory; + TFile f("syswatch.root"); + TTree *tree = (TTree*)f.Get("syswatch"); + if (!tree) { + Error("ProfileTask", "No tree named found in file syswatch.root"); + return; + } + if (fDebug > 0) printf("=== Profiling task %s (class %s)\n", name, task->ClassName()); + TCanvas *canvas = new TCanvas(Form("profile_%d",itop),Form("Profile of task %s (class %s)",name,task->ClassName()),10,10,800,600); + canvas->Divide(2, 2, 0.01, 0.01); + Int_t ipad = 1; + TString cut; + TH1 *hist; + // VM profile for COO and Terminate methods + canvas->cd(ipad++); + cut = Form("task==%d && (stage==0 || stage==2)",itask); + tree->Draw("deltaVM:sname",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Alocated VM[kB] for COO and Terminate"); + hist->GetYaxis()->SetTitle("deltaVM [kB]"); + hist->GetXaxis()->SetTitle("method"); + } + // CPU profile per event + canvas->cd(ipad++); + cut = Form("task==%d && stage==1",itop); + tree->Draw("deltaT:event",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Execution time per event"); + hist->GetYaxis()->SetTitle("CPU/event [s]"); + } + // VM profile for Exec + canvas->cd(ipad++); + cut = Form("task==%d && stage==1",itop); + tree->Draw("deltaVM:event",cut,"", 1234567890, 0); + hist = (TH1*)gPad->GetListOfPrimitives()->FindObject("htemp"); + if (hist) { + hist->SetTitle("Alocated VM[kB] per event"); + hist->GetYaxis()->SetTitle("deltaVM [kB]"); + } + canvas->Modified(); + delete tree; + f.Close(); + if (cdir) cdir->cd(); +} //______________________________________________________________________________ void AliAnalysisManager::AddTask(AliAnalysisTask *task) @@ -666,11 +1051,13 @@ AliAnalysisDataContainer *AliAnalysisManager::CreateContainer(const char *name, TClass *datatype, EAliAnalysisContType type, const char *filename) { // Create a data container of a certain type. Types can be: -// kExchangeContainer = 0, used to exchange date between tasks +// kExchangeContainer = 0, used to exchange data between tasks // kInputContainer = 1, used to store input data -// kOutputContainer = 2, used for posting results +// kOutputContainer = 2, used for writing result to a file +// filename: composed by file#folder (e.g. results.root#INCLUSIVE) - will write +// the output object to a folder inside the output file if (fContainers->FindObject(name)) { - Error("CreateContainer","A container named %s already defined !\n",name); + Error("CreateContainer","A container named %s already defined !",name); return NULL; } AliAnalysisDataContainer *cont = new AliAnalysisDataContainer(name, datatype); @@ -686,6 +1073,13 @@ AliAnalysisDataContainer *AliAnalysisManager::CreateContainer(const char *name, cont->SetDataOwned(kFALSE); // data owned by the file } break; + case kParamContainer: + fParamCont->Add(cont); + if (filename && strlen(filename)) { + cont->SetFileName(filename); + cont->SetDataOwned(kFALSE); // data owned by the file + } + break; case kExchangeContainer: break; } @@ -697,6 +1091,10 @@ Bool_t AliAnalysisManager::ConnectInput(AliAnalysisTask *task, Int_t islot, AliAnalysisDataContainer *cont) { // Connect input of an existing task to a data container. + if (!task) { + Error("ConnectInput", "Task pointer is NULL"); + return kFALSE; + } if (!fTasks->FindObject(task)) { AddTask(task); Info("ConnectInput", "Task %s was not registered. Now owned by analysis manager", task->GetName()); @@ -710,6 +1108,10 @@ Bool_t AliAnalysisManager::ConnectOutput(AliAnalysisTask *task, Int_t islot, AliAnalysisDataContainer *cont) { // Connect output of an existing task to a data container. + if (!task) { + Error("ConnectOutput", "Task pointer is NULL"); + return kFALSE; + } if (!fTasks->FindObject(task)) { AddTask(task); Warning("ConnectOutput", "Task %s not registered. Now owned by analysis manager", task->GetName()); @@ -736,8 +1138,10 @@ Bool_t AliAnalysisManager::InitAnalysis() { // Initialization of analysis chain of tasks. Should be called after all tasks // and data containers are properly connected - // Check for input/output containers + // Reset flag and remove valid_outputs file if exists fInitOK = kFALSE; + if (!gSystem->AccessPathName("outputs_valid")) + gSystem->Unlink("outputs_valid"); // Check for top tasks (depending only on input data containers) if (!fTasks->First()) { Error("InitAnalysis", "Analysis has no tasks !"); @@ -857,26 +1261,58 @@ void AliAnalysisManager::ResetAnalysis() } //______________________________________________________________________________ -void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree, Long64_t nentries, Long64_t firstentry) +Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree, Long64_t nentries, Long64_t firstentry) { // Start analysis for this manager. Analysis task can be: LOCAL, PROOF, GRID or // MIX. Process nentries starting from firstentry + Long64_t retv = 0; if (!fInitOK) { Error("StartAnalysis","Analysis manager was not initialized !"); - return; + return -1; } if (fDebug > 0) printf("StartAnalysis %s\n",GetName()); TString anaType = type; anaType.ToLower(); fMode = kLocalAnalysis; - if (tree) { - if (anaType.Contains("proof")) fMode = kProofAnalysis; - else if (anaType.Contains("grid")) fMode = kGridAnalysis; - else if (anaType.Contains("mix")) fMode = kMixingAnalysis; - } + Bool_t runlocalinit = kTRUE; + if (anaType.Contains("file")) { + runlocalinit = kFALSE; + } + if (anaType.Contains("proof")) fMode = kProofAnalysis; + else if (anaType.Contains("grid")) fMode = kGridAnalysis; + else if (anaType.Contains("mix")) fMode = kMixingAnalysis; + if (fMode == kGridAnalysis) { - Warning("StartAnalysis", "GRID analysis mode not implemented. Running local."); - fMode = kLocalAnalysis; + if (!fGridHandler) { + Error("StartAnalysis", "Cannot start grid analysis without a grid handler."); + Info("===", "Add an AliAnalysisAlien object as plugin for this manager and configure it."); + return -1; + } + // Write analysis manager in the analysis file + cout << "===== RUNNING GRID ANALYSIS: " << GetName() << endl; + // run local task configuration + TIter nextTask(fTasks); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)nextTask())) { + task->LocalInit(); + } + if (!fGridHandler->StartAnalysis(nentries, firstentry)) { + Info("StartAnalysis", "Grid analysis was stopped and cannot be terminated"); + return -1; + } + + // Terminate grid analysis + if (fSelector && fSelector->GetStatus() == -1) return -1; + if (fGridHandler->GetRunMode() == AliAnalysisGrid::kOffline) return 0; + cout << "===== MERGING OUTPUTS REGISTERED BY YOUR ANALYSIS JOB: " << GetName() << endl; + if (!fGridHandler->MergeOutputs()) { + // Return if outputs could not be merged or if it alien handler + // was configured for offline mode or local testing. + return 0; + } + ImportWrappers(NULL); + Terminate(); + return 0; } char line[256]; SetEventLoop(kFALSE); @@ -885,55 +1321,68 @@ void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree, Long64_t n TChain *chain = 0; TString ttype = "TTree"; - if (tree->IsA() == TChain::Class()) { + if (tree && tree->IsA() == TChain::Class()) { chain = (TChain*)tree; if (!chain || !chain->GetListOfFiles()->First()) { Error("StartAnalysis", "Cannot process null or empty chain..."); - return; + return -1; } ttype = "TChain"; } + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; + if (getsysInfo) AliSysInfo::AddStamp("Start", 0); // Initialize locally all tasks (happens for all modes) TIter next(fTasks); AliAnalysisTask *task; - while ((task=(AliAnalysisTask*)next())) { - task->LocalInit(); - } + if (runlocalinit) { + while ((task=(AliAnalysisTask*)next())) { + task->LocalInit(); + } + if (getsysInfo) AliSysInfo::AddStamp("LocalInit_all", 0); + } switch (fMode) { case kLocalAnalysis: if (!tree) { TIter nextT(fTasks); // Call CreateOutputObjects for all tasks + Int_t itask = 0; while ((task=(AliAnalysisTask*)nextT())) { TDirectory *curdir = gDirectory; task->CreateOutputObjects(); + if (getsysInfo) AliSysInfo::AddStamp(Form("%s_CREATEOUTOBJ",task->ClassName()), 0, itask, 0); if (curdir) curdir->cd(); + itask++; } + if (IsExternalLoop()) { + Info("StartAnalysis", "Initialization done. Event loop is controlled externally.\ + \nSetData for top container, call ExecAnalysis in a loop and then Terminate manually"); + return 0; + } ExecAnalysis(); Terminate(); - return; + return 0; } // Run tree-based analysis via AliAnalysisSelector cout << "===== RUNNING LOCAL ANALYSIS " << GetName() << " ON TREE " << tree->GetName() << endl; fSelector = new AliAnalysisSelector(this); - tree->Process(fSelector, "", nentries, firstentry); + retv = tree->Process(fSelector, "", nentries, firstentry); break; case kProofAnalysis: if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { - printf("StartAnalysis: no PROOF!!!\n"); - return; + Error("StartAnalysis", "No PROOF!!! Exiting."); + return -1; } sprintf(line, "gProof->AddInput((TObject*)0x%lx);", (ULong_t)this); gROOT->ProcessLine(line); if (chain) { chain->SetProof(); cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON CHAIN " << chain->GetName() << endl; - chain->Process("AliAnalysisSelector", "", nentries, firstentry); + retv = chain->Process("AliAnalysisSelector", "", nentries, firstentry); } else { - printf("StartAnalysis: no chain\n"); - return; + Error("StartAnalysis", "No chain!!! Exiting."); + return -1; } break; case kGridAnalysis: @@ -943,39 +1392,42 @@ void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree, Long64_t n // Run event mixing analysis if (!fEventPool) { Error("StartAnalysis", "Cannot run event mixing without event pool"); - return; + return -1; } cout << "===== RUNNING EVENT MIXING ANALYSIS " << GetName() << endl; fSelector = new AliAnalysisSelector(this); - TChain *chain; while ((chain=fEventPool->GetNextChain())) { - TIter next(fTasks); - AliAnalysisTask *task; + next.Reset(); // Call NotifyBinChange for all tasks while ((task=(AliAnalysisTask*)next())) if (!task->IsPostEventLoop()) task->NotifyBinChange(); - chain->Process(fSelector); + retv = chain->Process(fSelector); + if (retv < 0) { + Error("StartAnalysis", "Mixing analysis failed"); + return retv; + } } PackOutput(fSelector->GetOutputList()); Terminate(); - } + } + return retv; } //______________________________________________________________________________ -void AliAnalysisManager::StartAnalysis(const char *type, const char *dataset, Long64_t nentries, Long64_t firstentry) +Long64_t AliAnalysisManager::StartAnalysis(const char *type, const char *dataset, Long64_t nentries, Long64_t firstentry) { // Start analysis for this manager on a given dataset. Analysis task can be: // LOCAL, PROOF or GRID. Process nentries starting from firstentry. if (!fInitOK) { Error("StartAnalysis","Analysis manager was not initialized !"); - return; + return -1; } if (fDebug > 0) printf("StartAnalysis %s\n",GetName()); TString anaType = type; anaType.ToLower(); if (!anaType.Contains("proof")) { - Error("Cannot process datasets in %s mode. Try PROOF.", type); - return; + Error("StartAnalysis", "Cannot process datasets in %s mode. Try PROOF.", type); + return -1; } fMode = kProofAnalysis; char line[256]; @@ -992,44 +1444,168 @@ void AliAnalysisManager::StartAnalysis(const char *type, const char *dataset, Lo } if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { - printf("StartAnalysis: no PROOF!!!\n"); - return; + Error("StartAnalysis", "No PROOF!!! Exiting."); + return -1; } sprintf(line, "gProof->AddInput((TObject*)0x%lx);", (ULong_t)this); gROOT->ProcessLine(line); sprintf(line, "gProof->GetDataSet(\"%s\");", dataset); if (!gROOT->ProcessLine(line)) { Error("StartAnalysis", "Dataset %s not found", dataset); - return; + return -1; } sprintf(line, "gProof->Process(\"%s\", \"AliAnalysisSelector\", \"\", %lld, %lld);", dataset, nentries, firstentry); cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON DATASET " << dataset << endl; - gROOT->ProcessLine(line); + Long_t retv = (Long_t)gROOT->ProcessLine(line); + return retv; } //______________________________________________________________________________ -TFile *AliAnalysisManager::OpenProofFile(const char *filename, const char *option) +TFile *AliAnalysisManager::OpenFile(AliAnalysisDataContainer *cont, const char *option, Bool_t ignoreProof) +{ +// Opens according the option the file specified by cont->GetFileName() and changes +// current directory to cont->GetFolderName(). If the file was already opened, it +// checks if the option UPDATE was preserved. File open via TProofOutputFile can +// be optionally ignored. + AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager(); + TString filename = cont->GetFileName(); + TFile *f = NULL; + if (filename.IsNull()) { + ::Error("AliAnalysisManager::OpenFile", "No file name specified for container %s", cont->GetName()); + return NULL; + } + if (mgr->GetAnalysisType()==AliAnalysisManager::kProofAnalysis && cont->IsSpecialOutput() + && !ignoreProof) + f = mgr->OpenProofFile(cont,option); + else { + // Check first if the file is already opened + f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (f) { + // Check if option "UPDATE" was preserved + TString opt(option); + opt.ToUpper(); + if ((opt=="UPDATE") && (opt!=f->GetOption())) + ::Info("AliAnalysisManager::OpenFile", "File %s already opened in %s mode!", cont->GetFileName(), f->GetOption()); + } else { + f = TFile::Open(filename, option); + } + } + if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) { + cont->SetFile(f); + // Cd to file + f->cd(); + // Check for a folder request + TString dir = cont->GetFolderName(); + if (!dir.IsNull()) { + if (!f->GetDirectory(dir)) f->mkdir(dir); + f->cd(dir); + } + return f; + } + ::Fatal("AliAnalysisManager::OpenFile", "File %s could not be opened", filename.Data()); + cont->SetFile(NULL); + return NULL; +} + +//______________________________________________________________________________ +TFile *AliAnalysisManager::OpenProofFile(AliAnalysisDataContainer *cont, const char *option) { // Opens a special output file used in PROOF. - char line[256]; - if (fMode!=kProofAnalysis || !fSelector) { - Error("OpenProofFile","Cannot open PROOF file %s",filename); - return NULL; - } - sprintf(line, "TProofOutputFile *pf = new TProofOutputFile(\"%s\");", filename); - if (fDebug > 1) printf("=== %s\n", line); - gROOT->ProcessLine(line); - sprintf(line, "pf->OpenFile(\"%s\");", option); - gROOT->ProcessLine(line); - if (fDebug > 1) { + TString line; + TString filename = cont->GetFileName(); + if (cont == fCommonOutput) { + if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName(); + else Fatal("OpenProofFile","No output container. Exiting."); + } + TFile *f = NULL; + if (fMode!=kProofAnalysis || !fSelector) { + Fatal("OpenProofFile","Cannot open PROOF file %s: no PROOF or selector",filename.Data()); + return NULL; + } + if (fSpecialOutputLocation.Length()) { + f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (f) { + // Check if option "UPDATE" was preserved + TString opt(option); + opt.ToUpper(); + if ((opt=="UPDATE") && (opt!=f->GetOption())) + ::Info("OpenProofFile", "File %s already opened in %s mode!", cont->GetFileName(), f->GetOption()); + } else { + f = new TFile(filename, option); + } + if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) { + cont->SetFile(f); + // Cd to file + f->cd(); + // Check for a folder request + TString dir = cont->GetFolderName(); + if (dir.Length()) { + if (!f->GetDirectory(dir)) f->mkdir(dir); + f->cd(dir); + } + return f; + } + Fatal("OpenProofFile", "File %s could not be opened", cont->GetFileName()); + cont->SetFile(NULL); + return NULL; + } + // Check if there is already a proof output file in the output list + TObject *pof = fSelector->GetOutputList()->FindObject(filename); + if (pof) { + // Get the actual file + line = Form("((TProofOutputFile*)0x%lx)->GetFileName();", (ULong_t)pof); + filename = (const char*)gROOT->ProcessLine(line); + if (fDebug>1) { + printf("File: %s already booked via TProofOutputFile\n", filename.Data()); + } + f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (!f) Fatal("OpenProofFile", "Proof output file found but no file opened for %s", filename.Data()); + // Check if option "UPDATE" was preserved + TString opt(option); + opt.ToUpper(); + if ((opt=="UPDATE") && (opt!=f->GetOption())) + Fatal("OpenProofFile", "File %s already opened, but not in UPDATE mode!", cont->GetFileName()); + } else { + if (cont->IsRegisterDataset()) { + TString dsetName = filename; + dsetName.ReplaceAll(".root", cont->GetTitle()); + dsetName.ReplaceAll(":","_"); + if (fDebug>1) printf("Booking dataset: %s\n", dsetName.Data()); + line = Form("TProofOutputFile *pf = new TProofOutputFile(\"%s\", \"DROV\", \"%s\");", filename.Data(), dsetName.Data()); + } else { + if (fDebug>1) printf("Booking TProofOutputFile: %s to be merged\n", filename.Data()); + line = Form("TProofOutputFile *pf = new TProofOutputFile(\"%s\");", filename.Data()); + } + if (fDebug > 1) printf("=== %s\n", line.Data()); + gROOT->ProcessLine(line); + line = Form("pf->OpenFile(\"%s\");", option); + gROOT->ProcessLine(line); + f = gFile; + if (fDebug > 1) { gROOT->ProcessLine("pf->Print()"); - printf(" == proof file name: %s\n", gFile->GetName()); - } - sprintf(line, "((TList*)0x%lx)->Add(pf);",(ULong_t)fSelector->GetOutputList()); - if (fDebug > 1) printf("=== %s\n", line); - gROOT->ProcessLine(line); - return gFile; + printf(" == proof file name: %s", f->GetName()); + } + // Add to proof output list + line = Form("((TList*)0x%lx)->Add(pf);",(ULong_t)fSelector->GetOutputList()); + if (fDebug > 1) printf("=== %s\n", line.Data()); + gROOT->ProcessLine(line); + } + if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) { + cont->SetFile(f); + // Cd to file + f->cd(); + // Check for a folder request + TString dir = cont->GetFolderName(); + if (!dir.IsNull()) { + if (!f->GetDirectory(dir)) f->mkdir(dir); + f->cd(dir); + } + return f; + } + Fatal("OpenProofFile", "File %s could not be opened", cont->GetFileName()); + cont->SetFile(NULL); + return NULL; } //______________________________________________________________________________ @@ -1037,8 +1613,9 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) { // Execute analysis. static Long64_t ncalls = 0; + if (fDebug > 0) printf("MGR: Processing event #%lld\n", ncalls); Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; - if (getsysInfo && ncalls==0) AliSysInfo::AddStamp("Start", (Int_t)ncalls); + if (getsysInfo && ((ncalls%fNSysInfo)==0)) AliSysInfo::AddStamp("Exec_start", (Int_t)ncalls); ncalls++; if (!fInitOK) { Error("ExecAnalysis", "Analysis manager was not initialized !"); @@ -1050,7 +1627,8 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) TIter next(fTasks); // De-activate all tasks while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE); - AliAnalysisDataContainer *cont = (AliAnalysisDataContainer*)fInputs->At(0); + AliAnalysisDataContainer *cont = fCommonInput; + if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0); if (!cont) { Error("ExecAnalysis","Cannot execute analysis in TSelector mode without at least one top container"); return; @@ -1063,16 +1641,21 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) if (fInputEventHandler) fInputEventHandler ->BeginEvent(entry); if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(entry); if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(entry); + if (getsysInfo && ((ncalls%fNSysInfo)==0)) + AliSysInfo::AddStamp("Handlers_BeginEvent",(Int_t)ncalls, 1000, 0); // // Execute the tasks // TIter next1(cont->GetConsumers()); TIter next1(fTopTasks); + Int_t itask = 0; while ((task=(AliAnalysisTask*)next1())) { if (fDebug >1) { cout << " Executing task " << task->GetName() << endl; - } - + } task->ExecuteTask(option); + if (getsysInfo && ((ncalls%fNSysInfo)==0)) + AliSysInfo::AddStamp(task->ClassName(),(Int_t)ncalls, itask, 1); + itask++; } // // Call FinishEvent() for optional output and MC services @@ -1081,7 +1664,7 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent(); // Gather system information if requested if (getsysInfo && ((ncalls%fNSysInfo)==0)) - AliSysInfo::AddStamp(Form("Event#%lld",ncalls),(Int_t)ncalls); + AliSysInfo::AddStamp("Handlers_FinishEvent",(Int_t)ncalls, 1000, 1); return; } // The event loop is not controlled by TSelector @@ -1090,6 +1673,8 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) if (fInputEventHandler) fInputEventHandler ->BeginEvent(-1); if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(-1); if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(-1); + if (getsysInfo && ((ncalls%fNSysInfo)==0)) + AliSysInfo::AddStamp("Handlers_BeginEvent",(Int_t)ncalls, 1000, 0); TIter next2(fTopTasks); while ((task=(AliAnalysisTask*)next2())) { task->SetActive(kTRUE); @@ -1103,10 +1688,214 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) if (fInputEventHandler) fInputEventHandler ->FinishEvent(); if (fOutputEventHandler) fOutputEventHandler ->FinishEvent(); if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent(); + if (getsysInfo && ((ncalls%fNSysInfo)==0)) + AliSysInfo::AddStamp("Handlers_FinishEvent",(Int_t)ncalls, 1000, 1); } //______________________________________________________________________________ -void AliAnalysisManager::FinishAnalysis() +void AliAnalysisManager::SetInputEventHandler(AliVEventHandler* const handler) { -// Finish analysis. +// Set the input event handler and create a container for it. + fInputEventHandler = handler; + fCommonInput = CreateContainer("cAUTO_INPUT", TChain::Class(), AliAnalysisManager::kInputContainer); +// Warning("SetInputEventHandler", " An automatic input container for the input chain was created.\nPlease use: mgr->GetCommonInputContainer() to access it."); +} + +//______________________________________________________________________________ +void AliAnalysisManager::SetOutputEventHandler(AliVEventHandler* const handler) +{ +// Set the input event handler and create a container for it. + fOutputEventHandler = handler; + fCommonOutput = CreateContainer("cAUTO_OUTPUT", TTree::Class(), AliAnalysisManager::kOutputContainer, "default"); + fCommonOutput->SetSpecialOutput(); +// Warning("SetOutputEventHandler", " An automatic output container for the output tree was created.\nPlease use: mgr->GetCommonOutputContainer() to access it."); +} + +//______________________________________________________________________________ +void AliAnalysisManager::RegisterExtraFile(const char *fname) +{ +// This method is used externally to register output files which are not +// connected to any output container, so that the manager can properly register, +// retrieve or merge them when running in distributed mode. The file names are +// separated by blancs. The method has to be called in MyAnalysisTask::LocalInit(). + if (fExtraFiles.Length()) fExtraFiles += " "; + fExtraFiles += fname; +} + +//______________________________________________________________________________ +Bool_t AliAnalysisManager::GetFileFromWrapper(const char *filename, const TList *source) +{ +// Copy a file from the location specified ina the wrapper with the same name from the source list. + char fullPath[512]; + char chUrl[512]; + TObject *pof = source->FindObject(filename); + if (!pof || !pof->InheritsFrom("TProofOutputFile")) { + Error("GetFileFromWrapper", "TProofOutputFile object not found in output list for file %s", filename); + return kFALSE; + } + gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", ((TProofOutputFile*)0x%lx)->GetOutputFileName();)", fullPath, pof)); + gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", gProof->GetUrl();)", chUrl)); + TString clientUrl(chUrl); + TString fullPath_str(fullPath); + if (clientUrl.Contains("localhost")){ + TObjArray* array = fullPath_str.Tokenize ( "//" ); + TObjString *strobj = ( TObjString *)array->At(1); + TObjArray* arrayPort = strobj->GetString().Tokenize ( ":" ); + TObjString *strobjPort = ( TObjString *) arrayPort->At(1); + fullPath_str.ReplaceAll(strobj->GetString().Data(),"localhost:PORT"); + fullPath_str.ReplaceAll(":PORT",Form(":%s",strobjPort->GetString().Data())); + if (fDebug > 1) Info("GetFileFromWrapper","Using tunnel from %s to %s",fullPath_str.Data(),filename); + delete arrayPort; + delete array; + } + if (fDebug > 1) + Info("GetFileFromWrapper","Copying file %s from PROOF scratch space", fullPath_str.Data()); + Bool_t gotit = TFile::Cp(fullPath_str.Data(), filename); + if (!gotit) + Error("GetFileFromWrapper", "Could not get file %s from proof scratch space", filename); + return gotit; +} + +//______________________________________________________________________________ +void AliAnalysisManager::GetAnalysisTypeString(TString &type) const +{ +// Fill analysis type in the provided string. + switch (fMode) { + case kLocalAnalysis: + type = "local"; + return; + case kProofAnalysis: + type = "proof"; + return; + case kGridAnalysis: + type = "grid"; + return; + case kMixingAnalysis: + type = "mix"; + } +} + +//______________________________________________________________________________ +Bool_t AliAnalysisManager::ValidateOutputFiles() const +{ +// Validate all output files. + TIter next(fOutputs); + AliAnalysisDataContainer *output; + TDirectory *cdir = gDirectory; + TString openedFiles; + while ((output=(AliAnalysisDataContainer*)next())) { + if (output->IsRegisterDataset()) continue; + TString filename = output->GetFileName(); + if (filename == "default") { + if (!fOutputEventHandler) continue; + filename = fOutputEventHandler->GetOutputFileName(); + // Main AOD may not be there + if (gSystem->AccessPathName(filename)) continue; + } + // Check if the file is closed + if (openedFiles.Contains(filename)) continue;; + TFile *file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (file) { + Warning("ValidateOutputs", "File %s was not closed. Closing.", filename.Data()); + file->Close(); + } + file = TFile::Open(filename); + if (!file || file->IsZombie() || file->TestBit(TFile::kRecovered)) { + Error("ValidateOutputs", "Output file <%s> was not created or invalid", filename.Data()); + cdir->cd(); + return kFALSE; + } + file->Close(); + openedFiles += filename; + openedFiles += " "; + } + cdir->cd(); + return kTRUE; +} + +//______________________________________________________________________________ +void AliAnalysisManager::ProgressBar(const char *opname, Long64_t current, Long64_t size, TStopwatch * const watch, Bool_t last, Bool_t refresh) +{ +// Implements a nice text mode progress bar. + static Long64_t icount = 0; + static TString oname; + static TString nname; + static Long64_t ocurrent = 0; + static Long64_t osize = 0; + static Int_t oseconds = 0; + static TStopwatch *owatch = 0; + static Bool_t oneoftwo = kFALSE; + static Int_t nrefresh = 0; + static Int_t nchecks = 0; + const char symbol[4] = {'=','\\','|','/'}; + char progress[11] = " "; + Int_t ichar = icount%4; + + if (!refresh) { + nrefresh = 0; + if (!size) return; + owatch = watch; + oname = opname; + ocurrent = TMath::Abs(current); + osize = TMath::Abs(size); + if (ocurrent > osize) ocurrent=osize; + } else { + nrefresh++; + if (!osize) return; + } + icount++; + Double_t time = 0.; + Int_t hours = 0; + Int_t minutes = 0; + Int_t seconds = 0; + if (owatch && !last) { + owatch->Stop(); + time = owatch->RealTime(); + hours = (Int_t)(time/3600.); + time -= 3600*hours; + minutes = (Int_t)(time/60.); + time -= 60*minutes; + seconds = (Int_t)time; + if (refresh) { + if (oseconds==seconds) { + owatch->Continue(); + return; + } + oneoftwo = !oneoftwo; + } + oseconds = seconds; + } + if (refresh && oneoftwo) { + nname = oname; + if (nchecks <= 0) nchecks = nrefresh+1; + Int_t pctdone = (Int_t)(100.*nrefresh/nchecks); + oname = Form(" == %d%% ==", pctdone); + } + Double_t percent = 100.0*ocurrent/osize; + Int_t nchar = Int_t(percent/10); + if (nchar>10) nchar=10; + Int_t i; + for (i=0; i0.) fprintf(stderr, "[%6.2f %%] TIME %.2d:%.2d:%.2d \r", percent, hours, minutes, seconds); + else fprintf(stderr, "[%6.2f %%]\r", percent); + if (refresh && oneoftwo) oname = nname; + if (owatch) owatch->Continue(); + if (last) { + icount = 0; + owatch = 0; + ocurrent = 0; + osize = 0; + oseconds = 0; + oneoftwo = kFALSE; + nrefresh = 0; + fprintf(stderr, "\n"); + } }