for (Int_t i=0; i<indent; i++) ind += " ";
TString opt(option);
opt.ToLower();
+ TString ctype = "Container";
+ if (IsExchange()) ctype = "Exchange container";
Bool_t dep = (opt.Contains("dep"))?kTRUE:kFALSE;
if (!dep) {
- printf("%sContainer: %s type: %s POST_LOOP=%i", ind.Data(), GetName(), GetTitle(), IsPostEventLoop());
- if (fProducer)
- printf("%s = Data producer: task %s",ind.Data(),fProducer->GetName());
- else
- printf("%s= No data producer",ind.Data());
- printf("%s = Consumer tasks: ", ind.Data());
- if (!fConsumers || !fConsumers->GetEntriesFast()) printf("-none-\n");
- else printf("\n");
+ if (IsPostEventLoop()) printf("%s%s: %s DATA TYPE: %s POST_LOOP task\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
+ else printf("%s%s: %s DATA TYPE: %s\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
+ if (!fProducer)
+// printf("%s = Data producer: task %s\n",ind.Data(),fProducer->GetName());
+// else
+ printf("%s= Not connected to a data producer\n",ind.Data());
+ if (fConsumers && fConsumers->GetEntriesFast())
+ printf("%s = Client tasks indented below:\n", ind.Data());
}
- if (fFolderName.Length())
- printf("Filename: %s folder: %s\n", fFileName.Data(), fFolderName.Data());
- else
- printf("Filename: %s\n", fFileName.Data());
+ if (!IsExchange()) {
+ if (!fFolderName.IsNull())
+ printf("%s = Filename: %s folder: %s\n", ind.Data(),fFileName.Data(), fFolderName.Data());
+ else
+ if (!fFileName.IsNull()) printf("%s = Filename: %s\n", ind.Data(),fFileName.Data());
+ }
+ ((AliAnalysisDataContainer*)this)->SetTouched(kTRUE);
TIter next(fConsumers);
AliAnalysisTask *task;
while ((task=(AliAnalysisTask*)next())) task->PrintTask(option, indent+3);
enum EAnalysisContainerFlags {
kPostEventLoop = BIT(14),
kSpecialOutput = BIT(15),
- kRegisterDataset = BIT(16)
+ kRegisterDataset = BIT(16),
+ kExchangeData = BIT(17),
+ kTouchedFlag = BIT(18)
};
AliAnalysisDataContainer();
AliAnalysisDataContainer(const AliAnalysisDataContainer &cont);
TObjArray *GetConsumers() const {return fConsumers;}
virtual void GetEntry(Long64_t ientry);
// Setters
+ void Reset() {fData = 0; fDataReady = kFALSE; SetTouched(kFALSE);}
void ResetDataReady() {fDataReady = kFALSE;}
virtual Bool_t SetData(TObject *data, Option_t *option="");
void SetDataOwned(Bool_t flag) {fOwnedData = flag;}
+ void SetExchange(Bool_t flag) {TObject::SetBit(kExchangeData,flag);}
void SetPostEventLoop(Bool_t flag=kTRUE) {TObject::SetBit(kPostEventLoop,flag);}
void SetSpecialOutput(Bool_t flag=kTRUE) {TObject::SetBit(kSpecialOutput,flag);}
void SetRegisterDataset(Bool_t flag=kTRUE) {TObject::SetBit(kRegisterDataset,flag);}
void SetFileName(const char *filename);
void SetFile(TFile *f) {fFile = f;}
void SetProducer(AliAnalysisTask *prod, Int_t islot);
+ void SetTouched(Bool_t flag=kTRUE) {TObject::SetBit(kTouchedFlag,flag);}
void AddConsumer(AliAnalysisTask *cons, Int_t islot);
void DeleteData();
// Wrapping
void ImportData(AliAnalysisDataWrapper *pack);
// Container status checking
Bool_t IsDataReady() const {return fDataReady;}
+ Bool_t IsExchange() const {return TObject::TestBit(kExchangeData);}
Bool_t IsPostEventLoop() const {return TObject::TestBit(kPostEventLoop);}
Bool_t IsSpecialOutput() const {return TObject::TestBit(kSpecialOutput);}
Bool_t IsRegisterDataset() const {return TObject::TestBit(kRegisterDataset);}
+ Bool_t IsTouched() const {return TObject::TestBit(kTouchedFlag);}
Bool_t IsOwnedData() const {return fOwnedData;}
Bool_t ClientsExecuted() const;
Bool_t HasConsumers() const {return (fConsumers != 0);}
fInputs(0),
fOutputs(0),
fParamCont(0),
+ fExchangeCont(0),
fDebugOptions(0),
fFileDescriptors(new TObjArray()),
fCurrentDescriptor(0),
fInputs = new TObjArray();
fOutputs = new TObjArray();
fParamCont = new TObjArray();
+ fExchangeCont = new TObjArray();
fGlobals = new TMap();
}
fIOTimer = new TStopwatch();
fInputs(NULL),
fOutputs(NULL),
fParamCont(NULL),
+ fExchangeCont(NULL),
fDebugOptions(NULL),
fFileDescriptors(new TObjArray()),
fCurrentDescriptor(0),
fInputs = new TObjArray(*other.fInputs);
fOutputs = new TObjArray(*other.fOutputs);
fParamCont = new TObjArray(*other.fParamCont);
+ fExchangeCont = new TObjArray(*other.fExchangeCont);
fgCommonFileName = "AnalysisResults.root";
fgAnalysisManager = this;
}
fInputs = new TObjArray(*other.fInputs);
fOutputs = new TObjArray(*other.fOutputs);
fParamCont = new TObjArray(*other.fParamCont);
+ fExchangeCont = new TObjArray(*other.fExchangeCont);
fDebugOptions = NULL;
fFileDescriptors = new TObjArray();
fCurrentDescriptor = 0;
{
// Destructor.
if (fTasks) {fTasks->Delete(); delete fTasks;}
- if (fTopTasks) delete fTopTasks;
- if (fZombies) delete fZombies;
+ delete fTopTasks;
+ delete fZombies;
if (fContainers) {fContainers->Delete(); delete fContainers;}
- if (fInputs) delete fInputs;
- if (fOutputs) delete fOutputs;
- if (fParamCont) delete fParamCont;
- if (fDebugOptions) delete fDebugOptions;
- if (fGridHandler) delete fGridHandler;
- if (fInputEventHandler) delete fInputEventHandler;
- if (fOutputEventHandler) delete fOutputEventHandler;
- if (fMCtruthEventHandler) delete fMCtruthEventHandler;
- if (fEventPool) delete fEventPool;
+ delete fInputs;
+ delete fOutputs;
+ delete fParamCont;
+ delete fExchangeCont;
+ delete fDebugOptions;
+ delete fGridHandler;
+ delete fInputEventHandler;
+ delete fOutputEventHandler;
+ delete fMCtruthEventHandler;
+ delete fEventPool;
if (fgAnalysisManager==this) fgAnalysisManager = NULL;
if (fGlobals) {fGlobals->DeleteAll(); delete fGlobals;}
if (fFileDescriptors) {fFileDescriptors->Delete(); delete fFileDescriptors;}
}
break;
case kExchangeContainer:
+ cont->SetExchange(kTRUE);
+ fExchangeCont->Add(cont);
+ cont->SetDataOwned(kFALSE); // data owned by the publisher
break;
}
return cont;
Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE;
if (getsysInfo)
Info("PrintStatus", "System information will be collected each %lld events", fNSysInfo);
- TIter next(fTopTasks);
+ AliAnalysisDataContainer *cont = fCommonInput;
+ if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0);
+ printf("=== TOP CONTAINER:\n");
+ cont->PrintContainer(option,0);
+ // Reset "touched" flag
+ TIter next(fContainers);
+ while ((cont = (AliAnalysisDataContainer*)next())) cont->SetTouched(kFALSE);
+ TIter nextt(fTasks);
AliAnalysisTask *task;
- while ((task=(AliAnalysisTask*)next()))
- task->PrintTask(option);
+ while ((task=(AliAnalysisTask*)nextt()))
+ task->SetActive(kFALSE);
if (!fAutoBranchHandling && !fRequestedBranches.IsNull())
printf("Requested input branches:\n%s\n", fRequestedBranches.Data());
void AliAnalysisManager::ResetAnalysis()
{
// Reset all execution flags and clean containers.
- CleanContainers();
+ TIter nextTask(fTasks);
+ AliAnalysisTask *task;
+ while ((task=(AliAnalysisTask*)nextTask())) {
+ // Clean all tasks
+ task->Reset();
+ }
+// CleanContainers();
}
//______________________________________________________________________________
}
fNcalls++;
AliAnalysisTask *task;
+ // Reset the analysis
+ ResetAnalysis();
// Check if the top tree is active.
if (fTree) {
if (getsysInfo && ((fNcalls%fNSysInfo)==0))
AliSysInfo::AddStamp("Handlers_BeginEventGroup",fNcalls, 1002, 0);
TIter next(fTasks);
- // De-activate all tasks
- while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE);
+ // De-activate all tasks (not needed anymore after ResetAnalysis
+// while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE);
AliAnalysisDataContainer *cont = fCommonInput;
if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0);
if (!cont) {
if (cdir) cdir->cd();
return;
}
- cont->SetData(fTree); // This will notify all consumers
+ cont->SetData(fTree); // This set activity for all tasks reading only from the top container
Long64_t entry = fTree->GetTree()->GetReadEntry();
//
// Call BeginEvent() for optional input/output and MC services
TIter next1(fTopTasks);
Int_t itask = 0;
while ((task=(AliAnalysisTask*)next1())) {
+ task->SetActive(kTRUE);
if (fDebug >1) {
cout << " Executing task " << task->GetName() << endl;
}
AliVEventHandler* GetOutputEventHandler() const {return fOutputEventHandler;}
TObjArray *GetOutputs() const {return fOutputs;}
TObjArray *GetParamOutputs() const {return fParamCont;}
+ TObjArray *GetExchangeContainers() const {return fExchangeCont;}
Int_t GetRunFromPath() const {return fRunFromPath;}
const char *GetRequestedBranches() const {return fRequestedBranches.Data();}
TObjArray *GetTasks() const {return fTasks;}
TObjArray *fContainers; // List of all containers
TObjArray *fInputs; // List of containers with input data
TObjArray *fOutputs; // List of containers with results
- TObjArray *fParamCont; // List of containers with results
+ TObjArray *fParamCont; // List of containers with parameters
+ TObjArray *fExchangeCont; // List of exchange containers
TObjArray *fDebugOptions; // List of debug options
TObjArray *fFileDescriptors; //! List of file descriptors
AliAnalysisFileDescriptor *fCurrentDescriptor; //! Current file descriptor
static TString fgCommonFileName; //! Common output file name (not streamed)
static TString fgMacroNames; //! Loaded macro names
static AliAnalysisManager *fgAnalysisManager; //! static pointer to object instance
- ClassDef(AliAnalysisManager,19) // Analysis manager class
+ ClassDef(AliAnalysisManager,20) // Analysis manager class
};
#endif
AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
for (Int_t islot=0; islot<fNoutputs; islot++) {
coutput = GetOutputSlot(islot)->GetContainer();
- if (!mgr->GetOutputs()->FindObject(coutput) || coutput==mgr->GetCommonOutputContainer()) continue;
+ if (!coutput) continue;
+ if (coutput->IsExchange() || !mgr->GetOutputs()->FindObject(coutput) || coutput==mgr->GetCommonOutputContainer()) continue;
if (!coutput->GetData()) {
Error("CheckPostData", "Data not posted for slot #%d of task %s (%s)",
islot, GetName(), ClassName());
}
}
+//______________________________________________________________________________
+void AliAnalysisTask::Reset()
+{
+// Clear activity flag. Reset data for exchange containers.
+ fActive = kFALSE;
+ fHasExecuted = kFALSE;
+ // Call PostData(islot, 0) for all slots connected to exchange containers
+ Int_t islot;
+ AliAnalysisDataContainer *cont;
+ for (islot=0; islot<fNinputs; islot++) {
+ cont = GetInputSlot(islot)->GetContainer();
+ if (cont && cont->IsExchange()) cont->Reset();
+ }
+}
+
//______________________________________________________________________________
Bool_t AliAnalysisTask::CheckCircularDeps()
{
SetChecked(kFALSE);
return kFALSE;
}
-
+
+//______________________________________________________________________________
+Bool_t AliAnalysisTask::ProducersTouched() const
+{
+// Check if all producer containers are in the "touched" state.
+ Int_t islot;
+ AliAnalysisDataContainer *cont;
+ for (islot=0; islot<fNinputs; islot++) {
+ cont = GetInputSlot(islot)->GetContainer();
+ // Simulate the data flow so the tasks are printed only when all inputs
+ // are touched
+ if (cont && !cont->IsTouched()) return kFALSE;
+ }
+ return kTRUE;
+}
+
//______________________________________________________________________________
void AliAnalysisTask::PrintTask(Option_t *option, Int_t indent) const
{
// Print task info.
+ Int_t islot;
+ AliAnalysisDataContainer *cont;
+ if (fActive) return;
+ if (!ProducersTouched()) return;
TString opt(option);
opt.ToLower();
Bool_t dep = (opt.Contains("dep"))?kTRUE:kFALSE;
TString ind;
- Int_t islot;
- AliAnalysisDataContainer *cont;
for (Int_t i=0; i<indent; i++) ind += " ";
if (!dep || (dep && IsChecked())) {
printf("______________________________________________________________________________\n");
}
}
}
+ ((AliAnalysisTask*)this)->SetActive(kTRUE);
PrintContainers(option, indent+3);
- if (!fBranchNames.IsNull()) printf("Requested branches: %s\n", fBranchNames.Data());
+ if (!fBranchNames.IsNull()) printf("%sRequested branches: %s\n", ind.Data(), fBranchNames.Data());
}
//______________________________________________________________________________
Bool_t CheckCircularDeps();
virtual Bool_t CheckPostData() const;
virtual Bool_t CheckOwnership() const;
+ // Reset task
+ virtual void Reset();
// Getters
void GetBranches(const char *type, TString &result) const;
Int_t GetNinputs() const {return fNinputs;}
Bool_t HasBranches() const {return !fBranchNames.IsNull();}
virtual void PrintTask(Option_t *option="all", Int_t indent=0) const;
void PrintContainers(Option_t *option="all", Int_t indent=0) const;
+ Bool_t ProducersTouched() const;
void SetBranches(const char *names) {fBranchNames = names;}
void SetChecked(Bool_t flag=kTRUE) {TObject::SetBit(kTaskChecked,flag);}
void SetPostEventLoop(Bool_t flag=kTRUE);
--- /dev/null
+TaskConsumer *AddTaskConsumer(const char *name, const char *prodname1, const char *prodname2)
+{
+// Provide as input the name of the consumer task and the name of the
+// producer task
+ // pointer to the analysis manager
+ AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
+ if (!mgr) {
+ ::Error("AddTaskConsumer", "No analysis manager to connect to.");
+ return NULL;
+ }
+ // create the task
+ TaskConsumer *task = new TaskConsumer(name);
+ mgr->AddTask(task);
+
+ // connecting the input/output containers
+ TString outfile = mgr->GetCommonFileName();
+ // input data feed
+ AliAnalysisDataContainer *cinput0 = mgr->GetCommonInputContainer();
+ mgr->ConnectInput (task, 0, cinput0 );
+ // producer task
+ TaskProducer *prod1 = mgr->GetTask(prodname1);
+ TaskProducer *prod2 = mgr->GetTask(prodname2);
+ if (!prod1 || !prod2) {
+ ::Error("AddTaskConsumer", "Producer task %s or %s not found in the analysis manager",
+ prodname1, prodname2);
+ return 0;
+ }
+ // Connect to exchange container
+ AliAnalysisDataContainer *cinput1 = prod1->GetOutputSlot(2)->GetContainer();
+ mgr->ConnectInput(task, 1, cinput1);
+ AliAnalysisDataContainer *cinput2 = prod2->GetOutputSlot(2)->GetContainer();
+ mgr->ConnectInput(task, 2, cinput2);
+
+ AliAnalysisDataContainer *coutput1 = mgr->CreateContainer(
+ TString::Format("output_%s", name),
+ TList::Class(), AliAnalysisManager::kOutputContainer,
+ TString::Format("%s:output",outfile.Data()));
+ mgr->ConnectOutput(task, 1, coutput1);
+ return task;
+}
--- /dev/null
+TaskProducer *AddTaskProducer(const char *name)
+{
+ // pointer to the analysis manager
+ AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
+ if (!mgr) {
+ Error("AddTaskProducer", "No analysis manager to connect to.");
+ return NULL;
+ }
+ // create the task
+ TaskProducer *task = new TaskProducer(name);
+ mgr->AddTask(task);
+
+ // connecting the input/output containers
+ TString outfile = mgr->GetCommonFileName();
+ // input data feed
+ AliAnalysisDataContainer *cinput0 = mgr->GetCommonInputContainer();
+ mgr->ConnectInput (task, 0, cinput0 );
+ // producer output
+ AliAnalysisDataContainer *coutput1 = mgr->CreateContainer(
+ TString::Format("output_%s", name),
+ TList::Class(), AliAnalysisManager::kOutputContainer,
+ TString::Format("%s:output",outfile.Data()));
+ mgr->ConnectOutput(task, 1, coutput1);
+ // exchange output
+ AliAnalysisDataContainer *coutput2 = mgr->CreateContainer(
+ TString::Format("exchange_%s", name),
+ TObjArray::Class(), AliAnalysisManager::kExchangeContainer);
+ mgr->ConnectOutput(task, 2, coutput2);
+ return task;
+}
+
+
--- /dev/null
+/**************************************************************************
+ * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
+ * *
+ * Author: The ALICE Off-line Project. *
+ * Contributors are mentioned in the code where appropriate. *
+ * *
+ * Permission to use, copy, modify and distribute this software and its *
+ * documentation strictly for non-commercial purposes is hereby granted *
+ * without fee, provided that the above copyright notice appears in all *
+ * copies and that both the copyright notice and this permission notice *
+ * appear in the supporting documentation. The authors make no claims *
+ * about the suitability of this software for any purpose. It is *
+ * provided "as is" without express or implied warranty. *
+ **************************************************************************/
+
+//______________________________________________________________________________
+// Producer task that exchanges data with other task using exchange containers
+//______________________________________________________________________________
+//
+#include "TaskExchange.h"
+
+#include "AliAnalysisManager.h"
+#include "TList.h"
+#include "TObjArray.h"
+
+ClassImp(TaskProducer)
+
+//________________________________________________________________________
+TaskProducer::TaskProducer() // All data members should be initialised here
+ :AliAnalysisTaskSE(),
+ fOutput(0),
+ fExchangedData(0)
+{
+ // Dummy constructor ALWAYS needed for I/O.
+}
+
+//________________________________________________________________________
+TaskProducer::TaskProducer(const char *name) // All data members should be initialised here
+ :AliAnalysisTaskSE(name),
+ fOutput(0),
+ fExchangedData(0)
+{
+ // Constructor
+ // Define input and output slots here (never in the dummy constructor)
+ // Input slot #0 works with a TChain - it is connected to the default input container
+
+ // Output slot #1 used to publish the task private data
+ DefineOutput(1, TList::Class());
+
+ // Output slot #2 used to publish the exchanged data
+ DefineOutput(2, TObjArray::Class()); // Type can be anything else
+}
+
+//________________________________________________________________________
+TaskProducer::~TaskProducer()
+{
+ // Destructor. Clean-up the output list, but not the histograms that are put inside
+ // (the list is owner and will clean-up these histograms). Protect in PROOF case.
+ if (fOutput && !AliAnalysisManager::GetAnalysisManager()->IsProofMode()) {
+ delete fOutput;
+ }
+ delete fExchangedData;
+}
+
+//________________________________________________________________________
+void TaskProducer::UserCreateOutputObjects()
+{
+ // Create histograms
+ // Called once (on the worker node)
+
+ fOutput = new TList();
+ fOutput->SetOwner(); // IMPORTANT!
+
+// fOutput->Add(fHistPt);
+// fOutput->Add(fHistEta);
+ // NEW HISTO added to fOutput here
+
+ // Data to be exchanged can be initialized here, but updated in UserExec
+ fExchangedData = new TObjArray();
+ fExchangedData->SetOwner();
+
+ PostData(1, fOutput); // Post data for ALL output slots >0 here, to get at least an empty histogram
+// Do NOT post data for the exchange containers !
+// PostData(2, fExchangedData);
+}
+
+//________________________________________________________________________
+void TaskProducer::UserExec(Option_t *)
+{
+ // Main loop
+ // Called for each event
+
+ //Here we fill the private output
+ // ...
+ PostData(1, fOutput);
+
+ // And publish the exchanged data
+ fExchangedData->Delete();
+ TNamed *data = new TNamed(Form("<data by %s, ev. %d>", GetName(), fEntry), "");
+ fExchangedData->Add(data);
+ PostData(2, fExchangedData);
+ Printf(" -- %s published: %s", GetName(), data->GetName());
+}
+
+//______________________________________________________________________________
+// Consumer task reading data produced by other task
+//______________________________________________________________________________
+//
+ClassImp(TaskConsumer)
+
+//________________________________________________________________________
+TaskConsumer::TaskConsumer() // All data members should be initialised here
+ :AliAnalysisTaskSE(),
+ fOutput(0),
+ fImported1(0),
+ fImported2(0)
+{
+ // Dummy constructor ALWAYS needed for I/O.
+}
+
+//________________________________________________________________________
+TaskConsumer::TaskConsumer(const char *name) // All data members should be initialised here
+ :AliAnalysisTaskSE(name),
+ fOutput(0),
+ fImported1(0),
+ fImported2(0)
+{
+ // Constructor
+ // Define input and output slots here (never in the dummy constructor)
+ // Input slot #0 works with a TChain - it is connected to the default input container
+
+ // Input slot #1 reads from an exchange container
+ DefineInput(1, TObjArray::Class());
+ // Input slot #2 reads from an exchange container
+ DefineInput(2, TObjArray::Class());
+
+ // Output slot #0 is reserved
+ // Output slot #1 used to publish the task private data
+ DefineOutput(1, TList::Class());
+}
+
+//________________________________________________________________________
+TaskConsumer::~TaskConsumer()
+{
+ // Destructor. Clean-up the output list, but not the histograms that are put inside
+ // (the list is owner and will clean-up these histograms). Protect in PROOF case.
+ if (fOutput && !AliAnalysisManager::GetAnalysisManager()->IsProofMode()) {
+ delete fOutput;
+ }
+ // Do not delete the imported data - we are not owners
+}
+
+//________________________________________________________________________
+void TaskConsumer::UserCreateOutputObjects()
+{
+ // Create histograms
+ // Called once (on the worker node)
+
+ fOutput = new TList();
+ fOutput->SetOwner(); // IMPORTANT!
+
+// fOutput->Add(fHistPt);
+// fOutput->Add(fHistEta);
+ // NEW HISTO added to fOutput here
+
+ PostData(1, fOutput); // Post data for ALL output slots >0 here, to get at least an empty histogram
+}
+
+//________________________________________________________________________
+void TaskConsumer::UserExec(Option_t *)
+{
+ // Main loop
+ // Called for each event
+ // This is how we get the actual exchange data
+ TObjArray *exchange1 = (TObjArray*)GetInputData(1);
+ if (!exchange1) {
+ Error("UserExec", "Task %s could not read the exchanged data for slot 1", GetName());
+ return;
+ }
+ fImported1 = (TNamed*)exchange1->At(0);
+ TObjArray *exchange2 = (TObjArray*)GetInputData(1);
+ if (!exchange2) {
+ Error("UserExec", "Task %s could not read the exchanged data for slot 2", GetName());
+ return;
+ }
+ fImported2 = (TNamed*)exchange2->At(0);
+ Printf(" -- imported: %s and %s", fImported1->GetName(), fImported2->GetName());
+
+ //Here we fill the private output
+ // ...
+ PostData(1, fOutput);
+}
--- /dev/null
+/* Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
+ * See cxx source for full Copyright notice */
+
+//______________________________________________________________________________
+//
+// Task that produces some generic data to be exchanged with some consumers
+//
+//______________________________________________________________________________
+
+#ifndef TASKEXCHANGE_H
+#define TASKEXCHANGE_H
+
+#ifndef ALIANALYSISTASKSE_H
+#include "AliAnalysisTaskSE.h"
+#endif
+
+class TList;
+class TObjArray;
+
+class TaskProducer : public AliAnalysisTaskSE {
+ public:
+ TaskProducer();
+ TaskProducer(const char *name);
+ virtual ~TaskProducer();
+
+ virtual void UserCreateOutputObjects();
+ virtual void UserExec(Option_t *option);
+
+ private:
+ TList *fOutput; //! Output list (do not stream)
+ TObjArray *fExchangedData; //! Arbitrary data (do not stream)
+
+ TaskProducer(const TaskProducer&); // not implemented
+ TaskProducer& operator=(const TaskProducer&); // not implemented
+
+ ClassDef(TaskProducer, 1); // example of producer
+};
+
+//______________________________________________________________________________
+//
+// Task that uses generic data produced by other tasks
+//
+//______________________________________________________________________________
+
+class TaskConsumer : public AliAnalysisTaskSE {
+ public:
+ TaskConsumer();
+ TaskConsumer(const char *name);
+ virtual ~TaskConsumer();
+
+ virtual void UserCreateOutputObjects();
+ virtual void UserExec(Option_t *option);
+
+ private:
+ TList *fOutput; //! Output list (do not stream)
+ TNamed *fImported1; //! Arbitrary data (do not stream)
+ TNamed *fImported2; //! Arbitrary data (do not stream)
+
+ TaskConsumer(const TaskConsumer&); // not implemented
+ TaskConsumer& operator=(const TaskConsumer&); // not implemented
+
+ ClassDef(TaskConsumer, 1); // example of consumer task
+};
+
+#endif
+
--- /dev/null
+// run.C
+//
+// Template run macro for producer/consumer tasks
+//
+// Author: Andrei Gheata
+//
+void CreateAlienHandler();
+
+//______________________________________________________________________________
+void runEx02()
+{
+ // load libraries
+ gSystem->Load("libCore.so");
+ gSystem->Load("libGeom.so");
+ gSystem->Load("libVMC.so");
+ gSystem->Load("libPhysics.so");
+ gSystem->Load("libTree.so");
+ gSystem->Load("libSTEERBase.so");
+ gSystem->Load("libESD.so");
+ gSystem->Load("libAOD.so");
+ gSystem->Load("libANALYSIS.so");
+ gSystem->Load("libOADB.so");
+ gSystem->Load("libANALYSISalice.so");
+
+ // add aliroot indlude path
+ gROOT->ProcessLine(Form(".include %s/include",gSystem->ExpandPathName("$ALICE_ROOT")));
+ gROOT->SetStyle("Plain");
+
+ // analysis manager
+ AliAnalysisManager* mgr = new AliAnalysisManager("example for using producer/consumer tasks");
+ mgr->SetCommonFileName("output.root");
+
+ // create the alien handler and attach it to the manager
+ AliAnalysisGrid *plugin = CreateAlienHandler();
+ mgr->SetGridHandler(plugin);
+
+ // create the alien handler and attach it to the manager
+
+ AliVEventHandler* iH = new AliESDInputHandler();
+ mgr->SetInputEventHandler(iH);
+
+ // create 2 producer tasks
+ gROOT->LoadMacro("TaskExchange.cxx+g");
+ gROOT->LoadMacro("AddTaskProducer.C");
+ TaskProducer *prod1 = AddTaskProducer("producer1");
+ TaskProducer *prod2 = AddTaskProducer("producer2");
+ if (!prod1 || !prod2) return;
+
+ // Create 1 consumer task
+ gROOT->LoadMacro("AddTaskConsumer.C");
+ TaskConsumer *task = AddTaskConsumer("consumer", "producer1", "producer2");
+
+ // enable debug printouts
+ mgr->SetDebugLevel(2);
+ if (!mgr->InitAnalysis()) return;
+ mgr->PrintStatus();
+
+ // start analysis
+ Printf("Starting analysis example for producer/consumers...");
+ mgr->StartAnalysis("local",3);
+}
+
+//______________________________________________________________________________
+AliAnalysisGrid* CreateAlienHandler()
+{
+ AliAnalysisAlien *plugin = new AliAnalysisAlien();
+ // Set the run mode (can be "full", "test", "offline", "submit" or "terminate")
+ plugin->SetRunMode("test");
+ // Plugin test mode works only providing a file containing test file locations, used in "local" mode also
+ plugin->SetFileForTestMode("files.txt"); // file should contain path name to a local directory containg *ESDs.root etc
+ return plugin;
+}