X-Git-Url: http://git.uio.no/git/?a=blobdiff_plain;f=ANALYSIS%2FAliAnalysisManager.cxx;h=d07b415b7135e466e739324468ec4e69d12c1a36;hb=abda854188ebf50082975185957c100147a929a1;hp=ce706320d9e39eefc4c8925f1cd8595fd97fba53;hpb=6bb2b24faf6994640f75ee64893704905582aa41;p=u%2Fmrichter%2FAliRoot.git diff --git a/ANALYSIS/AliAnalysisManager.cxx b/ANALYSIS/AliAnalysisManager.cxx index ce706320d9e..d07b415b713 100644 --- a/ANALYSIS/AliAnalysisManager.cxx +++ b/ANALYSIS/AliAnalysisManager.cxx @@ -29,59 +29,49 @@ #include #include +#include #include #include #include #include +#include +#include "AliAnalysisSelector.h" +#include "AliAnalysisGrid.h" #include "AliAnalysisTask.h" #include "AliAnalysisDataContainer.h" #include "AliAnalysisDataSlot.h" -#include "AliVirtualEventHandler.h" +#include "AliVEventHandler.h" +#include "AliVEventPool.h" +#include "AliSysInfo.h" #include "AliAnalysisManager.h" ClassImp(AliAnalysisManager) AliAnalysisManager *AliAnalysisManager::fgAnalysisManager = NULL; -//______________________________________________________________________________ -AliAnalysisManager::AliAnalysisManager() - :TNamed(), - fTree(NULL), - fOutputEventHandler(NULL), - fMCtruthEventHandler(NULL), - fCurrentEntry(-1), - fMode(kLocalAnalysis), - fInitOK(kFALSE), - fDebug(0), - fTasks(NULL), - fTopTasks(NULL), - fZombies(NULL), - fContainers(NULL), - fInputs(NULL), - fOutputs(NULL) -{ -// Dummy constructor. - fgAnalysisManager = this; - SetEventLoop(kTRUE); -} - //______________________________________________________________________________ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title) :TNamed(name,title), fTree(NULL), - fOutputEventHandler(NULL), - fMCtruthEventHandler(NULL), + fInputEventHandler(NULL), + fOutputEventHandler(NULL), + fMCtruthEventHandler(NULL), + fEventPool(NULL), fCurrentEntry(-1), + fNSysInfo(0), fMode(kLocalAnalysis), fInitOK(kFALSE), fDebug(0), + fSpecialOutputLocation(""), fTasks(NULL), fTopTasks(NULL), fZombies(NULL), fContainers(NULL), fInputs(NULL), - fOutputs(NULL) + fOutputs(NULL), + fSelector(NULL), + fGridHandler(NULL) { // Default constructor. fgAnalysisManager = this; @@ -98,18 +88,24 @@ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title) AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other) :TNamed(other), fTree(NULL), - fOutputEventHandler(NULL), - fMCtruthEventHandler(NULL), + fInputEventHandler(NULL), + fOutputEventHandler(NULL), + fMCtruthEventHandler(NULL), + fEventPool(NULL), fCurrentEntry(-1), + fNSysInfo(0), fMode(other.fMode), fInitOK(other.fInitOK), fDebug(other.fDebug), + fSpecialOutputLocation(""), fTasks(NULL), fTopTasks(NULL), fZombies(NULL), fContainers(NULL), fInputs(NULL), - fOutputs(NULL) + fOutputs(NULL), + fSelector(NULL), + fGridHandler(NULL) { // Copy constructor. fTasks = new TObjArray(*other.fTasks); @@ -127,10 +123,13 @@ AliAnalysisManager& AliAnalysisManager::operator=(const AliAnalysisManager& othe // Assignment if (&other != this) { TNamed::operator=(other); + fInputEventHandler = other.fInputEventHandler; fOutputEventHandler = other.fOutputEventHandler; fMCtruthEventHandler = other.fMCtruthEventHandler; + fEventPool = other.fEventPool; fTree = NULL; fCurrentEntry = -1; + fNSysInfo = other.fNSysInfo; fMode = other.fMode; fInitOK = other.fInitOK; fDebug = other.fDebug; @@ -140,6 +139,8 @@ AliAnalysisManager& AliAnalysisManager::operator=(const AliAnalysisManager& othe fContainers = new TObjArray(*other.fContainers); fInputs = new TObjArray(*other.fInputs); fOutputs = new TObjArray(*other.fOutputs); + fSelector = NULL; + fGridHandler = NULL; fgAnalysisManager = this; } return *this; @@ -155,6 +156,7 @@ AliAnalysisManager::~AliAnalysisManager() if (fContainers) {fContainers->Delete(); delete fContainers;} if (fInputs) delete fInputs; if (fOutputs) delete fOutputs; + if (fGridHandler) delete fGridHandler; if (fgAnalysisManager==this) fgAnalysisManager = NULL; } @@ -162,9 +164,7 @@ AliAnalysisManager::~AliAnalysisManager() Int_t AliAnalysisManager::GetEntry(Long64_t entry, Int_t getall) { // Read one entry of the tree or a whole branch. - if (fDebug > 1) { - cout << "== AliAnalysisManager::GetEntry()" << endl; - } + if (fDebug > 0) printf("== AliAnalysisManager::GetEntry(%lld)\n", entry); fCurrentEntry = entry; return fTree ? fTree->GetTree()->GetEntry(entry, getall) : 0; } @@ -178,65 +178,115 @@ void AliAnalysisManager::Init(TTree *tree) // generated code, but the routine can be extended by the user if needed. // Init() will be called many times when running with PROOF. if (!tree) return; - if (fDebug > 1) { + if (fDebug > 0) { printf("->AliAnalysisManager::Init(%s)\n", tree->GetName()); } + + // Call InitTree of EventHandler + if (fOutputEventHandler) { + if (fMode == kProofAnalysis) { + fOutputEventHandler->Init(0x0, "proof"); + } else { + fOutputEventHandler->Init(0x0, "local"); + } + } + + if (fInputEventHandler) { + if (fMode == kProofAnalysis) { + fInputEventHandler->Init(tree, "proof"); + } else { + fInputEventHandler->Init(tree, "local"); + } + } else { + // If no input event handler we need to get the tree once + // for the chain + if(!tree->GetTree()) tree->LoadTree(0); + } + + + if (fMCtruthEventHandler) { + if (fMode == kProofAnalysis) { + fMCtruthEventHandler->Init(0x0, "proof"); + } else { + fMCtruthEventHandler->Init(0x0, "local"); + } + } + if (!fInitOK) InitAnalysis(); if (!fInitOK) return; fTree = tree; AliAnalysisDataContainer *top = (AliAnalysisDataContainer*)fInputs->At(0); if (!top) { - cout<<"Error: No top input container !" <SetData(tree); - if (fDebug > 1) { + if (fDebug > 0) { printf("<-AliAnalysisManager::Init(%s)\n", tree->GetName()); } } -//______________________________________________________________________________ -void AliAnalysisManager::Begin(TTree *tree) -{ - // The Begin() function is called at the start of the query. - // When running with PROOF Begin() is only called on the client. - // The tree argument is deprecated (on PROOF 0 is passed). - if (fDebug > 1) { - cout << "AliAnalysisManager::Begin()" << endl; - } - Init(tree); -} - //______________________________________________________________________________ void AliAnalysisManager::SlaveBegin(TTree *tree) { // The SlaveBegin() function is called after the Begin() function. // When running with PROOF SlaveBegin() is called on each slave server. // The tree argument is deprecated (on PROOF 0 is passed). - if (fDebug > 1) { - cout << "->AliAnalysisManager::SlaveBegin()" << endl; - } - // Call InitIO of EventHandler + if (fDebug > 0) printf("->AliAnalysisManager::SlaveBegin()\n"); + static Bool_t isCalled = kFALSE; + TDirectory *curdir = gDirectory; + // Call SlaveBegin only once in case of mixing + if (isCalled && fMode==kMixingAnalysis) return; + // Call Init of EventHandler if (fOutputEventHandler) { - if (fMode == kProofAnalysis) { - fOutputEventHandler->InitIO("proof"); - } else { - fOutputEventHandler->InitIO("local"); - } + if (fMode == kProofAnalysis) { + TIter nextout(fOutputs); + AliAnalysisDataContainer *c_aod; + while ((c_aod=(AliAnalysisDataContainer*)nextout())) if (!strcmp(c_aod->GetFileName(),"default")) break; + if (c_aod && c_aod->IsSpecialOutput()) { + // Merging via files + if (fDebug > 1) printf(" Initializing special output file %s...\n", fOutputEventHandler->GetOutputFileName()); + OpenProofFile(fOutputEventHandler->GetOutputFileName(), "RECREATE"); + c_aod->SetFile(gFile); + fOutputEventHandler->Init("proofspecial"); + } else { + // Merging in memory + fOutputEventHandler->Init("proof"); + } + } else { + fOutputEventHandler->Init("local"); + } } - // + + if (fInputEventHandler) { + fInputEventHandler->SetInputTree(tree); + if (fMode == kProofAnalysis) { + fInputEventHandler->Init("proof"); + } else { + fInputEventHandler->Init("local"); + } + } + + if (fMCtruthEventHandler) { + if (fMode == kProofAnalysis) { + fMCtruthEventHandler->Init("proof"); + } else { + fMCtruthEventHandler->Init("local"); + } + } + if (curdir) curdir->cd(); + TIter next(fTasks); AliAnalysisTask *task; // Call CreateOutputObjects for all tasks while ((task=(AliAnalysisTask*)next())) { - TDirectory *curdir = gDirectory; + curdir = gDirectory; task->CreateOutputObjects(); if (curdir) curdir->cd(); - } - if (fMode == kLocalAnalysis) Init(tree); - if (fDebug > 1) { - cout << "<-AliAnalysisManager::SlaveBegin()" << endl; } + isCalled = kTRUE; + + if (fDebug > 0) printf("<-AliAnalysisManager::SlaveBegin()\n"); } //______________________________________________________________________________ @@ -247,17 +297,36 @@ Bool_t AliAnalysisManager::Notify() // is started when using PROOF. It is normaly not necessary to make changes // to the generated code, but the routine can be extended by the // user if needed. The return value is currently not used. - if (fTree) { - TFile *curfile = fTree->GetCurrentFile(); - if (curfile && fDebug>1) printf("AliAnalysisManager::Notify() file: %s\n", curfile->GetName()); - TIter next(fTasks); - AliAnalysisTask *task; - // Call Notify for all tasks - while ((task=(AliAnalysisTask*)next())) - task->Notify(); - // Call Notify of the MC truth handler - if (fMCtruthEventHandler) fMCtruthEventHandler->Notify(); + if (!fTree) { + Error("Notify","No current tree."); + return kFALSE; + } + TFile *curfile = fTree->GetCurrentFile(); + if (!curfile) { + Error("Notify","No current file"); + return kFALSE; + } + + if (fDebug > 0) printf("->AliAnalysisManager::Notify() file: %s\n", curfile->GetName()); + TIter next(fTasks); + AliAnalysisTask *task; + // Call Notify for all tasks + while ((task=(AliAnalysisTask*)next())) + task->Notify(); + + // Call Notify of the event handlers + if (fInputEventHandler) { + fInputEventHandler->Notify(curfile->GetName()); } + + if (fOutputEventHandler) { + fOutputEventHandler->Notify(curfile->GetName()); + } + + if (fMCtruthEventHandler) { + fMCtruthEventHandler->Notify(curfile->GetName()); + } + if (fDebug > 0) printf("<-AliAnalysisManager::Notify()\n"); return kTRUE; } @@ -281,18 +350,15 @@ Bool_t AliAnalysisManager::Process(Long64_t entry) // The entry is always the local entry number in the current tree. // Assuming that fChain is the pointer to the TChain being processed, // use fChain->GetTree()->GetEntry(entry). - if (fDebug > 1) { - cout << "->AliAnalysisManager::Process()" << endl; - } + if (fDebug > 0) printf("->AliAnalysisManager::Process(%lld)\n", entry); - if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(); - if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(); + if (fInputEventHandler) fInputEventHandler ->BeginEvent(entry); + if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(entry); + if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(entry); GetEntry(entry); ExecAnalysis(); - if (fDebug > 1) { - cout << "<-AliAnalysisManager::Process()" << endl; - } + if (fDebug > 0) printf("<-AliAnalysisManager::Process()\n"); return kTRUE; } @@ -301,72 +367,272 @@ void AliAnalysisManager::PackOutput(TList *target) { // Pack all output data containers in the output list. Called at SlaveTerminate // stage in PROOF case for each slave. - if (fDebug > 1) { - cout << "->AliAnalysisManager::PackOutput()" << endl; - } + if (fDebug > 0) printf("->AliAnalysisManager::PackOutput()\n"); if (!target) { Error("PackOutput", "No target. Aborting."); return; } - + if (fInputEventHandler) fInputEventHandler ->Terminate(); if (fOutputEventHandler) fOutputEventHandler ->Terminate(); if (fMCtruthEventHandler) fMCtruthEventHandler->Terminate(); + + // Call FinishTaskOutput() for each event loop task (not called for + // post-event loop tasks - use Terminate() fo those) + TIter nexttask(fTasks); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)nexttask())) { + if (!task->IsPostEventLoop()) { + if (fDebug > 0) printf("->FinishTaskOutput: task %s\n", task->GetName()); + task->FinishTaskOutput(); + if (fDebug > 0) printf("<-FinishTaskOutput: task %s\n", task->GetName()); + } + } if (fMode == kProofAnalysis) { TIter next(fOutputs); AliAnalysisDataContainer *output; + Bool_t isManagedByHandler = kFALSE; while ((output=(AliAnalysisDataContainer*)next())) { - if (fDebug > 1) printf(" Packing container %s...\n", output->GetName()); - if (output->GetData()) target->Add(output->ExportData()); + // Do not consider outputs of post event loop tasks + isManagedByHandler = kFALSE; + if (output->GetProducer()->IsPostEventLoop()) continue; + const char *filename = output->GetFileName(); + if (!(strcmp(filename, "default")) && fOutputEventHandler) { + isManagedByHandler = kTRUE; + filename = fOutputEventHandler->GetOutputFileName(); + } + // Check if data was posted to this container. If not, issue an error. + if (!output->GetData() && !isManagedByHandler) { + Error("PackOutput", "No data for output container %s. Forgot to PostData ?\n", output->GetName()); + continue; + } + if (!output->IsSpecialOutput()) { + // Normal outputs + if (strlen(filename) && !isManagedByHandler) { + // File resident outputs + TFile *file = output->GetFile(); + // Backup current folder + TDirectory *opwd = gDirectory; + // Create file if not existing and register to container. + if (file) file->cd(); + else file = new TFile(filename, "RECREATE"); + if (file->IsZombie()) { + Fatal("PackOutput", "Could not recreate file %s\n", filename); + return; + } + output->SetFile(file); + // Clear file list to release object ownership to user. + file->Clear(); + // Save data to file, then close. + if (output->GetData()->InheritsFrom(TCollection::Class())) { + // If data is a collection, we set the name of the collection + // as the one of the container and we save as a single key. + TCollection *coll = (TCollection*)output->GetData(); + coll->SetName(output->GetName()); + coll->Write(output->GetName(), TObject::kSingleKey); + } else { + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } + } + if (fDebug > 1) printf("PackOutput %s: memory merge, file resident output\n", output->GetName()); + if (fDebug > 2) { + printf(" file %s listing content:\n", filename); + file->ls(); + } + file->Close(); + // Restore current directory + if (opwd) opwd->cd(); + } else { + // Memory-resident outputs + if (fDebug > 1) printf("PackOutput %s: memory merge memory resident output\n", filename); + } + AliAnalysisDataWrapper *wrap = 0; + if (isManagedByHandler) { + wrap = new AliAnalysisDataWrapper(fOutputEventHandler->GetTree()); + wrap->SetName(output->GetName()); + } + else wrap =output->ExportData(); + // Output wrappers must NOT delete data after merging - the user owns them + wrap->SetDeleteData(kFALSE); + target->Add(wrap); + } else { + // Special outputs + TDirectory *opwd = gDirectory; + TFile *file = output->GetFile(); + if (fDebug > 1 && file) printf("PackOutput %s: file merge, special output\n", output->GetName()); + if (isManagedByHandler) { + // Terminate IO for files managed by the output handler + if (file) file->Write(); + if (file && fDebug > 2) { + printf(" handled file %s listing content:\n", file->GetName()); + file->ls(); + } + fOutputEventHandler->TerminateIO(); + continue; + } + + if (!file) { + AliAnalysisTask *producer = output->GetProducer(); + Error("PackOutput", + "File %s for special container %s was NOT opened in %s::CreateOutputObjects !!!", + output->GetFileName(), output->GetName(), producer->ClassName()); + continue; + } + file->cd(); + // Release object ownership to users after writing data to file + if (output->GetData()->InheritsFrom(TCollection::Class())) { + // If data is a collection, we set the name of the collection + // as the one of the container and we save as a single key. + TCollection *coll = (TCollection*)output->GetData(); + coll->SetName(output->GetName()); + coll->Write(output->GetName(), TObject::kSingleKey); + } else { + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } + } + file->Clear(); + if (fDebug > 2) { + printf(" file %s listing content:\n", output->GetFileName()); + file->ls(); + } + TString outFilename = file->GetName(); + file->Close(); + // Restore current directory + if (opwd) opwd->cd(); + // Check if a special output location was provided or the output files have to be merged + if (strlen(fSpecialOutputLocation.Data())) { + TString remote = fSpecialOutputLocation; + remote += "/"; + Int_t gid = gROOT->ProcessLine("gProofServ->GetGroupId();"); + remote += Form("%s_%d_", gSystem->HostName(), gid); + remote += output->GetFileName(); + TFile::Cp ( outFilename.Data(), remote.Data() ); + } else { + // No special location specified-> use TProofOutputFile as merging utility + // The file at this output slot must be opened in CreateOutputObjects + if (fDebug > 1) printf(" File %s to be merged...\n", output->GetFileName()); + } + } } } - if (fDebug > 1) { - printf("<-AliAnalysisManager::PackOutput: output list contains %d containers\n", target->GetSize()); - } + if (fDebug > 0) printf("<-AliAnalysisManager::PackOutput: output list contains %d containers\n", target->GetSize()); } //______________________________________________________________________________ void AliAnalysisManager::ImportWrappers(TList *source) { // Import data in output containers from wrappers coming in source. - if (fDebug > 1) { - cout << "->AliAnalysisManager::ImportWrappers()" << endl; - } + if (fDebug > 0) printf("->AliAnalysisManager::ImportWrappers()\n"); TIter next(fOutputs); AliAnalysisDataContainer *cont; AliAnalysisDataWrapper *wrap; Int_t icont = 0; + Bool_t inGrid = (fMode == kGridAnalysis)?kTRUE:kFALSE; while ((cont=(AliAnalysisDataContainer*)next())) { - wrap = (AliAnalysisDataWrapper*)source->FindObject(cont->GetName()); - if (!wrap && fDebug>1) { - printf("(WW) ImportWrappers: container %s not found in analysis output !\n", cont->GetName()); + wrap = 0; + if (cont->GetProducer()->IsPostEventLoop() && !inGrid) continue; + const char *filename = cont->GetFileName(); + Bool_t isManagedByHandler = kFALSE; + if (!(strcmp(filename, "default")) && fOutputEventHandler) { + isManagedByHandler = kTRUE; + filename = fOutputEventHandler->GetOutputFileName(); + } + if (cont->IsSpecialOutput() || inGrid) { + if (strlen(fSpecialOutputLocation.Data()) && !isManagedByHandler) continue; + // Copy merged file from PROOF scratch space. + // In case of grid the files are already in the current directory. + if (!inGrid) { + char full_path[512]; + char ch_url[512]; + TObject *pof = source->FindObject(filename); + if (!pof || !pof->InheritsFrom("TProofOutputFile")) { + Error("ImportWrappers", "TProofOutputFile object not found in output list for container %s", cont->GetName()); + continue; + } + gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", ((TProofOutputFile*)0x%lx)->GetOutputFileName();)", full_path, pof)); + gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", gProof->GetUrl();)", ch_url)); + TString clientUrl(ch_url); + TString full_path_str(full_path); + if (clientUrl.Contains("localhost")){ + TObjArray* array = full_path_str.Tokenize ( "//" ); + TObjString *strobj = ( TObjString *)array->At(1); + TObjArray* arrayPort = strobj->GetString().Tokenize ( ":" ); + TObjString *strobjPort = ( TObjString *) arrayPort->At(1); + full_path_str.ReplaceAll(strobj->GetString().Data(),"localhost:PORT"); + full_path_str.ReplaceAll(":PORT",Form(":%s",strobjPort->GetString().Data())); + if (fDebug > 1) Info("ImportWrappers","Using tunnel from %s to %s",full_path_str.Data(),filename); + } + if (fDebug > 1) + printf(" Copying file %s from PROOF scratch space\n", full_path_str.Data()); + Bool_t gotit = TFile::Cp(full_path_str.Data(), filename); + if (!gotit) { + Error("ImportWrappers", "Could not get file %s from proof scratch space", cont->GetFileName()); + continue; + } + } + // Normally we should connect data from the copied file to the + // corresponding output container, but it is not obvious how to do this + // automatically if several objects in file... + TFile *f = TFile::Open(filename, "READ"); + if (!f) { + Error("ImportWrappers", "Cannot open file %s in read-only mode", filename); + continue; + } + TObject *obj = 0; + // Try to fetch first a list object having the container name. + obj = f->Get(cont->GetName()); + if (!obj) { + // Fetch first object from file having the container type. + TIter nextkey(f->GetListOfKeys()); + TKey *key; + while ((key=(TKey*)nextkey())) { + obj = f->Get(key->GetName()); + if (obj && obj->IsA()->InheritsFrom(cont->GetType())) break; + } + } + if (!obj) { + Error("ImportWrappers", "Could not find object for container %s in file %s", cont->GetName(), filename); + continue; + } + wrap = new AliAnalysisDataWrapper(obj); + wrap->SetDeleteData(kFALSE); + } + if (!wrap) wrap = (AliAnalysisDataWrapper*)source->FindObject(cont->GetName()); + if (!wrap) { + Error("ImportWrappers","Container %s not found in analysis output !", cont->GetName()); continue; } icont++; - if (fDebug > 1) printf(" Importing data for container %s\n", wrap->GetName()); - if (cont->GetFileName()) printf(" -> %s\n", cont->GetFileName()); + if (fDebug > 1) { + printf(" Importing data for container %s", cont->GetName()); + if (strlen(filename)) printf(" -> file %s\n", filename); + else printf("\n"); + } cont->ImportData(wrap); } - if (fDebug > 1) { - cout << "<-AliAnalysisManager::ImportWrappers(): "<< icont << " containers imported" << endl; - } + if (fDebug > 0) printf("<-AliAnalysisManager::ImportWrappers(): %d containers imported\n", icont); } //______________________________________________________________________________ void AliAnalysisManager::UnpackOutput(TList *source) { - // Called by AliAnalysisSelector::Terminate. Output containers should - // be in source in the same order as in fOutputs. - if (fDebug > 1) { - cout << "->AliAnalysisManager::UnpackOutput()" << endl; - } + // Called by AliAnalysisSelector::Terminate only on the client. + if (fDebug > 0) printf("->AliAnalysisManager::UnpackOutput()\n"); if (!source) { Error("UnpackOutput", "No target. Aborting."); return; } - if (fDebug > 1) { - printf(" Source list contains %d containers\n", source->GetSize()); - } + if (fDebug > 1) printf(" Source list contains %d containers\n", source->GetSize()); if (fMode == kProofAnalysis) ImportWrappers(source); @@ -385,37 +651,13 @@ void AliAnalysisManager::UnpackOutput(TList *source) task->CheckNotify(kTRUE); // If task is active, execute it if (task->IsPostEventLoop() && task->IsActive()) { - if (fDebug > 1) { - cout << "== Executing post event loop task " << task->GetName() << endl; - } + if (fDebug > 0) printf("== Executing post event loop task %s\n", task->GetName()); task->ExecuteTask(); } } } - // Check if the output need to be written to a file. - const char *filename = output->GetFileName(); - if (!(strcmp(filename, "default"))) { - if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName(); - } - - if (!filename || !strlen(filename)) continue; - TFile *file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); - if (file) file->cd(); - else file = new TFile(filename, "RECREATE"); - if (file->IsZombie()) continue; - // Reparent data to this file - TMethodCall callEnv; - if (output->GetData()->IsA()) - callEnv.InitWithPrototype(output->GetData()->IsA(), "SetDirectory", "TDirectory*"); - if (callEnv.IsValid()) { - callEnv.SetParam((Long_t) file); - callEnv.Execute(output->GetData()); - } - output->GetData()->Write(); } - if (fDebug > 1) { - cout << "<-AliAnalysisManager::UnpackOutput()" << endl; - } + if (fDebug > 0) printf("<-AliAnalysisManager::UnpackOutput()\n"); } //______________________________________________________________________________ @@ -424,24 +666,104 @@ void AliAnalysisManager::Terminate() // The Terminate() function is the last function to be called during // a query. It always runs on the client, it can be used to present // the results graphically. - if (fDebug > 1) { - cout << "->AliAnalysisManager::Terminate()" << endl; - } + if (fDebug > 0) printf("->AliAnalysisManager::Terminate()\n"); AliAnalysisTask *task; TIter next(fTasks); // Call Terminate() for tasks while ((task=(AliAnalysisTask*)next())) task->Terminate(); - if (fDebug > 1) { - cout << "<-AliAnalysisManager::Terminate()" << endl; - } // - if (fOutputEventHandler) fOutputEventHandler->TerminateIO(); + TIter next1(fOutputs); + AliAnalysisDataContainer *output; + while ((output=(AliAnalysisDataContainer*)next1())) { + // Special outputs or grid files have the files already closed and written. + if (fMode == kGridAnalysis) continue; + if (output->IsSpecialOutput()&&(fMode == kProofAnalysis)) continue; + const char *filename = output->GetFileName(); + if (!(strcmp(filename, "default"))) { + if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName(); + TFile *aodfile = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (aodfile) { + if (fDebug > 1) printf("Writing output handler file: %s\n", filename); + aodfile->Write(); + continue; + } + } + if (!strlen(filename)) continue; + if (!output->GetData()) continue; + TFile *file = output->GetFile(); + TDirectory *opwd = gDirectory; + file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename); + if (!file) file = new TFile(filename, "RECREATE"); + if (file->IsZombie()) continue; + output->SetFile(file); + file->cd(); + if (fDebug > 1) printf(" writing output data %s to file %s\n", output->GetData()->GetName(), file->GetName()); + if (output->GetData()->InheritsFrom(TCollection::Class())) { + // If data is a collection, we set the name of the collection + // as the one of the container and we save as a single key. + TCollection *coll = (TCollection*)output->GetData(); + coll->SetName(output->GetName()); + coll->Write(output->GetName(), TObject::kSingleKey); + } else { + if (output->GetData()->InheritsFrom(TTree::Class())) { + TTree *tree = (TTree*)output->GetData(); + tree->SetDirectory(file); + tree->AutoSave(); + } else { + output->GetData()->Write(); + } + } + if (opwd) opwd->cd(); + } + next1.Reset(); + while ((output=(AliAnalysisDataContainer*)next1())) { + // Close all files at output + TDirectory *opwd = gDirectory; + if (output->GetFile()) output->GetFile()->Close(); + if (opwd) opwd->cd(); + } + + if (fInputEventHandler) fInputEventHandler ->TerminateIO(); + if (fOutputEventHandler) fOutputEventHandler ->TerminateIO(); + if (fMCtruthEventHandler) fMCtruthEventHandler->TerminateIO(); + + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; + if (getsysInfo) { + TDirectory *cdir = gDirectory; + TFile f("syswatch.root", "RECREATE"); + if (!f.IsZombie()) { + TTree *tree = AliSysInfo::MakeTree("syswatch.log"); + tree->SetMarkerStyle(kCircle); + tree->SetMarkerColor(kBlue); + tree->SetMarkerSize(0.5); + if (!gROOT->IsBatch()) { + tree->SetAlias("event", "id0"); + tree->SetAlias("memUSED", "pI.fMemVirtual"); + tree->SetAlias("userCPU", "pI.fCpuUser"); + TCanvas *c = new TCanvas("SysInfo","SysInfo",10,10,800,600); + c->Divide(2,1,0.01,0.01); + c->cd(1); + tree->Draw("memUSED:event","","", 1234567890, 0); + c->cd(2); + tree->Draw("userCPU:event","","", 1234567890, 0); + } + tree->Write(); + f.Close(); + delete tree; + } + if (cdir) cdir->cd(); + } + if (fDebug > 0) printf("<-AliAnalysisManager::Terminate()\n"); } //______________________________________________________________________________ void AliAnalysisManager::AddTask(AliAnalysisTask *task) { // Adds a user task to the global list of tasks. + if (fTasks->FindObject(task)) { + Warning("AddTask", "Task %s: the same object already added to the analysis manager. Not adding.", task->GetName()); + return; + } task->SetActive(kFALSE); fTasks->Add(task); } @@ -474,7 +796,10 @@ AliAnalysisDataContainer *AliAnalysisManager::CreateContainer(const char *name, break; case kOutputContainer: fOutputs->Add(cont); - if (filename && strlen(filename)) cont->SetFileName(filename); + if (filename && strlen(filename)) { + cont->SetFileName(filename); + cont->SetDataOwned(kFALSE); // data owned by the file + } break; case kExchangeContainer: break; @@ -489,7 +814,7 @@ Bool_t AliAnalysisManager::ConnectInput(AliAnalysisTask *task, Int_t islot, // Connect input of an existing task to a data container. if (!fTasks->FindObject(task)) { AddTask(task); - Warning("ConnectInput", "Task %s not registered. Now owned by analysis manager", task->GetName()); + Info("ConnectInput", "Task %s was not registered. Now owned by analysis manager", task->GetName()); } Bool_t connected = task->ConnectInput(islot, cont); return connected; @@ -610,6 +935,14 @@ Bool_t AliAnalysisManager::InitAnalysis() } } } + // Check if all special output containers have a file name provided + TIter nextout(fOutputs); + while ((cont=(AliAnalysisDataContainer*)nextout())) { + if (cont->IsSpecialOutput() && !strlen(cont->GetFileName())) { + Error("InitAnalysis", "Wrong container %s : a file name MUST be provided for special outputs", cont->GetName()); + return kFALSE; + } + } fInitOK = kTRUE; return kTRUE; } @@ -618,6 +951,13 @@ Bool_t AliAnalysisManager::InitAnalysis() void AliAnalysisManager::PrintStatus(Option_t *option) const { // Print task hierarchy. + if (!fInitOK) { + Info("PrintStatus", "Analysis manager %s not initialized : call InitAnalysis() first", GetName()); + return; + } + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; + if (getsysInfo) + Info("PrintStatus", "System information will be collected each %lld events", fNSysInfo); TIter next(fTopTasks); AliAnalysisTask *task; while ((task=(AliAnalysisTask*)next())) @@ -632,54 +972,83 @@ void AliAnalysisManager::ResetAnalysis() } //______________________________________________________________________________ -void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree) +void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree, Long64_t nentries, Long64_t firstentry) { -// Start analysis for this manager. Analysis task can be: LOCAL, PROOF or GRID. +// Start analysis for this manager. Analysis task can be: LOCAL, PROOF, GRID or +// MIX. Process nentries starting from firstentry if (!fInitOK) { Error("StartAnalysis","Analysis manager was not initialized !"); return; } - if (fDebug>1) { - cout << "StartAnalysis: " << GetName() << endl; - } + if (fDebug > 0) printf("StartAnalysis %s\n",GetName()); TString anaType = type; anaType.ToLower(); fMode = kLocalAnalysis; - if (tree) { - if (anaType.Contains("proof")) fMode = kProofAnalysis; - else if (anaType.Contains("grid")) fMode = kGridAnalysis; - } + Bool_t runlocalinit = kTRUE; + if (anaType.Contains("file")) runlocalinit = kFALSE; + if (anaType.Contains("proof")) fMode = kProofAnalysis; + else if (anaType.Contains("grid")) fMode = kGridAnalysis; + else if (anaType.Contains("mix")) fMode = kMixingAnalysis; + if (fMode == kGridAnalysis) { - Warning("StartAnalysis", "GRID analysis mode not implemented. Running local."); - fMode = kLocalAnalysis; + if (!fGridHandler) { + Error("StartAnalysis", "Cannot start grid analysis without a grid handler."); + Info("===", "Add an AliAnalysisAlien object as plugin for this manager and configure it."); + return; + } + // Write analysis manager in the analysis file + cout << "===== RUNNING GRID ANALYSIS: " << GetName() << endl; + // run local task configuration + TIter nextTask(fTasks); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)nextTask())) { + task->LocalInit(); + } + fGridHandler->StartAnalysis(nentries, firstentry); + + // Terminate grid analysis + if (fGridHandler->GetRunMode() == AliAnalysisGrid::kOffline) return; + cout << "===== MERGING OUTPUTS REGISTERED BY YOUR ANALYSIS JOB: " << GetName() << endl; + if (!fGridHandler->MergeOutputs()) { + // Return if outputs could not be merged or if it alien handler + // was configured for offline mode or local testing. + return; + } + ImportWrappers(NULL); + Terminate(); + return; } - char line[128]; + char line[256]; SetEventLoop(kFALSE); - // Disable all branches if requested and set event loop mode - if (tree) { - if (TestBit(kDisableBranches)) { - printf("Disabling all branches...\n"); -// tree->SetBranchStatus("*",0); // not yet working + // Enable event loop mode if a tree was provided + if (tree || fMode==kMixingAnalysis) SetEventLoop(kTRUE); + + TChain *chain = 0; + TString ttype = "TTree"; + if (tree && tree->IsA() == TChain::Class()) { + chain = (TChain*)tree; + if (!chain || !chain->GetListOfFiles()->First()) { + Error("StartAnalysis", "Cannot process null or empty chain..."); + return; } - SetEventLoop(kTRUE); + ttype = "TChain"; } - TChain *chain = dynamic_cast(tree); - - // Initialize locally all tasks + // Initialize locally all tasks (happens for all modes) TIter next(fTasks); AliAnalysisTask *task; - while ((task=(AliAnalysisTask*)next())) { - task->LocalInit(); - } + if (runlocalinit) { + while ((task=(AliAnalysisTask*)next())) { + task->LocalInit(); + } + } switch (fMode) { case kLocalAnalysis: if (!tree) { - TIter next(fTasks); - AliAnalysisTask *task; + TIter nextT(fTasks); // Call CreateOutputObjects for all tasks - while ((task=(AliAnalysisTask*)next())) { + while ((task=(AliAnalysisTask*)nextT())) { TDirectory *curdir = gDirectory; task->CreateOutputObjects(); if (curdir) curdir->cd(); @@ -689,12 +1058,9 @@ void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree) return; } // Run tree-based analysis via AliAnalysisSelector -// gROOT->ProcessLine(".L $ALICE_ROOT/ANALYSIS/AliAnalysisSelector.cxx+"); cout << "===== RUNNING LOCAL ANALYSIS " << GetName() << " ON TREE " << tree->GetName() << endl; - sprintf(line, "AliAnalysisSelector *selector = new AliAnalysisSelector((AliAnalysisManager*)0x%lx);",(ULong_t)this); - gROOT->ProcessLine(line); - sprintf(line, "((TTree*)0x%lx)->Process(selector);",(ULong_t)tree); - gROOT->ProcessLine(line); + fSelector = new AliAnalysisSelector(this); + tree->Process(fSelector, "", nentries, firstentry); break; case kProofAnalysis: if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { @@ -706,7 +1072,7 @@ void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree) if (chain) { chain->SetProof(); cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON CHAIN " << chain->GetName() << endl; - chain->Process("AliAnalysisSelector"); + chain->Process("AliAnalysisSelector", "", nentries, firstentry); } else { printf("StartAnalysis: no chain\n"); return; @@ -714,13 +1080,106 @@ void AliAnalysisManager::StartAnalysis(const char *type, TTree *tree) break; case kGridAnalysis: Warning("StartAnalysis", "GRID analysis mode not implemented. Running local."); + break; + case kMixingAnalysis: + // Run event mixing analysis + if (!fEventPool) { + Error("StartAnalysis", "Cannot run event mixing without event pool"); + return; + } + cout << "===== RUNNING EVENT MIXING ANALYSIS " << GetName() << endl; + fSelector = new AliAnalysisSelector(this); + while ((chain=fEventPool->GetNextChain())) { + next.Reset(); + // Call NotifyBinChange for all tasks + while ((task=(AliAnalysisTask*)next())) + if (!task->IsPostEventLoop()) task->NotifyBinChange(); + chain->Process(fSelector); + } + PackOutput(fSelector->GetOutputList()); + Terminate(); } } +//______________________________________________________________________________ +void AliAnalysisManager::StartAnalysis(const char *type, const char *dataset, Long64_t nentries, Long64_t firstentry) +{ +// Start analysis for this manager on a given dataset. Analysis task can be: +// LOCAL, PROOF or GRID. Process nentries starting from firstentry. + if (!fInitOK) { + Error("StartAnalysis","Analysis manager was not initialized !"); + return; + } + if (fDebug > 0) printf("StartAnalysis %s\n",GetName()); + TString anaType = type; + anaType.ToLower(); + if (!anaType.Contains("proof")) { + Error("StartAnalysis", "Cannot process datasets in %s mode. Try PROOF.", type); + return; + } + fMode = kProofAnalysis; + char line[256]; + SetEventLoop(kTRUE); + // Set the dataset flag + TObject::SetBit(kUseDataSet); + fTree = 0; + + // Initialize locally all tasks + TIter next(fTasks); + AliAnalysisTask *task; + while ((task=(AliAnalysisTask*)next())) { + task->LocalInit(); + } + + if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) { + printf("StartAnalysis: no PROOF!!!\n"); + return; + } + sprintf(line, "gProof->AddInput((TObject*)0x%lx);", (ULong_t)this); + gROOT->ProcessLine(line); + sprintf(line, "gProof->GetDataSet(\"%s\");", dataset); + if (!gROOT->ProcessLine(line)) { + Error("StartAnalysis", "Dataset %s not found", dataset); + return; + } + sprintf(line, "gProof->Process(\"%s\", \"AliAnalysisSelector\", \"\", %lld, %lld);", + dataset, nentries, firstentry); + cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON DATASET " << dataset << endl; + gROOT->ProcessLine(line); +} + +//______________________________________________________________________________ +TFile *AliAnalysisManager::OpenProofFile(const char *filename, const char *option) +{ +// Opens a special output file used in PROOF. + char line[256]; + if (fMode!=kProofAnalysis || !fSelector) { + Error("OpenProofFile","Cannot open PROOF file %s",filename); + return NULL; + } + sprintf(line, "TProofOutputFile *pf = new TProofOutputFile(\"%s\");", filename); + if (fDebug > 1) printf("=== %s\n", line); + gROOT->ProcessLine(line); + sprintf(line, "pf->OpenFile(\"%s\");", option); + gROOT->ProcessLine(line); + if (fDebug > 1) { + gROOT->ProcessLine("pf->Print()"); + printf(" == proof file name: %s\n", gFile->GetName()); + } + sprintf(line, "((TList*)0x%lx)->Add(pf);",(ULong_t)fSelector->GetOutputList()); + if (fDebug > 1) printf("=== %s\n", line); + gROOT->ProcessLine(line); + return gFile; +} + //______________________________________________________________________________ void AliAnalysisManager::ExecAnalysis(Option_t *option) { // Execute analysis. + static Long64_t ncalls = 0; + Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE; + if (getsysInfo && ncalls==0) AliSysInfo::AddStamp("Start", (Int_t)ncalls); + ncalls++; if (!fInitOK) { Error("ExecAnalysis", "Analysis manager was not initialized !"); return; @@ -728,9 +1187,6 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) AliAnalysisTask *task; // Check if the top tree is active. if (fTree) { - if (fDebug>1) { - printf("AliAnalysisManager::ExecAnalysis\n"); - } TIter next(fTasks); // De-activate all tasks while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE); @@ -740,13 +1196,17 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) return; } cont->SetData(fTree); // This will notify all consumers + Long64_t entry = fTree->GetTree()->GetReadEntry(); + // -// Call BeginEvent() for optional output and MC services - if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(); - if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(); +// Call BeginEvent() for optional input/output and MC services + if (fInputEventHandler) fInputEventHandler ->BeginEvent(entry); + if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(entry); + if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(entry); // // Execute the tasks - TIter next1(cont->GetConsumers()); +// TIter next1(cont->GetConsumers()); + TIter next1(fTopTasks); while ((task=(AliAnalysisTask*)next1())) { if (fDebug >1) { cout << " Executing task " << task->GetName() << endl; @@ -756,16 +1216,20 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) } // // Call FinishEvent() for optional output and MC services + if (fInputEventHandler) fInputEventHandler ->FinishEvent(); if (fOutputEventHandler) fOutputEventHandler ->FinishEvent(); if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent(); -// + // Gather system information if requested + if (getsysInfo && ((ncalls%fNSysInfo)==0)) + AliSysInfo::AddStamp(Form("Event#%lld",ncalls),(Int_t)ncalls); return; } // The event loop is not controlled by TSelector // -// Call BeginEvent() for optional output and MC services - if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(); - if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(); +// Call BeginEvent() for optional input/output and MC services + if (fInputEventHandler) fInputEventHandler ->BeginEvent(-1); + if (fOutputEventHandler) fOutputEventHandler ->BeginEvent(-1); + if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(-1); TIter next2(fTopTasks); while ((task=(AliAnalysisTask*)next2())) { task->SetActive(kTRUE); @@ -776,7 +1240,8 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option) } // // Call FinishEvent() for optional output and MC services - if (fOutputEventHandler) fOutputEventHandler->FinishEvent(); + if (fInputEventHandler) fInputEventHandler ->FinishEvent(); + if (fOutputEventHandler) fOutputEventHandler ->FinishEvent(); if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent(); }