]> git.uio.no Git - u/mrichter/AliRoot.git/blob - ANALYSIS/AliAnalysisManager.cxx
Plot rather virtual memory usage per process than total memory.
[u/mrichter/AliRoot.git] / ANALYSIS / AliAnalysisManager.cxx
1 /**************************************************************************
2  * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
3  *                                                                        *
4  * Author: The ALICE Off-line Project.                                    *
5  * Contributors are mentioned in the code where appropriate.              *
6  *                                                                        *
7  * Permission to use, copy, modify and distribute this software and its   *
8  * documentation strictly for non-commercial purposes is hereby granted   *
9  * without fee, provided that the above copyright notice appears in all   *
10  * copies and that both the copyright notice and this permission notice   *
11  * appear in the supporting documentation. The authors make no claims     *
12  * about the suitability of this software for any purpose. It is          *
13  * provided "as is" without express or implied warranty.                  *
14  **************************************************************************/
15
16 /* $Id$ */
17 // Author: Andrei Gheata, 31/05/2006
18
19 //==============================================================================
20 //   AliAnalysysManager - Manager analysis class. Allows creation of several
21 // analysis tasks and data containers storing their input/output. Allows
22 // connecting/chaining tasks via shared data containers. Serializes the current
23 // event for all tasks depending only on initial input data.
24 //==============================================================================
25 //
26 //==============================================================================
27
28 #include <Riostream.h>
29
30 #include <TError.h>
31 #include <TClass.h>
32 #include <TFile.h>
33 #include <TKey.h>
34 #include <TMethodCall.h>
35 #include <TChain.h>
36 #include <TSystem.h>
37 #include <TROOT.h>
38 #include <TCanvas.h>
39 #include <TStopwatch.h>
40
41 #include "AliAnalysisSelector.h"
42 #include "AliAnalysisGrid.h"
43 #include "AliAnalysisTask.h"
44 #include "AliAnalysisDataContainer.h"
45 #include "AliAnalysisDataSlot.h"
46 #include "AliVEventHandler.h"
47 #include "AliVEventPool.h"
48 #include "AliSysInfo.h"
49 #include "AliAnalysisManager.h"
50
51 ClassImp(AliAnalysisManager)
52
53 AliAnalysisManager *AliAnalysisManager::fgAnalysisManager = NULL;
54 TString AliAnalysisManager::fgCommonFileName = "";
55
56 //______________________________________________________________________________
57 AliAnalysisManager::AliAnalysisManager(const char *name, const char *title)
58                    :TNamed(name,title),
59                     fTree(NULL),
60                     fInputEventHandler(NULL),
61                     fOutputEventHandler(NULL),
62                     fMCtruthEventHandler(NULL),
63                     fEventPool(NULL),
64                     fCurrentEntry(-1),
65                     fNSysInfo(0),
66                     fMode(kLocalAnalysis),
67                     fInitOK(kFALSE),
68                     fDebug(0),
69                     fSpecialOutputLocation(""), 
70                     fTasks(NULL),
71                     fTopTasks(NULL),
72                     fZombies(NULL),
73                     fContainers(NULL),
74                     fInputs(NULL),
75                     fOutputs(NULL),
76                     fCommonInput(NULL),
77                     fCommonOutput(NULL),
78                     fSelector(NULL),
79                     fGridHandler(NULL),
80                     fExtraFiles("")
81 {
82 // Default constructor.
83    fgAnalysisManager = this;
84    fgCommonFileName  = "AnalysisResults.root";
85    fTasks      = new TObjArray();
86    fTopTasks   = new TObjArray();
87    fZombies    = new TObjArray();
88    fContainers = new TObjArray();
89    fInputs     = new TObjArray();
90    fOutputs    = new TObjArray();
91    SetEventLoop(kTRUE);
92    TObject::SetObjectStat(kFALSE);
93 }
94
95 //______________________________________________________________________________
96 AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other)
97                    :TNamed(other),
98                     fTree(NULL),
99                     fInputEventHandler(NULL),
100                     fOutputEventHandler(NULL),
101                     fMCtruthEventHandler(NULL),
102                     fEventPool(NULL),
103                     fCurrentEntry(-1),
104                     fNSysInfo(0),
105                     fMode(other.fMode),
106                     fInitOK(other.fInitOK),
107                     fDebug(other.fDebug),
108                     fSpecialOutputLocation(""), 
109                     fTasks(NULL),
110                     fTopTasks(NULL),
111                     fZombies(NULL),
112                     fContainers(NULL),
113                     fInputs(NULL),
114                     fOutputs(NULL),
115                     fCommonInput(NULL),
116                     fCommonOutput(NULL),
117                     fSelector(NULL),
118                     fGridHandler(NULL),
119                     fExtraFiles()
120 {
121 // Copy constructor.
122    fTasks      = new TObjArray(*other.fTasks);
123    fTopTasks   = new TObjArray(*other.fTopTasks);
124    fZombies    = new TObjArray(*other.fZombies);
125    fContainers = new TObjArray(*other.fContainers);
126    fInputs     = new TObjArray(*other.fInputs);
127    fOutputs    = new TObjArray(*other.fOutputs);
128    fgCommonFileName  = "AnalysisResults.root";
129    fgAnalysisManager = this;
130    TObject::SetObjectStat(kFALSE);
131 }
132    
133 //______________________________________________________________________________
134 AliAnalysisManager& AliAnalysisManager::operator=(const AliAnalysisManager& other)
135 {
136 // Assignment
137    if (&other != this) {
138       TNamed::operator=(other);
139       fInputEventHandler   = other.fInputEventHandler;
140       fOutputEventHandler  = other.fOutputEventHandler;
141       fMCtruthEventHandler = other.fMCtruthEventHandler;
142       fEventPool           = other.fEventPool;
143       fTree       = NULL;
144       fCurrentEntry = -1;
145       fNSysInfo   = other.fNSysInfo;
146       fMode       = other.fMode;
147       fInitOK     = other.fInitOK;
148       fDebug      = other.fDebug;
149       fTasks      = new TObjArray(*other.fTasks);
150       fTopTasks   = new TObjArray(*other.fTopTasks);
151       fZombies    = new TObjArray(*other.fZombies);
152       fContainers = new TObjArray(*other.fContainers);
153       fInputs     = new TObjArray(*other.fInputs);
154       fOutputs    = new TObjArray(*other.fOutputs);
155       fCommonInput = NULL;
156       fCommonOutput = NULL;
157       fSelector   = NULL;
158       fGridHandler = NULL;
159       fExtraFiles = other.fExtraFiles;
160       fgCommonFileName = "AnalysisResults.root";
161       fgAnalysisManager = this;
162    }
163    return *this;
164 }
165
166 //______________________________________________________________________________
167 AliAnalysisManager::~AliAnalysisManager()
168 {
169 // Destructor.
170    if (fTasks) {fTasks->Delete(); delete fTasks;}
171    if (fTopTasks) delete fTopTasks;
172    if (fZombies) delete fZombies;
173    if (fContainers) {fContainers->Delete(); delete fContainers;}
174    if (fInputs) delete fInputs;
175    if (fOutputs) delete fOutputs;
176    if (fGridHandler) delete fGridHandler;
177    if (fgAnalysisManager==this) fgAnalysisManager = NULL;
178    TObject::SetObjectStat(kTRUE);
179 }
180
181 //______________________________________________________________________________
182 Int_t AliAnalysisManager::GetEntry(Long64_t entry, Int_t getall)
183 {
184 // Read one entry of the tree or a whole branch.
185    static Int_t itot = 0;
186    if (fDebug > 1) printf("MGR: Processing event #%d (entry %lld)\n", itot, entry);
187    itot++;
188    fCurrentEntry = entry;
189    return fTree ? fTree->GetTree()->GetEntry(entry, getall) : 0;
190 }
191    
192 //______________________________________________________________________________
193 Bool_t AliAnalysisManager::Init(TTree *tree)
194 {
195   // The Init() function is called when the selector needs to initialize
196   // a new tree or chain. Typically here the branch addresses of the tree
197   // will be set. It is normaly not necessary to make changes to the
198   // generated code, but the routine can be extended by the user if needed.
199   // Init() will be called many times when running with PROOF.
200    Bool_t init = kFALSE;
201    if (!tree) return kFALSE; // Should not happen - protected in selector caller
202    if (fDebug > 0) {
203       printf("->AliAnalysisManager::Init(%s)\n", tree->GetName());
204    }
205    // Call InitTree of EventHandler
206    if (fOutputEventHandler) {
207       if (fMode == kProofAnalysis) {
208          init = fOutputEventHandler->Init(0x0, "proof");
209       } else {
210          init = fOutputEventHandler->Init(0x0, "local");
211       }
212       if (!init) {
213          Error("Init", "Output event handler failed to initialize");
214          return kFALSE;
215       }         
216    }
217    
218    if (fInputEventHandler) {
219       if (fMode == kProofAnalysis) {
220          init = fInputEventHandler->Init(tree, "proof");
221       } else {
222          init = fInputEventHandler->Init(tree, "local");
223       }
224       if (!init) {
225          Error("Init", "Input event handler failed to initialize tree"); 
226          return kFALSE;
227       }         
228    } else {
229       // If no input event handler we need to get the tree once
230       // for the chain
231       if(!tree->GetTree()) {
232          Long64_t readEntry = tree->LoadTree(0);
233          if (readEntry == -2) {
234             Error("Init", "Input tree has no entry. Aborting");
235             return kFALSE;
236          }
237       }   
238    }
239
240    if (fMCtruthEventHandler) {
241       if (fMode == kProofAnalysis) {
242          init = fMCtruthEventHandler->Init(0x0, "proof");
243       } else {
244          init = fMCtruthEventHandler->Init(0x0, "local");
245       }
246       if (!init) {
247          Error("Init", "MC event handler failed to initialize"); 
248          return kFALSE;
249       }         
250    }
251
252    if (!fInitOK) InitAnalysis();
253    if (!fInitOK) return kFALSE;
254    fTree = tree;
255    AliAnalysisDataContainer *top = fCommonInput;
256    if (!top) top = (AliAnalysisDataContainer*)fInputs->At(0);
257    if (!top) {
258       Error("Init","No top input container !");
259       return kFALSE;
260    }
261    top->SetData(tree);
262    if (fDebug > 0) {
263       printf("<-AliAnalysisManager::Init(%s)\n", tree->GetName());
264    }
265    return kTRUE;
266 }
267
268 //______________________________________________________________________________
269 void AliAnalysisManager::SlaveBegin(TTree *tree)
270 {
271   // The SlaveBegin() function is called after the Begin() function.
272   // When running with PROOF SlaveBegin() is called on each slave server.
273   // The tree argument is deprecated (on PROOF 0 is passed).
274    if (fDebug > 0) printf("->AliAnalysisManager::SlaveBegin()\n");
275    static Bool_t isCalled = kFALSE;
276    Bool_t init = kFALSE;
277    Bool_t initOK = kTRUE;
278    TString msg;
279    TDirectory *curdir = gDirectory;
280    // Call SlaveBegin only once in case of mixing
281    if (isCalled && fMode==kMixingAnalysis) return;
282    // Call Init of EventHandler
283    if (fOutputEventHandler) {
284       if (fMode == kProofAnalysis) {
285          // Merging AOD's in PROOF via TProofOutputFile
286          if (fDebug > 1) printf("   Initializing AOD output file %s...\n", fOutputEventHandler->GetOutputFileName());
287          init = fOutputEventHandler->Init("proof");
288          if (!init) msg = "Failed to initialize output handler on worker";
289       } else {
290          init = fOutputEventHandler->Init("local");
291          if (!init) msg = "Failed to initialize output handler";
292       }
293       initOK &= init;
294       if (!fSelector) Error("SlaveBegin", "Selector not set");
295       else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);}
296    }
297
298    if (fInputEventHandler) {
299       fInputEventHandler->SetInputTree(tree);
300       if (fMode == kProofAnalysis) {
301          init = fInputEventHandler->Init("proof");
302          if (!init) msg = "Failed to initialize input handler on worker";
303       } else {
304          init = fInputEventHandler->Init("local");
305          if (!init) msg = "Failed to initialize input handler";
306       }
307       initOK &= init;
308       if (!fSelector) Error("SlaveBegin", "Selector not set");      
309       else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);}
310    }
311
312    if (fMCtruthEventHandler) {
313       if (fMode == kProofAnalysis) {
314          init = fMCtruthEventHandler->Init("proof");
315          if (!init) msg = "Failed to initialize MC handler on worker";
316       } else {
317          init = fMCtruthEventHandler->Init("local");
318          if (!init) msg = "Failed to initialize MC handler";
319       }
320       initOK &= init;
321       if (!fSelector) Error("SlaveBegin", "Selector not set");      
322       else if (!init) {fSelector->Abort(msg); fSelector->SetStatus(-1);}
323    }
324    if (curdir) curdir->cd();
325    isCalled = kTRUE;
326    if (!initOK) return;   
327    TIter next(fTasks);
328    AliAnalysisTask *task;
329    // Call CreateOutputObjects for all tasks
330    while ((task=(AliAnalysisTask*)next())) {
331       curdir = gDirectory;
332       task->CreateOutputObjects();
333       if (curdir) curdir->cd();
334    }
335    if (fDebug > 0) printf("<-AliAnalysisManager::SlaveBegin()\n");
336 }
337
338 //______________________________________________________________________________
339 Bool_t AliAnalysisManager::Notify()
340 {
341    // The Notify() function is called when a new file is opened. This
342    // can be either for a new TTree in a TChain or when when a new TTree
343    // is started when using PROOF. It is normaly not necessary to make changes
344    // to the generated code, but the routine can be extended by the
345    // user if needed. The return value is currently not used.
346    if (!fTree) return kFALSE;
347
348    TFile *curfile = fTree->GetCurrentFile();
349    if (!curfile) {
350       Error("Notify","No current file");
351       return kFALSE;
352    }   
353    
354    if (fDebug > 0) printf("->AliAnalysisManager::Notify() file: %s\n", curfile->GetName());
355    TIter next(fTasks);
356    AliAnalysisTask *task;
357    // Call Notify for all tasks
358    while ((task=(AliAnalysisTask*)next())) 
359       task->Notify();
360         
361    // Call Notify of the event handlers
362    if (fInputEventHandler) {
363        fInputEventHandler->Notify(curfile->GetName());
364    }
365
366    if (fOutputEventHandler) {
367        fOutputEventHandler->Notify(curfile->GetName());
368    }
369
370    if (fMCtruthEventHandler) {
371        fMCtruthEventHandler->Notify(curfile->GetName());
372    }
373    if (fDebug > 0) printf("<-AliAnalysisManager::Notify()\n");
374    return kTRUE;
375 }    
376
377 //______________________________________________________________________________
378 Bool_t AliAnalysisManager::Process(Long64_t entry)
379 {
380   // The Process() function is called for each entry in the tree (or possibly
381   // keyed object in the case of PROOF) to be processed. The entry argument
382   // specifies which entry in the currently loaded tree is to be processed.
383   // It can be passed to either TTree::GetEntry() or TBranch::GetEntry()
384   // to read either all or the required parts of the data. When processing
385   // keyed objects with PROOF, the object is already loaded and is available
386   // via the fObject pointer.
387   //
388   // This function should contain the "body" of the analysis. It can contain
389   // simple or elaborate selection criteria, run algorithms on the data
390   // of the event and typically fill histograms.
391
392   // WARNING when a selector is used with a TChain, you must use
393   //  the pointer to the current TTree to call GetEntry(entry).
394   //  The entry is always the local entry number in the current tree.
395   //  Assuming that fChain is the pointer to the TChain being processed,
396   //  use fChain->GetTree()->GetEntry(entry).
397    if (fDebug > 0) printf("->AliAnalysisManager::Process(%lld)\n", entry);
398
399    if (fInputEventHandler)   fInputEventHandler  ->BeginEvent(entry);
400    if (fOutputEventHandler)  fOutputEventHandler ->BeginEvent(entry);
401    if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(entry);
402    
403    GetEntry(entry);
404    ExecAnalysis();
405    if (fDebug > 0) printf("<-AliAnalysisManager::Process()\n");
406    return kTRUE;
407 }
408
409 //______________________________________________________________________________
410 void AliAnalysisManager::PackOutput(TList *target)
411 {
412   // Pack all output data containers in the output list. Called at SlaveTerminate
413   // stage in PROOF case for each slave.
414    if (fDebug > 0) printf("->AliAnalysisManager::PackOutput()\n");
415    if (!target) {
416       Error("PackOutput", "No target. Aborting.");
417       return;
418    }
419    if (fInputEventHandler)   fInputEventHandler  ->Terminate();
420    if (fOutputEventHandler)  fOutputEventHandler ->Terminate();
421    if (fMCtruthEventHandler) fMCtruthEventHandler->Terminate();
422
423    // Call FinishTaskOutput() for each event loop task (not called for 
424    // post-event loop tasks - use Terminate() fo those)
425    TIter nexttask(fTasks);
426    AliAnalysisTask *task;
427    while ((task=(AliAnalysisTask*)nexttask())) {
428       if (!task->IsPostEventLoop()) {
429          if (fDebug > 0) printf("->FinishTaskOutput: task %s\n", task->GetName());
430          task->FinishTaskOutput();
431          if (fDebug > 0) printf("<-FinishTaskOutput: task %s\n", task->GetName());
432       }
433    }      
434    
435    if (fMode == kProofAnalysis) {
436       TIter next(fOutputs);
437       AliAnalysisDataContainer *output;
438       Bool_t isManagedByHandler = kFALSE;
439       while ((output=(AliAnalysisDataContainer*)next())) {
440          // Do not consider outputs of post event loop tasks
441          isManagedByHandler = kFALSE;
442          if (output->GetProducer()->IsPostEventLoop()) continue;
443          const char *filename = output->GetFileName();
444          if (!(strcmp(filename, "default")) && fOutputEventHandler) {
445             isManagedByHandler = kTRUE;
446             printf("#### Handler output. Extra: %s\n", fExtraFiles.Data());
447             filename = fOutputEventHandler->GetOutputFileName();
448          }
449          // Check if data was posted to this container. If not, issue an error.
450          if (!output->GetData() && !isManagedByHandler) {
451             Error("PackOutput", "No data for output container %s. Forgot to PostData ?", output->GetName());
452             continue;
453          }   
454          if (!output->IsSpecialOutput()) {
455             // Normal outputs
456             if (strlen(filename) && !isManagedByHandler) {
457                // Backup current folder
458                TDirectory *opwd = gDirectory;
459                // File resident outputs. 
460                // Check first if the file exists.
461                TString open_option = "RECREATE";
462                if (!gSystem->AccessPathName(output->GetFileName())) open_option = "UPDATE";
463                TFile *file = AliAnalysisManager::OpenFile(output, open_option, kTRUE);
464                // Clear file list to release object ownership to user.
465                file->Clear();
466                // Save data to file, then close.
467                if (output->GetData()->InheritsFrom(TCollection::Class())) {
468                   // If data is a collection, we set the name of the collection 
469                   // as the one of the container and we save as a single key.
470                   TCollection *coll = (TCollection*)output->GetData();
471                   coll->SetName(output->GetName());
472                   coll->Write(output->GetName(), TObject::kSingleKey);
473                } else {
474                   if (output->GetData()->InheritsFrom(TTree::Class())) {
475                      TTree *tree = (TTree*)output->GetData();
476                      // tree->SetDirectory(file);
477                      tree->AutoSave();
478                   } else {
479                      output->GetData()->Write();
480                   }   
481                }      
482                if (fDebug > 1) printf("PackOutput %s: memory merge, file resident output\n", output->GetName());
483                if (fDebug > 2) {
484                   printf("   file %s listing content:\n", filename);
485                   file->ls();
486                }   
487                file->Close();
488                output->SetFile(NULL);
489                // Restore current directory
490                if (opwd) opwd->cd();
491             } else {
492                // Memory-resident outputs   
493                if (fDebug > 1) printf("PackOutput %s: memory merge memory resident output\n", filename);
494             }   
495             AliAnalysisDataWrapper *wrap = 0;
496             if (isManagedByHandler) {
497                wrap = new AliAnalysisDataWrapper(fOutputEventHandler->GetTree());
498                wrap->SetName(output->GetName());
499             }   
500             else                    wrap =output->ExportData();
501             // Output wrappers must NOT delete data after merging - the user owns them
502             wrap->SetDeleteData(kFALSE);
503             target->Add(wrap);
504          } else {
505          // Special outputs. The file must be opened and connected to the container.
506             TDirectory *opwd = gDirectory;
507             TFile *file = output->GetFile();
508             if (!file) {
509                AliAnalysisTask *producer = output->GetProducer();
510                Fatal("PackOutput", 
511                      "File %s for special container %s was NOT opened in %s::CreateOutputObjects !!!",
512                      output->GetFileName(), output->GetName(), producer->ClassName());
513                continue;
514             }   
515             TString outFilename = file->GetName();
516             if (fDebug > 1) printf("PackOutput %s: special output\n", output->GetName());
517             if (isManagedByHandler) {
518                // Terminate IO for files managed by the output handler
519                // file->Write() moved to AOD handler (A.G. 11.01.10)
520 //               if (file) file->Write();
521                if (file && fDebug > 2) {
522                   printf("   handled file %s listing content:\n", file->GetName());
523                   file->ls();
524                }   
525                fOutputEventHandler->TerminateIO();
526             } else {               
527                file->cd();
528                // Release object ownership to users after writing data to file
529                if (output->GetData()->InheritsFrom(TCollection::Class())) {
530                   // If data is a collection, we set the name of the collection 
531                   // as the one of the container and we save as a single key.
532                   TCollection *coll = (TCollection*)output->GetData();
533                   coll->SetName(output->GetName());
534                   coll->Write(output->GetName(), TObject::kSingleKey);
535                } else {
536                   if (output->GetData()->InheritsFrom(TTree::Class())) {
537                      TTree *tree = (TTree*)output->GetData();
538                      tree->SetDirectory(file);
539                      tree->AutoSave();
540                   } else {
541                      output->GetData()->Write();
542                   }   
543                }      
544                file->Clear();
545                if (fDebug > 2) {
546                   printf("   file %s listing content:\n", output->GetFileName());
547                   file->ls();
548                }
549                file->Close();
550                output->SetFile(NULL);
551             }
552             // Restore current directory
553             if (opwd) opwd->cd();
554             // Check if a special output location was provided or the output files have to be merged
555             if (strlen(fSpecialOutputLocation.Data())) {
556                TString remote = fSpecialOutputLocation;
557                remote += "/";
558                Int_t gid = gROOT->ProcessLine("gProofServ->GetGroupId();");
559                if (remote.BeginsWith("alien://")) {
560                   gROOT->ProcessLine("TGrid::Connect(\"alien://pcapiserv01.cern.ch:10000\", gProofServ->GetUser());");
561                   remote += outFilename;
562                   remote.ReplaceAll(".root", Form("_%d.root", gid));
563                } else {   
564                   remote += Form("%s_%d_", gSystem->HostName(), gid);
565                   remote += outFilename;
566                }   
567                if (fDebug > 1) 
568                   Info("PackOutput", "Output file for container %s to be copied \n   at: %s. No merging.",
569                        output->GetName(), remote.Data());
570                TFile::Cp ( outFilename.Data(), remote.Data() );
571                // Copy extra outputs
572                if (fExtraFiles.Length() && isManagedByHandler) {
573                   TObjArray *arr = fExtraFiles.Tokenize(" ");
574                   TObjString *os;
575                   TIter nextfilename(arr);
576                   while ((os=(TObjString*)nextfilename())) {
577                      outFilename = os->GetString();
578                      remote = fSpecialOutputLocation;
579                      remote += "/";
580                      if (remote.BeginsWith("alien://")) {
581                         remote += outFilename;
582                         remote.ReplaceAll(".root", Form("_%d.root", gid));
583                      } else {   
584                         remote += Form("%s_%d_", gSystem->HostName(), gid);
585                         remote += outFilename;
586                      }   
587                      if (fDebug > 1) 
588                         Info("PackOutput", "Extra AOD file %s to be copied \n   at: %s. No merging.",
589                              outFilename.Data(), remote.Data());
590                      TFile::Cp ( outFilename.Data(), remote.Data() );
591                   }   
592                   delete arr;
593                }   
594             } else {
595             // No special location specified-> use TProofOutputFile as merging utility
596             // The file at this output slot must be opened in CreateOutputObjects
597                if (fDebug > 1) printf("   File for container %s to be merged via file merger...\n", output->GetName());
598             }
599          }      
600       }
601    } 
602    if (fDebug > 0) printf("<-AliAnalysisManager::PackOutput: output list contains %d containers\n", target->GetSize());
603 }
604
605 //______________________________________________________________________________
606 void AliAnalysisManager::ImportWrappers(TList *source)
607 {
608 // Import data in output containers from wrappers coming in source.
609    if (fDebug > 0) printf("->AliAnalysisManager::ImportWrappers()\n");
610    TIter next(fOutputs);
611    AliAnalysisDataContainer *cont;
612    AliAnalysisDataWrapper   *wrap;
613    Int_t icont = 0;
614    Bool_t inGrid = (fMode == kGridAnalysis)?kTRUE:kFALSE;
615    TDirectory *cdir = gDirectory;
616    while ((cont=(AliAnalysisDataContainer*)next())) {
617       wrap = 0;
618       if (cont->GetProducer()->IsPostEventLoop() && !inGrid) continue;
619       if (cont->IsRegisterDataset()) continue;
620       const char *filename = cont->GetFileName();
621       Bool_t isManagedByHandler = kFALSE;
622       if (!(strcmp(filename, "default")) && fOutputEventHandler) {
623          isManagedByHandler = kTRUE;
624          filename = fOutputEventHandler->GetOutputFileName();
625       }
626       if (cont->IsSpecialOutput() || inGrid) {
627          if (strlen(fSpecialOutputLocation.Data())) continue;
628          // Copy merged file from PROOF scratch space. 
629          // In case of grid the files are already in the current directory.
630          if (!inGrid) {
631             if (isManagedByHandler && fExtraFiles.Length()) {
632                // Copy extra registered dAOD files.
633                TObjArray *arr = fExtraFiles.Tokenize(" ");
634                TObjString *os;
635                TIter nextfilename(arr);
636                while ((os=(TObjString*)nextfilename())) GetFileFromWrapper(os->GetString(), source);
637                delete arr;
638             }
639             if (!GetFileFromWrapper(filename, source)) continue;
640          }   
641          // Normally we should connect data from the copied file to the
642          // corresponding output container, but it is not obvious how to do this
643          // automatically if several objects in file...
644          TFile *f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
645          if (!f) f = TFile::Open(filename, "READ");
646          if (!f) {
647             Error("ImportWrappers", "Cannot open file %s in read-only mode", filename);
648             continue;
649          }   
650          TObject *obj = 0;
651          // Cd to the directory pointed by the container
652          TString folder = cont->GetFolderName();
653          if (!folder.IsNull()) f->cd(folder);
654          // Try to fetch first an object having the container name.
655          obj = gDirectory->Get(cont->GetName());
656          if (!obj) {
657             Warning("ImportWrappers", "Could not import object for container %s in file %s:%s.\n Object will not be available in Terminate()", 
658                     cont->GetName(), filename, cont->GetFolderName());
659             continue;
660          }  
661          wrap = new AliAnalysisDataWrapper(obj);
662          wrap->SetDeleteData(kFALSE);
663       }   
664       if (!wrap) wrap = (AliAnalysisDataWrapper*)source->FindObject(cont->GetName());
665       if (!wrap) {
666          Error("ImportWrappers","Container %s not found in analysis output !", cont->GetName());
667          continue;
668       }
669       icont++;
670       if (fDebug > 1) {
671          printf("   Importing data for container %s\n", cont->GetName());
672          if (strlen(filename)) printf("    -> file %s\n", filename);
673          else printf("\n");
674       }   
675       cont->ImportData(wrap);
676    }
677    if (cdir) cdir->cd();
678    if (fDebug > 0) printf("<-AliAnalysisManager::ImportWrappers(): %d containers imported\n", icont);
679 }
680
681 //______________________________________________________________________________
682 void AliAnalysisManager::UnpackOutput(TList *source)
683 {
684   // Called by AliAnalysisSelector::Terminate only on the client.
685    if (fDebug > 0) printf("->AliAnalysisManager::UnpackOutput()\n");
686    if (!source) {
687       Error("UnpackOutput", "No target. Aborting.");
688       return;
689    }
690    if (fDebug > 1) printf("   Source list contains %d containers\n", source->GetSize());
691
692    if (fMode == kProofAnalysis) ImportWrappers(source);
693
694    TIter next(fOutputs);
695    AliAnalysisDataContainer *output;
696    while ((output=(AliAnalysisDataContainer*)next())) {
697       if (!output->GetData()) continue;
698       // Check if there are client tasks that run post event loop
699       if (output->HasConsumers()) {
700          // Disable event loop semaphore
701          output->SetPostEventLoop(kTRUE);
702          TObjArray *list = output->GetConsumers();
703          Int_t ncons = list->GetEntriesFast();
704          for (Int_t i=0; i<ncons; i++) {
705             AliAnalysisTask *task = (AliAnalysisTask*)list->At(i);
706             task->CheckNotify(kTRUE);
707             // If task is active, execute it
708             if (task->IsPostEventLoop() && task->IsActive()) {
709                if (fDebug > 0) printf("== Executing post event loop task %s\n", task->GetName());
710                task->ExecuteTask();
711             }   
712          }
713       }   
714    }
715    if (fDebug > 0) printf("<-AliAnalysisManager::UnpackOutput()\n");
716 }
717
718 //______________________________________________________________________________
719 void AliAnalysisManager::Terminate()
720 {
721   // The Terminate() function is the last function to be called during
722   // a query. It always runs on the client, it can be used to present
723   // the results graphically.
724    if (fDebug > 0) printf("->AliAnalysisManager::Terminate()\n");
725    AliAnalysisTask *task;
726    AliAnalysisDataContainer *output;
727    TIter next(fTasks);
728    TStopwatch timer;
729    // Call Terminate() for tasks
730    while (!IsSkipTerminate() && (task=(AliAnalysisTask*)next())) {
731       // Save all the canvases produced by the Terminate
732       TString pictname = Form("%s_%s", task->GetName(), task->ClassName());
733       task->Terminate();
734       if (TObject::TestBit(kSaveCanvases)) {
735          if (!gROOT->IsBatch()) {
736             Warning("Terminate", "Waiting 5 sec for %s::Terminate() to finish drawing", task->ClassName());
737             timer.Start();
738             while (timer.CpuTime()<5) {
739                timer.Continue();
740                gSystem->ProcessEvents();
741             }
742          }
743          Int_t iend = gROOT->GetListOfCanvases()->GetEntries();
744          if (iend==0) continue;
745          TCanvas *canvas;
746          for (Int_t ipict=0; ipict<iend; ipict++) {
747             canvas = (TCanvas*)gROOT->GetListOfCanvases()->At(ipict);
748             if (!canvas) continue;         
749             canvas->SaveAs(Form("%s_%02d.gif", pictname.Data(),ipict));
750          } 
751          gROOT->GetListOfCanvases()->Delete(); 
752       }
753    }   
754    //
755    if (fInputEventHandler)   fInputEventHandler  ->TerminateIO();
756    if (fOutputEventHandler)  fOutputEventHandler ->TerminateIO();
757    if (fMCtruthEventHandler) fMCtruthEventHandler->TerminateIO();
758    TIter next1(fOutputs);
759    TString handlerFile = "";
760    if (fOutputEventHandler) {
761       handlerFile = fOutputEventHandler->GetOutputFileName();
762    }
763    while ((output=(AliAnalysisDataContainer*)next1())) {
764       // Special outputs or grid files have the files already closed and written.
765       if (fMode == kGridAnalysis) continue;
766       if (fMode == kProofAnalysis) {
767         if (output->IsSpecialOutput() || output->IsRegisterDataset()) continue;
768       }  
769       const char *filename = output->GetFileName();
770       TString open_option = "RECREATE";
771       if (!(strcmp(filename, "default"))) continue;
772       if (!strlen(filename)) continue;
773       if (!output->GetData()) continue;
774       TDirectory *opwd = gDirectory;
775       TFile *file = output->GetFile();
776       if (!file) file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
777       if (!file) {
778               printf("Terminate : handlerFile = %s, filename = %s\n",handlerFile.Data(),filename);
779               //if (handlerFile == filename && !gSystem->AccessPathName(filename)) open_option = "UPDATE";
780          if (!gSystem->AccessPathName(filename)) open_option = "UPDATE";
781          file = new TFile(filename, open_option);
782       }
783       if (file->IsZombie()) {
784          Error("Terminate", "Cannot open output file %s", filename);
785          continue;
786       }   
787       output->SetFile(file);
788       file->cd();
789       // Check for a folder request
790       TString dir = output->GetFolderName();
791       if (!dir.IsNull()) {
792          if (!file->GetDirectory(dir)) file->mkdir(dir);
793          file->cd(dir);
794       }  
795       if (fDebug > 1) printf("   writing output data %s to file %s:%s\n", output->GetData()->GetName(), file->GetName(), output->GetFolderName());
796       if (output->GetData()->InheritsFrom(TCollection::Class())) {
797       // If data is a collection, we set the name of the collection 
798       // as the one of the container and we save as a single key.
799          TCollection *coll = (TCollection*)output->GetData();
800          coll->SetName(output->GetName());
801          coll->Write(output->GetName(), TObject::kSingleKey);
802       } else {
803          if (output->GetData()->InheritsFrom(TTree::Class())) {
804             TTree *tree = (TTree*)output->GetData();
805             tree->SetDirectory(file);
806             tree->AutoSave();
807          } else {
808             output->GetData()->Write();
809          }   
810       }      
811       if (opwd) opwd->cd();
812    }   
813    next1.Reset();
814    while ((output=(AliAnalysisDataContainer*)next1())) {
815       // Close all files at output
816       TDirectory *opwd = gDirectory;
817       if (output->GetFile()) {
818          output->GetFile()->Close();
819          output->SetFile(NULL);
820          // Copy merged outputs in alien if requested
821          if (fSpecialOutputLocation.Length() && 
822              fSpecialOutputLocation.BeginsWith("alien://")) {
823             Info("Terminate", "Copy file %s to %s", output->GetFile()->GetName(),fSpecialOutputLocation.Data()); 
824             TFile::Cp(output->GetFile()->GetName(), 
825                       Form("%s/%s", fSpecialOutputLocation.Data(), output->GetFile()->GetName()));
826          }             
827       }   
828       if (opwd) opwd->cd();
829    }   
830
831    Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE;
832    if (getsysInfo) {
833       TDirectory *cdir = gDirectory;
834       TFile f("syswatch.root", "RECREATE");
835       if (!f.IsZombie()) {
836          TTree *tree = AliSysInfo::MakeTree("syswatch.log");
837          tree->SetMarkerStyle(kCircle);
838          tree->SetMarkerColor(kBlue);
839          tree->SetMarkerSize(0.5);
840          if (!gROOT->IsBatch()) {
841             tree->SetAlias("event", "id0");
842             tree->SetAlias("memUSED", "mi.fMemUsed");
843             tree->SetAlias("memVIRT", "pi.fMemVirtual");
844             new TCanvas("SysInfo","SysInfo",10,10,800,600);
845             tree->Draw("memVIRT:event","","", 1234567890, 0);
846          }   
847          tree->Write();
848          f.Close();
849          delete tree;
850       }
851       if (cdir) cdir->cd();
852    }
853    // Validate the output files
854    if (ValidateOutputFiles()) {
855       ofstream out;
856       out.open("outputs_valid", ios::out);
857       out.close();
858    }      
859    if (fDebug > 0) printf("<-AliAnalysisManager::Terminate()\n");
860 }
861
862 //______________________________________________________________________________
863 void AliAnalysisManager::AddTask(AliAnalysisTask *task)
864 {
865 // Adds a user task to the global list of tasks.
866    if (fTasks->FindObject(task)) {
867       Warning("AddTask", "Task %s: the same object already added to the analysis manager. Not adding.", task->GetName());
868       return;
869    }   
870    task->SetActive(kFALSE);
871    fTasks->Add(task);
872 }  
873
874 //______________________________________________________________________________
875 AliAnalysisTask *AliAnalysisManager::GetTask(const char *name) const
876 {
877 // Retreive task by name.
878    if (!fTasks) return NULL;
879    return (AliAnalysisTask*)fTasks->FindObject(name);
880 }
881
882 //______________________________________________________________________________
883 AliAnalysisDataContainer *AliAnalysisManager::CreateContainer(const char *name, 
884                                 TClass *datatype, EAliAnalysisContType type, const char *filename)
885 {
886 // Create a data container of a certain type. Types can be:
887 //   kExchangeContainer  = 0, used to exchange data between tasks
888 //   kInputContainer   = 1, used to store input data
889 //   kOutputContainer  = 2, used for writing result to a file
890 // filename: composed by file#folder (e.g. results.root#INCLUSIVE) - will write
891 // the output object to a folder inside the output file
892    if (fContainers->FindObject(name)) {
893       Error("CreateContainer","A container named %s already defined !",name);
894       return NULL;
895    }   
896    AliAnalysisDataContainer *cont = new AliAnalysisDataContainer(name, datatype);
897    fContainers->Add(cont);
898    switch (type) {
899       case kInputContainer:
900          fInputs->Add(cont);
901          break;
902       case kOutputContainer:
903          fOutputs->Add(cont);
904          if (filename && strlen(filename)) {
905             cont->SetFileName(filename);
906             cont->SetDataOwned(kFALSE);  // data owned by the file
907          }   
908          break;
909       case kExchangeContainer:
910          break;   
911    }
912    return cont;
913 }
914          
915 //______________________________________________________________________________
916 Bool_t AliAnalysisManager::ConnectInput(AliAnalysisTask *task, Int_t islot,
917                                         AliAnalysisDataContainer *cont)
918 {
919 // Connect input of an existing task to a data container.
920    if (!task) {
921       Error("ConnectInput", "Task pointer is NULL");
922       return kFALSE;
923    }   
924    if (!fTasks->FindObject(task)) {
925       AddTask(task);
926       Info("ConnectInput", "Task %s was not registered. Now owned by analysis manager", task->GetName());
927    } 
928    Bool_t connected = task->ConnectInput(islot, cont);
929    return connected;
930 }   
931
932 //______________________________________________________________________________
933 Bool_t AliAnalysisManager::ConnectOutput(AliAnalysisTask *task, Int_t islot,
934                                         AliAnalysisDataContainer *cont)
935 {
936 // Connect output of an existing task to a data container.
937    if (!task) {
938       Error("ConnectOutput", "Task pointer is NULL");
939       return kFALSE;
940    }   
941    if (!fTasks->FindObject(task)) {
942       AddTask(task);
943       Warning("ConnectOutput", "Task %s not registered. Now owned by analysis manager", task->GetName());
944    } 
945    Bool_t connected = task->ConnectOutput(islot, cont);
946    return connected;
947 }   
948                                
949 //______________________________________________________________________________
950 void AliAnalysisManager::CleanContainers()
951 {
952 // Clean data from all containers that have already finished all client tasks.
953    TIter next(fContainers);
954    AliAnalysisDataContainer *cont;
955    while ((cont=(AliAnalysisDataContainer *)next())) {
956       if (cont->IsOwnedData() && 
957           cont->IsDataReady() && 
958           cont->ClientsExecuted()) cont->DeleteData();
959    }
960 }
961
962 //______________________________________________________________________________
963 Bool_t AliAnalysisManager::InitAnalysis()
964 {
965 // Initialization of analysis chain of tasks. Should be called after all tasks
966 // and data containers are properly connected
967    // Reset flag and remove valid_outputs file if exists
968    fInitOK = kFALSE;
969    if (!gSystem->AccessPathName("outputs_valid"))
970       gSystem->Unlink("outputs_valid");
971    // Check for top tasks (depending only on input data containers)
972    if (!fTasks->First()) {
973       Error("InitAnalysis", "Analysis has no tasks !");
974       return kFALSE;
975    }   
976    TIter next(fTasks);
977    AliAnalysisTask *task;
978    AliAnalysisDataContainer *cont;
979    Int_t ntop = 0;
980    Int_t nzombies = 0;
981    Bool_t iszombie = kFALSE;
982    Bool_t istop = kTRUE;
983    Int_t i;
984    while ((task=(AliAnalysisTask*)next())) {
985       istop = kTRUE;
986       iszombie = kFALSE;
987       Int_t ninputs = task->GetNinputs();
988       for (i=0; i<ninputs; i++) {
989          cont = task->GetInputSlot(i)->GetContainer();
990          if (!cont) {
991             if (!iszombie) {
992                task->SetZombie();
993                fZombies->Add(task);
994                nzombies++;
995                iszombie = kTRUE;
996             }   
997             Error("InitAnalysis", "Input slot %d of task %s has no container connected ! Declared zombie...", 
998                   i, task->GetName()); 
999          }
1000          if (iszombie) continue;
1001          // Check if cont is an input container
1002          if (istop && !fInputs->FindObject(cont)) istop=kFALSE;
1003          // Connect to parent task
1004       }
1005       if (istop) {
1006          ntop++;
1007          fTopTasks->Add(task);
1008       }
1009    }
1010    if (!ntop) {
1011       Error("InitAnalysis", "No top task defined. At least one task should be connected only to input containers");
1012       return kFALSE;
1013    }                        
1014    // Check now if there are orphan tasks
1015    for (i=0; i<ntop; i++) {
1016       task = (AliAnalysisTask*)fTopTasks->At(i);
1017       task->SetUsed();
1018    }
1019    Int_t norphans = 0;
1020    next.Reset();
1021    while ((task=(AliAnalysisTask*)next())) {
1022       if (!task->IsUsed()) {
1023          norphans++;
1024          Warning("InitAnalysis", "Task %s is orphan", task->GetName());
1025       }   
1026    }          
1027    // Check the task hierarchy (no parent task should depend on data provided
1028    // by a daughter task)
1029    for (i=0; i<ntop; i++) {
1030       task = (AliAnalysisTask*)fTopTasks->At(i);
1031       if (task->CheckCircularDeps()) {
1032          Error("InitAnalysis", "Found illegal circular dependencies between following tasks:");
1033          PrintStatus("dep");
1034          return kFALSE;
1035       }   
1036    }
1037    // Check that all containers feeding post-event loop tasks are in the outputs list
1038    TIter nextcont(fContainers); // loop over all containers
1039    while ((cont=(AliAnalysisDataContainer*)nextcont())) {
1040       if (!cont->IsPostEventLoop() && !fOutputs->FindObject(cont)) {
1041          if (cont->HasConsumers()) {
1042          // Check if one of the consumers is post event loop
1043             TIter nextconsumer(cont->GetConsumers());
1044             while ((task=(AliAnalysisTask*)nextconsumer())) {
1045                if (task->IsPostEventLoop()) {
1046                   fOutputs->Add(cont);
1047                   break;
1048                }
1049             }
1050          }
1051       }
1052    }   
1053    // Check if all special output containers have a file name provided
1054    TIter nextout(fOutputs);
1055    while ((cont=(AliAnalysisDataContainer*)nextout())) {
1056       if (cont->IsSpecialOutput() && !strlen(cont->GetFileName())) {
1057          Error("InitAnalysis", "Wrong container %s : a file name MUST be provided for special outputs", cont->GetName());
1058          return kFALSE;
1059       }
1060    }      
1061    fInitOK = kTRUE;
1062    return kTRUE;
1063 }   
1064
1065 //______________________________________________________________________________
1066 void AliAnalysisManager::PrintStatus(Option_t *option) const
1067 {
1068 // Print task hierarchy.
1069    if (!fInitOK) {
1070       Info("PrintStatus", "Analysis manager %s not initialized : call InitAnalysis() first", GetName());
1071       return;
1072    }   
1073    Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE;
1074    if (getsysInfo)
1075       Info("PrintStatus", "System information will be collected each %lld events", fNSysInfo);
1076    TIter next(fTopTasks);
1077    AliAnalysisTask *task;
1078    while ((task=(AliAnalysisTask*)next()))
1079       task->PrintTask(option);
1080 }
1081
1082 //______________________________________________________________________________
1083 void AliAnalysisManager::ResetAnalysis()
1084 {
1085 // Reset all execution flags and clean containers.
1086    CleanContainers();
1087 }
1088
1089 //______________________________________________________________________________
1090 Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree *tree, Long64_t nentries, Long64_t firstentry)
1091 {
1092 // Start analysis for this manager. Analysis task can be: LOCAL, PROOF, GRID or
1093 // MIX. Process nentries starting from firstentry
1094    Long64_t retv = 0;
1095    if (!fInitOK) {
1096       Error("StartAnalysis","Analysis manager was not initialized !");
1097       return -1;
1098    }
1099    if (fDebug > 0) printf("StartAnalysis %s\n",GetName());
1100    TString anaType = type;
1101    anaType.ToLower();
1102    fMode = kLocalAnalysis;
1103    Bool_t runlocalinit = kTRUE;
1104    if (anaType.Contains("file")) {
1105       runlocalinit = kFALSE;
1106    }   
1107    if (anaType.Contains("proof"))     fMode = kProofAnalysis;
1108    else if (anaType.Contains("grid")) fMode = kGridAnalysis;
1109    else if (anaType.Contains("mix"))  fMode = kMixingAnalysis;
1110
1111    if (fMode == kGridAnalysis) {
1112       if (!fGridHandler) {
1113          Error("StartAnalysis", "Cannot start grid analysis without a grid handler.");
1114          Info("===", "Add an AliAnalysisAlien object as plugin for this manager and configure it.");
1115          return -1;
1116       }
1117       // Write analysis manager in the analysis file
1118       cout << "===== RUNNING GRID ANALYSIS: " << GetName() << endl;
1119       // run local task configuration
1120       TIter nextTask(fTasks);
1121       AliAnalysisTask *task;
1122       while ((task=(AliAnalysisTask*)nextTask())) {
1123          task->LocalInit();
1124       }
1125       if (!fGridHandler->StartAnalysis(nentries, firstentry)) {
1126          Info("StartAnalysis", "Grid analysis was stopped and cannot be terminated");
1127          return -1;
1128       }   
1129
1130       // Terminate grid analysis
1131       if (fSelector && fSelector->GetStatus() == -1) return -1;
1132       if (fGridHandler->GetRunMode() == AliAnalysisGrid::kOffline) return 0;
1133       cout << "===== MERGING OUTPUTS REGISTERED BY YOUR ANALYSIS JOB: " << GetName() << endl;
1134       if (!fGridHandler->MergeOutputs()) {
1135          // Return if outputs could not be merged or if it alien handler
1136          // was configured for offline mode or local testing.
1137          return 0;
1138       }
1139       ImportWrappers(NULL);
1140       Terminate();
1141       return 0;
1142    }
1143    char line[256];
1144    SetEventLoop(kFALSE);
1145    // Enable event loop mode if a tree was provided
1146    if (tree || fMode==kMixingAnalysis) SetEventLoop(kTRUE);
1147
1148    TChain *chain = 0;
1149    TString ttype = "TTree";
1150    if (tree && tree->IsA() == TChain::Class()) {
1151       chain = (TChain*)tree;
1152       if (!chain || !chain->GetListOfFiles()->First()) {
1153          Error("StartAnalysis", "Cannot process null or empty chain...");
1154          return -1;
1155       }   
1156       ttype = "TChain";
1157    }   
1158
1159    // Initialize locally all tasks (happens for all modes)
1160    TIter next(fTasks);
1161    AliAnalysisTask *task;
1162    if (runlocalinit) {
1163       while ((task=(AliAnalysisTask*)next())) {
1164          task->LocalInit();
1165       }
1166    }   
1167    
1168    switch (fMode) {
1169       case kLocalAnalysis:
1170          if (!tree) {
1171             TIter nextT(fTasks);
1172             // Call CreateOutputObjects for all tasks
1173             while ((task=(AliAnalysisTask*)nextT())) {
1174                TDirectory *curdir = gDirectory;
1175                task->CreateOutputObjects();
1176                if (curdir) curdir->cd();
1177             }   
1178             if (IsExternalLoop()) {
1179                Info("StartAnalysis", "Initialization done. Event loop is controlled externally.\
1180                      \nSetData for top container, call ExecAnalysis in a loop and then Terminate manually");
1181                return 0;
1182             }         
1183             ExecAnalysis();
1184             Terminate();
1185             return 0;
1186          } 
1187          // Run tree-based analysis via AliAnalysisSelector  
1188          cout << "===== RUNNING LOCAL ANALYSIS " << GetName() << " ON TREE " << tree->GetName() << endl;
1189          fSelector = new AliAnalysisSelector(this);
1190          retv = tree->Process(fSelector, "", nentries, firstentry);
1191          break;
1192       case kProofAnalysis:
1193          if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) {
1194             Error("StartAnalysis", "No PROOF!!! Aborting.");
1195             return -1;
1196          }   
1197          sprintf(line, "gProof->AddInput((TObject*)0x%lx);", (ULong_t)this);
1198          gROOT->ProcessLine(line);
1199          if (chain) {
1200             chain->SetProof();
1201             cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON CHAIN " << chain->GetName() << endl;
1202             retv = chain->Process("AliAnalysisSelector", "", nentries, firstentry);
1203          } else {
1204             Error("StartAnalysis", "No chain!!! Aborting.");
1205             return -1;
1206          }      
1207          break;
1208       case kGridAnalysis:
1209          Warning("StartAnalysis", "GRID analysis mode not implemented. Running local.");
1210          break;
1211       case kMixingAnalysis:   
1212          // Run event mixing analysis
1213          if (!fEventPool) {
1214             Error("StartAnalysis", "Cannot run event mixing without event pool");
1215             return -1;
1216          }
1217          cout << "===== RUNNING EVENT MIXING ANALYSIS " << GetName() << endl;
1218          fSelector = new AliAnalysisSelector(this);
1219          while ((chain=fEventPool->GetNextChain())) {
1220             next.Reset();
1221             // Call NotifyBinChange for all tasks
1222             while ((task=(AliAnalysisTask*)next()))
1223                if (!task->IsPostEventLoop()) task->NotifyBinChange();
1224             retv = chain->Process(fSelector);
1225             if (retv < 0) {
1226                Error("StartAnalysis", "Mixing analysis failed");
1227                return retv;
1228             }   
1229          }
1230          PackOutput(fSelector->GetOutputList());
1231          Terminate();
1232    }
1233    return retv;
1234 }   
1235
1236 //______________________________________________________________________________
1237 Long64_t AliAnalysisManager::StartAnalysis(const char *type, const char *dataset, Long64_t nentries, Long64_t firstentry)
1238 {
1239 // Start analysis for this manager on a given dataset. Analysis task can be: 
1240 // LOCAL, PROOF or GRID. Process nentries starting from firstentry.
1241    if (!fInitOK) {
1242       Error("StartAnalysis","Analysis manager was not initialized !");
1243       return -1;
1244    }
1245    if (fDebug > 0) printf("StartAnalysis %s\n",GetName());
1246    TString anaType = type;
1247    anaType.ToLower();
1248    if (!anaType.Contains("proof")) {
1249       Error("StartAnalysis", "Cannot process datasets in %s mode. Try PROOF.", type);
1250       return -1;
1251    }   
1252    fMode = kProofAnalysis;
1253    char line[256];
1254    SetEventLoop(kTRUE);
1255    // Set the dataset flag
1256    TObject::SetBit(kUseDataSet);
1257    fTree = 0;
1258
1259    // Initialize locally all tasks
1260    TIter next(fTasks);
1261    AliAnalysisTask *task;
1262    while ((task=(AliAnalysisTask*)next())) {
1263       task->LocalInit();
1264    }
1265    
1266    if (!gROOT->GetListOfProofs() || !gROOT->GetListOfProofs()->GetEntries()) {
1267       Error("StartAnalysis", "No PROOF!!! Aborting.");
1268       return -1;
1269    }   
1270    sprintf(line, "gProof->AddInput((TObject*)0x%lx);", (ULong_t)this);
1271    gROOT->ProcessLine(line);
1272    sprintf(line, "gProof->GetDataSet(\"%s\");", dataset);
1273    if (!gROOT->ProcessLine(line)) {
1274       Error("StartAnalysis", "Dataset %s not found", dataset);
1275       return -1;
1276    }   
1277    sprintf(line, "gProof->Process(\"%s\", \"AliAnalysisSelector\", \"\", %lld, %lld);",
1278            dataset, nentries, firstentry);
1279    cout << "===== RUNNING PROOF ANALYSIS " << GetName() << " ON DATASET " << dataset << endl;
1280    Long_t retv = (Long_t)gROOT->ProcessLine(line);
1281    return retv;
1282 }   
1283
1284 //______________________________________________________________________________
1285 TFile *AliAnalysisManager::OpenFile(AliAnalysisDataContainer *cont, const char *option, Bool_t ignoreProof)
1286 {
1287 // Opens according the option the file specified by cont->GetFileName() and changes
1288 // current directory to cont->GetFolderName(). If the file was already opened, it
1289 // checks if the option UPDATE was preserved. File open via TProofOutputFile can
1290 // be optionally ignored.
1291   AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
1292   TString filename = cont->GetFileName();
1293   TFile *f = NULL;
1294   if (filename.IsNull()) {
1295     ::Error("AliAnalysisManager::OpenFile", "No file name specified for container %s", cont->GetName());
1296     return NULL;
1297   }
1298   if (mgr->GetAnalysisType()==AliAnalysisManager::kProofAnalysis && cont->IsSpecialOutput()
1299       && !ignoreProof)
1300     f = mgr->OpenProofFile(cont,option);
1301   else {
1302     // Check first if the file is already opened
1303     f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
1304     if (f) {
1305       // Check if option "UPDATE" was preserved 
1306       TString opt(option);
1307       opt.ToUpper();
1308       if ((opt=="UPDATE") && (opt!=f->GetOption())) 
1309         ::Info("AliAnalysisManager::OpenFile", "File %s already opened in %s mode!", cont->GetFileName(), f->GetOption());
1310     } else {
1311       f = TFile::Open(filename, option);
1312     }    
1313   }   
1314   if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) {
1315     cont->SetFile(f);
1316     // Cd to file
1317     f->cd();
1318     // Check for a folder request
1319     TString dir = cont->GetFolderName(); 
1320     if (!dir.IsNull()) {
1321       if (!f->GetDirectory(dir)) f->mkdir(dir);
1322       f->cd(dir);
1323     }
1324     return f;
1325   }
1326   ::Fatal("AliAnalysisManager::OpenFile", "File %s could not be opened", filename.Data());
1327   cont->SetFile(NULL);
1328   return NULL;
1329 }    
1330  
1331 //______________________________________________________________________________
1332 TFile *AliAnalysisManager::OpenProofFile(AliAnalysisDataContainer *cont, const char *option)
1333 {
1334 // Opens a special output file used in PROOF.
1335   TString line;
1336   TString filename = cont->GetFileName();
1337   if (cont == fCommonOutput) {
1338      if (fOutputEventHandler) filename = fOutputEventHandler->GetOutputFileName();
1339      else Fatal("OpenProofFile","No output container. Aborting.");
1340   }   
1341   TFile *f = NULL;
1342   if (fMode!=kProofAnalysis || !fSelector) {
1343     Fatal("OpenProofFile","Cannot open PROOF file %s: no PROOF or selector",filename.Data());
1344     return NULL;
1345   } 
1346   if (fSpecialOutputLocation.Length()) {
1347     f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
1348     if (f) {
1349       // Check if option "UPDATE" was preserved 
1350       TString opt(option);
1351       opt.ToUpper();
1352       if ((opt=="UPDATE") && (opt!=f->GetOption()))
1353         ::Info("OpenProofFile", "File %s already opened in %s mode!", cont->GetFileName(), f->GetOption());
1354     } else {
1355       f = new TFile(filename, option);
1356     }
1357     if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) {
1358       cont->SetFile(f);
1359       // Cd to file
1360       f->cd();
1361       // Check for a folder request
1362       TString dir = cont->GetFolderName(); 
1363       if (dir.Length()) {
1364         if (!f->GetDirectory(dir)) f->mkdir(dir);
1365         f->cd(dir);
1366       }      
1367       return f;
1368     }
1369     Fatal("OpenProofFile", "File %s could not be opened", cont->GetFileName());
1370     cont->SetFile(NULL);
1371     return NULL;       
1372   }
1373   // Check if there is already a proof output file in the output list
1374   TObject *pof = fSelector->GetOutputList()->FindObject(filename);
1375   if (pof) {
1376     // Get the actual file
1377     line = Form("((TProofOutputFile*)0x%lx)->GetFileName();", (ULong_t)pof);
1378     filename = (const char*)gROOT->ProcessLine(line);
1379     if (fDebug>1) {
1380       printf("File: %s already booked via TProofOutputFile\n", filename.Data());
1381     }  
1382     f = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
1383     if (!f) Fatal("OpenProofFile", "Proof output file found but no file opened for %s", filename.Data());
1384     // Check if option "UPDATE" was preserved 
1385     TString opt(option);
1386     opt.ToUpper();
1387     if ((opt=="UPDATE") && (opt!=f->GetOption())) 
1388       Fatal("OpenProofFile", "File %s already opened, but not in UPDATE mode!", cont->GetFileName());
1389   } else {
1390     if (cont->IsRegisterDataset()) {
1391       TString dset_name = filename;
1392       dset_name.ReplaceAll(".root", cont->GetTitle());
1393       dset_name.ReplaceAll(":","_");
1394       if (fDebug>1) printf("Booking dataset: %s\n", dset_name.Data());
1395       line = Form("TProofOutputFile *pf = new TProofOutputFile(\"%s\", \"DROV\", \"%s\");", filename.Data(), dset_name.Data());
1396     } else {
1397       if (fDebug>1) printf("Booking TProofOutputFile: %s to be merged\n", filename.Data());
1398       line = Form("TProofOutputFile *pf = new TProofOutputFile(\"%s\");", filename.Data());
1399     }
1400     if (fDebug > 1) printf("=== %s\n", line.Data());
1401     gROOT->ProcessLine(line);
1402     line = Form("pf->OpenFile(\"%s\");", option);
1403     gROOT->ProcessLine(line);
1404     f = gFile;
1405     if (fDebug > 1) {
1406       gROOT->ProcessLine("pf->Print()");
1407       printf(" == proof file name: %s", f->GetName());
1408     }   
1409     // Add to proof output list
1410     line = Form("((TList*)0x%lx)->Add(pf);",(ULong_t)fSelector->GetOutputList());
1411     if (fDebug > 1) printf("=== %s\n", line.Data());
1412     gROOT->ProcessLine(line);
1413   }
1414   if (f && !f->IsZombie() && !f->TestBit(TFile::kRecovered)) {
1415     cont->SetFile(f);
1416     // Cd to file
1417     f->cd();
1418     // Check for a folder request
1419     TString dir = cont->GetFolderName(); 
1420     if (!dir.IsNull()) {
1421       if (!f->GetDirectory(dir)) f->mkdir(dir);
1422       f->cd(dir);
1423     }
1424     return f;
1425   }
1426   Fatal("OpenProofFile", "File %s could not be opened", cont->GetFileName());
1427   cont->SetFile(NULL);  
1428   return NULL;
1429 }   
1430
1431 //______________________________________________________________________________
1432 void AliAnalysisManager::ExecAnalysis(Option_t *option)
1433 {
1434 // Execute analysis.
1435    static Long64_t ncalls = 0;
1436    Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE;
1437    if (getsysInfo && ncalls==0) AliSysInfo::AddStamp("Start", (Int_t)ncalls);
1438    ncalls++;
1439    if (!fInitOK) {
1440      Error("ExecAnalysis", "Analysis manager was not initialized !");
1441       return;
1442    }   
1443    AliAnalysisTask *task;
1444    // Check if the top tree is active.
1445    if (fTree) {
1446       TIter next(fTasks);
1447    // De-activate all tasks
1448       while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE);
1449       AliAnalysisDataContainer *cont = fCommonInput;
1450       if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0);
1451       if (!cont) {
1452               Error("ExecAnalysis","Cannot execute analysis in TSelector mode without at least one top container");
1453          return;
1454       }   
1455       cont->SetData(fTree); // This will notify all consumers
1456       Long64_t entry = fTree->GetTree()->GetReadEntry();
1457       
1458 //
1459 //    Call BeginEvent() for optional input/output and MC services 
1460       if (fInputEventHandler)   fInputEventHandler  ->BeginEvent(entry);
1461       if (fOutputEventHandler)  fOutputEventHandler ->BeginEvent(entry);
1462       if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(entry);
1463 //
1464 //    Execute the tasks
1465 //      TIter next1(cont->GetConsumers());
1466       TIter next1(fTopTasks);
1467       while ((task=(AliAnalysisTask*)next1())) {
1468          if (fDebug >1) {
1469             cout << "    Executing task " << task->GetName() << endl;
1470          }   
1471          
1472          task->ExecuteTask(option);
1473       }
1474 //
1475 //    Call FinishEvent() for optional output and MC services 
1476       if (fInputEventHandler)   fInputEventHandler  ->FinishEvent();
1477       if (fOutputEventHandler)  fOutputEventHandler ->FinishEvent();
1478       if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent();
1479       // Gather system information if requested
1480       if (getsysInfo && ((ncalls%fNSysInfo)==0)) 
1481          AliSysInfo::AddStamp(Form("Event#%lld",ncalls),(Int_t)ncalls);
1482       return;
1483    }   
1484    // The event loop is not controlled by TSelector   
1485 //
1486 //  Call BeginEvent() for optional input/output and MC services 
1487    if (fInputEventHandler)   fInputEventHandler  ->BeginEvent(-1);
1488    if (fOutputEventHandler)  fOutputEventHandler ->BeginEvent(-1);
1489    if (fMCtruthEventHandler) fMCtruthEventHandler->BeginEvent(-1);
1490    TIter next2(fTopTasks);
1491    while ((task=(AliAnalysisTask*)next2())) {
1492       task->SetActive(kTRUE);
1493       if (fDebug > 1) {
1494          cout << "    Executing task " << task->GetName() << endl;
1495       }   
1496       task->ExecuteTask(option);
1497    }   
1498 //
1499 // Call FinishEvent() for optional output and MC services 
1500    if (fInputEventHandler)   fInputEventHandler  ->FinishEvent();
1501    if (fOutputEventHandler)  fOutputEventHandler ->FinishEvent();
1502    if (fMCtruthEventHandler) fMCtruthEventHandler->FinishEvent();
1503 }
1504
1505 //______________________________________________________________________________
1506 void AliAnalysisManager::FinishAnalysis()
1507 {
1508 // Finish analysis.
1509 }
1510
1511 //______________________________________________________________________________
1512 void AliAnalysisManager::SetInputEventHandler(AliVEventHandler*  handler)
1513 {
1514 // Set the input event handler and create a container for it.
1515    fInputEventHandler   = handler;
1516    fCommonInput = CreateContainer("cAUTO_INPUT", TChain::Class(), AliAnalysisManager::kInputContainer);
1517 //   Warning("SetInputEventHandler", " An automatic input container for the input chain was created.\nPlease use: mgr->GetCommonInputContainer() to access it.");
1518 }
1519
1520 //______________________________________________________________________________
1521 void AliAnalysisManager::SetOutputEventHandler(AliVEventHandler*  handler)
1522 {
1523 // Set the input event handler and create a container for it.
1524    fOutputEventHandler   = handler;
1525    fCommonOutput = CreateContainer("cAUTO_OUTPUT", TTree::Class(), AliAnalysisManager::kOutputContainer, "default");
1526    fCommonOutput->SetSpecialOutput();
1527 //   Warning("SetOutputEventHandler", " An automatic output container for the output tree was created.\nPlease use: mgr->GetCommonOutputContainer() to access it.");
1528 }
1529
1530 //______________________________________________________________________________
1531 void AliAnalysisManager::RegisterExtraFile(const char *fname)
1532 {
1533 // This method is used externally to register output files which are not
1534 // connected to any output container, so that the manager can properly register,
1535 // retrieve or merge them when running in distributed mode. The file names are
1536 // separated by blancs. The method has to be called in MyAnalysisTask::LocalInit().
1537    if (fExtraFiles.Length()) fExtraFiles += " ";
1538    fExtraFiles += fname;
1539 }
1540
1541 //______________________________________________________________________________
1542 Bool_t AliAnalysisManager::GetFileFromWrapper(const char *filename, TList *source)
1543 {
1544 // Copy a file from the location specified ina the wrapper with the same name from the source list.
1545    char full_path[512];
1546    char ch_url[512];
1547    TObject *pof =  source->FindObject(filename);
1548    if (!pof || !pof->InheritsFrom("TProofOutputFile")) {
1549       Error("GetFileFromWrapper", "TProofOutputFile object not found in output list for file %s", filename);
1550       return kFALSE;
1551    }
1552    gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", ((TProofOutputFile*)0x%lx)->GetOutputFileName();)", full_path, pof));
1553    gROOT->ProcessLine(Form("sprintf((char*)0x%lx, \"%%s\", gProof->GetUrl();)", ch_url));
1554    TString clientUrl(ch_url);
1555    TString full_path_str(full_path);
1556    if (clientUrl.Contains("localhost")){
1557       TObjArray* array = full_path_str.Tokenize ( "//" );
1558       TObjString *strobj = ( TObjString *)array->At(1);
1559       TObjArray* arrayPort = strobj->GetString().Tokenize ( ":" );
1560       TObjString *strobjPort = ( TObjString *) arrayPort->At(1);
1561       full_path_str.ReplaceAll(strobj->GetString().Data(),"localhost:PORT");
1562       full_path_str.ReplaceAll(":PORT",Form(":%s",strobjPort->GetString().Data()));
1563       if (fDebug > 1) Info("GetFileFromWrapper","Using tunnel from %s to %s",full_path_str.Data(),filename);
1564       delete arrayPort;
1565       delete array;
1566    }
1567    if (fDebug > 1) 
1568       Info("GetFileFromWrapper","Copying file %s from PROOF scratch space", full_path_str.Data());
1569    Bool_t gotit = TFile::Cp(full_path_str.Data(), filename); 
1570    if (!gotit)
1571       Error("GetFileFromWrapper", "Could not get file %s from proof scratch space", filename);
1572    return gotit;
1573 }
1574
1575 //______________________________________________________________________________
1576 void AliAnalysisManager::GetAnalysisTypeString(TString &type) const
1577 {
1578 // Fill analysis type in the provided string.
1579    switch (fMode) {
1580       case kLocalAnalysis:
1581          type = "local";
1582          return;
1583       case kProofAnalysis:
1584          type = "proof";
1585          return;
1586       case kGridAnalysis:
1587          type = "grid";
1588          return;
1589       case kMixingAnalysis:
1590          type = "mix";
1591    }
1592 }
1593
1594 //______________________________________________________________________________
1595 Bool_t AliAnalysisManager::ValidateOutputFiles() const
1596 {
1597 // Validate all output files.
1598    TIter next(fOutputs);
1599    AliAnalysisDataContainer *output;
1600    TDirectory *cdir = gDirectory;
1601    TString openedFiles;
1602    while ((output=(AliAnalysisDataContainer*)next())) {
1603       if (output->IsRegisterDataset()) continue;
1604       TString filename = output->GetFileName();
1605       if (filename == "default") {
1606          if (!fOutputEventHandler) continue;
1607          filename = fOutputEventHandler->GetOutputFileName();
1608          // Main AOD may not be there
1609          if (gSystem->AccessPathName(filename)) continue;
1610       }
1611       // Check if the file is closed
1612       if (openedFiles.Contains(filename)) continue;;
1613       TFile *file = (TFile*)gROOT->GetListOfFiles()->FindObject(filename);
1614       if (file) {
1615          Warning("ValidateOutputs", "File %s was not closed. Closing.", filename.Data());
1616          file->Close();
1617       }
1618       file = TFile::Open(filename);
1619       if (!file || file->IsZombie() || file->TestBit(TFile::kRecovered)) {
1620          Error("ValidateOutputs", "Output file <%s> was not created or invalid", filename.Data());
1621          cdir->cd();
1622          return kFALSE;
1623       }
1624       file->Close();
1625       openedFiles += filename;
1626       openedFiles += " ";
1627    }
1628    cdir->cd();
1629    return kTRUE;
1630 }   
1631
1632 //______________________________________________________________________________
1633 void AliAnalysisManager::ProgressBar(const char *opname, Long64_t current, Long64_t size, TStopwatch *watch, Bool_t last, Bool_t refresh)
1634 {
1635 // Implements a nice text mode progress bar.
1636    static Long64_t icount = 0;
1637    static TString oname;
1638    static TString nname;
1639    static Long64_t ocurrent = 0;
1640    static Long64_t osize = 0;
1641    static Int_t oseconds = 0;
1642    static TStopwatch *owatch = 0;
1643    static Bool_t oneoftwo = kFALSE;
1644    static Int_t nrefresh = 0;
1645    static Int_t nchecks = 0;
1646    const char symbol[4] = {'=','\\','|','/'}; 
1647    char progress[11] = "          ";
1648    Int_t ichar = icount%4;
1649    
1650    if (!refresh) {
1651       nrefresh = 0;
1652       if (!size) return;
1653       owatch = watch;
1654       oname = opname;
1655       ocurrent = TMath::Abs(current);
1656       osize = TMath::Abs(size);
1657       if (ocurrent > osize) ocurrent=osize;
1658    } else {
1659       nrefresh++;
1660       if (!osize) return;
1661    }     
1662    icount++;
1663    Double_t time = 0.;
1664    Int_t hours = 0;
1665    Int_t minutes = 0;
1666    Int_t seconds = 0;
1667    if (owatch && !last) {
1668       owatch->Stop();
1669       time = owatch->RealTime();
1670       hours = (Int_t)(time/3600.);
1671       time -= 3600*hours;
1672       minutes = (Int_t)(time/60.);
1673       time -= 60*minutes;
1674       seconds = (Int_t)time;
1675       if (refresh)  {
1676          if (oseconds==seconds) {
1677             owatch->Continue();
1678             return;
1679          }
1680          oneoftwo = !oneoftwo;   
1681       }
1682       oseconds = seconds;   
1683    }
1684    if (refresh && oneoftwo) {
1685       nname = oname;
1686       if (nchecks <= 0) nchecks = nrefresh+1;
1687       Int_t pctdone = (Int_t)(100.*nrefresh/nchecks);
1688       oname = Form("     == %d%% ==", pctdone);
1689    }         
1690    Double_t percent = 100.0*ocurrent/osize;
1691    Int_t nchar = Int_t(percent/10);
1692    if (nchar>10) nchar=10;
1693    Int_t i;
1694    for (i=0; i<nchar; i++)  progress[i] = '=';
1695    progress[nchar] = symbol[ichar];
1696    for (i=nchar+1; i<10; i++) progress[i] = ' ';
1697    progress[10] = '\0';
1698    oname += "                    ";
1699    oname.Remove(20);
1700    if(size<10000) fprintf(stderr, "%s [%10s] %4lld ", oname.Data(), progress, ocurrent);
1701    else if(size<100000) fprintf(stderr, "%s [%10s] %5lld ",oname.Data(), progress, ocurrent);
1702    else fprintf(stderr, "%s [%10s] %7lld ",oname.Data(), progress, ocurrent);
1703    if (time>0.) fprintf(stderr, "[%6.2f %%]   TIME %.2d:%.2d:%.2d             \r", percent, hours, minutes, seconds);
1704    else fprintf(stderr, "[%6.2f %%]\r", percent);
1705    if (refresh && oneoftwo) oname = nname;
1706    if (owatch) owatch->Continue();
1707    if (last) {
1708       icount = 0;
1709       owatch = 0;
1710       ocurrent = 0;
1711       osize = 0;
1712       oseconds = 0;
1713       oneoftwo = kFALSE;
1714       nrefresh = 0;
1715       fprintf(stderr, "\n");
1716    }   
1717 }