Fixed usage of exchange containers in a multiple producers/consumers schema.
authoragheata <Andrei.Gheata@cern.ch>
Tue, 8 Apr 2014 14:00:40 +0000 (16:00 +0200)
committeragheata <Andrei.Gheata@cern.ch>
Tue, 8 Apr 2014 14:01:20 +0000 (16:01 +0200)
ANALYSIS/AliAnalysisDataContainer.cxx
ANALYSIS/AliAnalysisDataContainer.h
ANALYSIS/AliAnalysisManager.cxx
ANALYSIS/AliAnalysisManager.h
ANALYSIS/AliAnalysisTask.cxx
ANALYSIS/AliAnalysisTask.h
ANALYSIS/examples/AddTaskConsumer.C [new file with mode: 0644]
ANALYSIS/examples/AddTaskProducer.C [new file with mode: 0644]
ANALYSIS/examples/TaskExchange.cxx [new file with mode: 0644]
ANALYSIS/examples/TaskExchange.h [new file with mode: 0644]
ANALYSIS/examples/runEx02.C [new file with mode: 0644]

index 4e36367..8784f50 100644 (file)
@@ -281,21 +281,26 @@ void AliAnalysisDataContainer::PrintContainer(Option_t *option, Int_t indent) co
    for (Int_t i=0; i<indent; i++) ind += " ";
    TString opt(option);
    opt.ToLower();
+   TString ctype = "Container";
+   if (IsExchange()) ctype = "Exchange container";
    Bool_t dep = (opt.Contains("dep"))?kTRUE:kFALSE;
    if (!dep) {
-      printf("%sContainer: %s  type: %s POST_LOOP=%i", ind.Data(), GetName(), GetTitle(), IsPostEventLoop());
-      if (fProducer) 
-         printf("%s = Data producer: task %s",ind.Data(),fProducer->GetName());
-      else
-         printf("%s= No data producer",ind.Data());
-      printf("%s = Consumer tasks: ", ind.Data());
-      if (!fConsumers || !fConsumers->GetEntriesFast()) printf("-none-\n");
-      else printf("\n");
+      if (IsPostEventLoop()) printf("%s%s: %s     DATA TYPE: %s POST_LOOP task\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
+      else printf("%s%s: %s     DATA TYPE: %s\n", ind.Data(), ctype.Data(), GetName(), GetTitle());
+      if (!fProducer) 
+//         printf("%s = Data producer: task %s\n",ind.Data(),fProducer->GetName());
+//      else
+         printf("%s= Not connected to a data producer\n",ind.Data());
+      if (fConsumers && fConsumers->GetEntriesFast())
+         printf("%s = Client tasks indented below:\n", ind.Data());
    }
-   if (fFolderName.Length())
-     printf("Filename: %s  folder: %s\n", fFileName.Data(), fFolderName.Data());
-   else
-     printf("Filename: %s\n", fFileName.Data());
+   if (!IsExchange()) {
+      if (!fFolderName.IsNull())
+        printf("%s = Filename: %s  folder: %s\n", ind.Data(),fFileName.Data(), fFolderName.Data());
+      else
+        if (!fFileName.IsNull()) printf("%s = Filename: %s\n", ind.Data(),fFileName.Data());
+   }     
+   ((AliAnalysisDataContainer*)this)->SetTouched(kTRUE);  
    TIter next(fConsumers);
    AliAnalysisTask *task;
    while ((task=(AliAnalysisTask*)next())) task->PrintTask(option, indent+3);
index 91be24c..4ac8007 100644 (file)
@@ -47,7 +47,9 @@ enum ENotifyMessage {
 enum EAnalysisContainerFlags {
    kPostEventLoop = BIT(14),
    kSpecialOutput = BIT(15),
-   kRegisterDataset = BIT(16)
+   kRegisterDataset = BIT(16),
+   kExchangeData  = BIT(17),
+   kTouchedFlag   = BIT(18)
 };     
    AliAnalysisDataContainer();
    AliAnalysisDataContainer(const AliAnalysisDataContainer &cont);
@@ -66,15 +68,18 @@ enum EAnalysisContainerFlags {
    TObjArray                *GetConsumers() const {return fConsumers;}
    virtual void              GetEntry(Long64_t ientry);
    // Setters
+   void                      Reset()              {fData = 0; fDataReady = kFALSE; SetTouched(kFALSE);}
    void                      ResetDataReady()     {fDataReady = kFALSE;}
    virtual Bool_t            SetData(TObject *data, Option_t *option="");
    void                      SetDataOwned(Bool_t flag) {fOwnedData = flag;}
+   void                      SetExchange(Bool_t flag) {TObject::SetBit(kExchangeData,flag);}
    void                      SetPostEventLoop(Bool_t flag=kTRUE) {TObject::SetBit(kPostEventLoop,flag);}
    void                      SetSpecialOutput(Bool_t flag=kTRUE) {TObject::SetBit(kSpecialOutput,flag);}
    void                      SetRegisterDataset(Bool_t flag=kTRUE) {TObject::SetBit(kRegisterDataset,flag);}
    void                      SetFileName(const char *filename);
    void                      SetFile(TFile *f) {fFile = f;}
    void                      SetProducer(AliAnalysisTask *prod, Int_t islot);
+   void                      SetTouched(Bool_t flag=kTRUE)       {TObject::SetBit(kTouchedFlag,flag);}
    void                      AddConsumer(AliAnalysisTask *cons, Int_t islot);
    void                      DeleteData();
    // Wrapping
@@ -82,9 +87,11 @@ enum EAnalysisContainerFlags {
    void                      ImportData(AliAnalysisDataWrapper *pack);
    // Container status checking
    Bool_t                    IsDataReady() const  {return fDataReady;}
+   Bool_t                    IsExchange() const      {return TObject::TestBit(kExchangeData);}
    Bool_t                    IsPostEventLoop() const {return TObject::TestBit(kPostEventLoop);}
    Bool_t                    IsSpecialOutput() const {return TObject::TestBit(kSpecialOutput);}
    Bool_t                    IsRegisterDataset() const {return TObject::TestBit(kRegisterDataset);}
+   Bool_t                    IsTouched() const       {return TObject::TestBit(kTouchedFlag);}
    Bool_t                    IsOwnedData() const  {return fOwnedData;}
    Bool_t                    ClientsExecuted() const;
    Bool_t                    HasConsumers() const {return (fConsumers != 0);}
index b7267c2..cfafd40 100644 (file)
@@ -91,6 +91,7 @@ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title)
                     fInputs(0),
                     fOutputs(0),
                     fParamCont(0),
+                    fExchangeCont(0),
                     fDebugOptions(0),
                     fFileDescriptors(new TObjArray()),
                     fCurrentDescriptor(0),
@@ -129,6 +130,7 @@ AliAnalysisManager::AliAnalysisManager(const char *name, const char *title)
      fInputs     = new TObjArray();
      fOutputs    = new TObjArray();
      fParamCont  = new TObjArray();
+     fExchangeCont = new TObjArray();
      fGlobals    = new TMap();
    }
    fIOTimer = new TStopwatch();
@@ -162,6 +164,7 @@ AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other)
                     fInputs(NULL),
                     fOutputs(NULL),
                     fParamCont(NULL),
+                    fExchangeCont(NULL),
                     fDebugOptions(NULL),
                     fFileDescriptors(new TObjArray()),
                     fCurrentDescriptor(0),
@@ -197,6 +200,7 @@ AliAnalysisManager::AliAnalysisManager(const AliAnalysisManager& other)
    fInputs     = new TObjArray(*other.fInputs);
    fOutputs    = new TObjArray(*other.fOutputs);
    fParamCont  = new TObjArray(*other.fParamCont);
+   fExchangeCont  = new TObjArray(*other.fExchangeCont);
    fgCommonFileName  = "AnalysisResults.root";
    fgAnalysisManager = this;
 }
@@ -227,6 +231,7 @@ AliAnalysisManager& AliAnalysisManager::operator=(const AliAnalysisManager& othe
       fInputs     = new TObjArray(*other.fInputs);
       fOutputs    = new TObjArray(*other.fOutputs);
       fParamCont  = new TObjArray(*other.fParamCont);
+      fExchangeCont  = new TObjArray(*other.fExchangeCont);
       fDebugOptions = NULL;
       fFileDescriptors = new TObjArray();
       fCurrentDescriptor = 0;
@@ -264,18 +269,19 @@ AliAnalysisManager::~AliAnalysisManager()
 {
 // Destructor.
    if (fTasks) {fTasks->Delete(); delete fTasks;}
-   if (fTopTasks) delete fTopTasks;
-   if (fZombies) delete fZombies;
+   delete fTopTasks;
+   delete fZombies;
    if (fContainers) {fContainers->Delete(); delete fContainers;}
-   if (fInputs) delete fInputs;
-   if (fOutputs) delete fOutputs;
-   if (fParamCont) delete fParamCont;
-   if (fDebugOptions) delete fDebugOptions;
-   if (fGridHandler) delete fGridHandler;
-   if (fInputEventHandler) delete fInputEventHandler;
-   if (fOutputEventHandler) delete fOutputEventHandler;
-   if (fMCtruthEventHandler) delete fMCtruthEventHandler;
-   if (fEventPool) delete fEventPool;
+   delete fInputs;
+   delete fOutputs;
+   delete fParamCont;
+   delete fExchangeCont;
+   delete fDebugOptions;
+   delete fGridHandler;
+   delete fInputEventHandler;
+   delete fOutputEventHandler;
+   delete fMCtruthEventHandler;
+   delete fEventPool;
    if (fgAnalysisManager==this) fgAnalysisManager = NULL;
    if (fGlobals) {fGlobals->DeleteAll(); delete fGlobals;}
    if (fFileDescriptors) {fFileDescriptors->Delete(); delete fFileDescriptors;}
@@ -1441,6 +1447,9 @@ AliAnalysisDataContainer *AliAnalysisManager::CreateContainer(const char *name,
          }   
          break;
       case kExchangeContainer:
+         cont->SetExchange(kTRUE);
+         fExchangeCont->Add(cont);
+         cont->SetDataOwned(kFALSE); // data owned by the publisher
          break;   
    }
    return cont;
@@ -1696,10 +1705,17 @@ void AliAnalysisManager::PrintStatus(Option_t *option) const
    Bool_t getsysInfo = ((fNSysInfo>0) && (fMode==kLocalAnalysis))?kTRUE:kFALSE;
    if (getsysInfo)
       Info("PrintStatus", "System information will be collected each %lld events", fNSysInfo);
-   TIter next(fTopTasks);
+   AliAnalysisDataContainer *cont = fCommonInput;
+   if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0);
+   printf("=== TOP CONTAINER:\n");
+   cont->PrintContainer(option,0);
+   // Reset "touched" flag
+   TIter next(fContainers);
+   while ((cont = (AliAnalysisDataContainer*)next())) cont->SetTouched(kFALSE);
+   TIter nextt(fTasks);
    AliAnalysisTask *task;
-   while ((task=(AliAnalysisTask*)next()))
-      task->PrintTask(option);
+   while ((task=(AliAnalysisTask*)nextt()))
+      task->SetActive(kFALSE);
   
    if (!fAutoBranchHandling && !fRequestedBranches.IsNull()) 
       printf("Requested input branches:\n%s\n", fRequestedBranches.Data());
@@ -1722,7 +1738,13 @@ void AliAnalysisManager::PrintStatus(Option_t *option) const
 void AliAnalysisManager::ResetAnalysis()
 {
 // Reset all execution flags and clean containers.
-   CleanContainers();
+   TIter nextTask(fTasks);
+   AliAnalysisTask *task;
+   while ((task=(AliAnalysisTask*)nextTask())) {
+      // Clean all tasks
+      task->Reset();
+   }         
+//   CleanContainers();
 }
 
 //______________________________________________________________________________
@@ -2250,13 +2272,15 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option)
    }
    fNcalls++;
    AliAnalysisTask *task;
+   // Reset the analysis
+   ResetAnalysis();
    // Check if the top tree is active.
    if (fTree) {
       if (getsysInfo && ((fNcalls%fNSysInfo)==0)) 
          AliSysInfo::AddStamp("Handlers_BeginEventGroup",fNcalls, 1002, 0);
       TIter next(fTasks);
-   // De-activate all tasks
-      while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE);
+   // De-activate all tasks (not needed anymore after ResetAnalysis
+//      while ((task=(AliAnalysisTask*)next())) task->SetActive(kFALSE);
       AliAnalysisDataContainer *cont = fCommonInput;
       if (!cont) cont = (AliAnalysisDataContainer*)fInputs->At(0);
       if (!cont) {
@@ -2264,7 +2288,7 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option)
          if (cdir) cdir->cd();
          return;
       }   
-      cont->SetData(fTree); // This will notify all consumers
+      cont->SetData(fTree); // This set activity for all tasks reading only from the top container
       Long64_t entry = fTree->GetTree()->GetReadEntry();      
 //
 //    Call BeginEvent() for optional input/output and MC services 
@@ -2283,6 +2307,7 @@ void AliAnalysisManager::ExecAnalysis(Option_t *option)
       TIter next1(fTopTasks);
       Int_t itask = 0;
       while ((task=(AliAnalysisTask*)next1())) {
+         task->SetActive(kTRUE);
          if (fDebug >1) {
             cout << "    Executing task " << task->GetName() << endl;
          }
index b3ab9eb..941388a 100644 (file)
@@ -123,6 +123,7 @@ enum EAliAnalysisFlags {
    AliVEventHandler*   GetOutputEventHandler() const  {return fOutputEventHandler;}
    TObjArray          *GetOutputs() const         {return fOutputs;}
    TObjArray          *GetParamOutputs() const    {return fParamCont;}
+   TObjArray          *GetExchangeContainers() const {return fExchangeCont;}
    Int_t               GetRunFromPath() const     {return fRunFromPath;}
    const char         *GetRequestedBranches() const {return fRequestedBranches.Data();}
    TObjArray          *GetTasks() const           {return fTasks;}
@@ -257,7 +258,8 @@ private:
    TObjArray              *fContainers;          // List of all containers
    TObjArray              *fInputs;              // List of containers with input data
    TObjArray              *fOutputs;             // List of containers with results
-   TObjArray              *fParamCont;           // List of containers with results
+   TObjArray              *fParamCont;           // List of containers with parameters
+   TObjArray              *fExchangeCont;        // List of exchange containers
    TObjArray              *fDebugOptions;        // List of debug options
    TObjArray              *fFileDescriptors;     //! List of file descriptors
    AliAnalysisFileDescriptor *fCurrentDescriptor; //! Current file descriptor
@@ -288,6 +290,6 @@ private:
    static TString          fgCommonFileName;     //! Common output file name (not streamed)
    static TString          fgMacroNames;         //! Loaded macro names
    static AliAnalysisManager *fgAnalysisManager; //! static pointer to object instance
-   ClassDef(AliAnalysisManager,19)  // Analysis manager class
+   ClassDef(AliAnalysisManager,20)  // Analysis manager class
 };   
 #endif
index 09c922c..a49b1d8 100644 (file)
@@ -265,7 +265,8 @@ Bool_t AliAnalysisTask::CheckPostData() const
    AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
    for (Int_t islot=0; islot<fNoutputs; islot++) {
       coutput = GetOutputSlot(islot)->GetContainer();
-      if (!mgr->GetOutputs()->FindObject(coutput) || coutput==mgr->GetCommonOutputContainer()) continue;
+      if (!coutput) continue;
+      if (coutput->IsExchange() || !mgr->GetOutputs()->FindObject(coutput) || coutput==mgr->GetCommonOutputContainer()) continue;
       if (!coutput->GetData()) {
          Error("CheckPostData", "Data not posted for slot #%d of task %s (%s)", 
                islot, GetName(), ClassName());
@@ -575,6 +576,21 @@ void AliAnalysisTask::SetUsed(Bool_t flag)
 }   
 
 //______________________________________________________________________________
+void AliAnalysisTask::Reset()
+{
+// Clear activity flag. Reset data for exchange containers.
+   fActive = kFALSE;
+   fHasExecuted = kFALSE;
+   // Call PostData(islot, 0) for all slots connected to exchange containers
+   Int_t islot;
+   AliAnalysisDataContainer *cont;
+   for (islot=0; islot<fNinputs; islot++) {
+      cont = GetInputSlot(islot)->GetContainer();
+      if (cont && cont->IsExchange()) cont->Reset();
+   }   
+}
+   
+//______________________________________________________________________________
 Bool_t AliAnalysisTask::CheckCircularDeps()
 {
 // Check for illegal circular dependencies, e.g. a daughter task should not have
@@ -591,17 +607,34 @@ Bool_t AliAnalysisTask::CheckCircularDeps()
    SetChecked(kFALSE);
    return kFALSE;
 }   
-   
+
+//______________________________________________________________________________
+Bool_t AliAnalysisTask::ProducersTouched() const
+{
+// Check if all producer containers are in the "touched" state.
+   Int_t islot;
+   AliAnalysisDataContainer *cont;
+   for (islot=0; islot<fNinputs; islot++) {
+      cont = GetInputSlot(islot)->GetContainer();
+      // Simulate the data flow so the tasks are printed only when all inputs
+      // are touched
+      if (cont && !cont->IsTouched()) return kFALSE;
+   }
+   return kTRUE;
+}
+
 //______________________________________________________________________________
 void AliAnalysisTask::PrintTask(Option_t *option, Int_t indent) const
 {
 // Print task info.
+   Int_t islot;
+   AliAnalysisDataContainer *cont;
+   if (fActive) return;
+   if (!ProducersTouched()) return;
    TString opt(option);
    opt.ToLower();
    Bool_t dep = (opt.Contains("dep"))?kTRUE:kFALSE;
    TString ind;
-   Int_t islot;
-   AliAnalysisDataContainer *cont;
    for (Int_t i=0; i<indent; i++) ind += " ";
    if (!dep || (dep && IsChecked())) {
       printf("______________________________________________________________________________\n");
@@ -622,8 +655,9 @@ void AliAnalysisTask::PrintTask(Option_t *option, Int_t indent) const
          }            
       }
    }
+   ((AliAnalysisTask*)this)->SetActive(kTRUE);
    PrintContainers(option, indent+3);
-   if (!fBranchNames.IsNull()) printf("Requested branches:   %s\n", fBranchNames.Data());
+   if (!fBranchNames.IsNull()) printf("%sRequested branches:   %s\n", ind.Data(), fBranchNames.Data());
 }      
 
 //______________________________________________________________________________
index cc3b89b..5f41560 100644 (file)
@@ -107,6 +107,8 @@ public:
   Bool_t                    CheckCircularDeps();
   virtual Bool_t            CheckPostData() const;
   virtual Bool_t            CheckOwnership() const;
+  // Reset task
+  virtual void              Reset();
   // Getters
   void                      GetBranches(const char *type, TString &result) const;
   Int_t                     GetNinputs() const  {return fNinputs;}
@@ -129,6 +131,7 @@ public:
   Bool_t                    HasBranches() const {return !fBranchNames.IsNull();}
   virtual void                      PrintTask(Option_t *option="all", Int_t indent=0) const;
   void                      PrintContainers(Option_t *option="all", Int_t indent=0) const;
+  Bool_t                    ProducersTouched() const;
   void                      SetBranches(const char *names) {fBranchNames = names;}
   void                      SetChecked(Bool_t flag=kTRUE) {TObject::SetBit(kTaskChecked,flag);}
   void                      SetPostEventLoop(Bool_t flag=kTRUE);
diff --git a/ANALYSIS/examples/AddTaskConsumer.C b/ANALYSIS/examples/AddTaskConsumer.C
new file mode 100644 (file)
index 0000000..0e43ea8
--- /dev/null
@@ -0,0 +1,40 @@
+TaskConsumer *AddTaskConsumer(const char *name, const char *prodname1, const char *prodname2)
+{
+// Provide as input the name of the consumer task and the name of the 
+// producer task
+       // pointer to the analysis manager
+       AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
+       if (!mgr) {
+               ::Error("AddTaskConsumer", "No analysis manager to connect to.");
+               return NULL;
+       }  
+       // create the task
+   TaskConsumer *task = new TaskConsumer(name);
+   mgr->AddTask(task);
+
+       // connecting the input/output containers
+   TString outfile = mgr->GetCommonFileName();
+   // input data feed
+       AliAnalysisDataContainer *cinput0  = mgr->GetCommonInputContainer();
+       mgr->ConnectInput (task, 0, cinput0 );
+   // producer task
+   TaskProducer *prod1 = mgr->GetTask(prodname1);
+   TaskProducer *prod2 = mgr->GetTask(prodname2);
+   if (!prod1 || !prod2) {
+      ::Error("AddTaskConsumer", "Producer task %s or %s not found in the analysis manager", 
+              prodname1, prodname2);
+      return 0;
+   }
+   // Connect to exchange container
+   AliAnalysisDataContainer *cinput1 = prod1->GetOutputSlot(2)->GetContainer();
+   mgr->ConnectInput(task, 1, cinput1);
+   AliAnalysisDataContainer *cinput2 = prod2->GetOutputSlot(2)->GetContainer();
+   mgr->ConnectInput(task, 2, cinput2);
+   
+       AliAnalysisDataContainer *coutput1  = mgr->CreateContainer(
+                TString::Format("output_%s", name),
+                TList::Class(), AliAnalysisManager::kOutputContainer,
+                TString::Format("%s:output",outfile.Data()));
+       mgr->ConnectOutput(task, 1, coutput1);
+   return task;
+}
diff --git a/ANALYSIS/examples/AddTaskProducer.C b/ANALYSIS/examples/AddTaskProducer.C
new file mode 100644 (file)
index 0000000..082e33c
--- /dev/null
@@ -0,0 +1,32 @@
+TaskProducer *AddTaskProducer(const char *name)
+{
+       // pointer to the analysis manager
+       AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
+       if (!mgr) {
+               Error("AddTaskProducer", "No analysis manager to connect to.");
+               return NULL;
+       }  
+       // create the task
+   TaskProducer *task = new TaskProducer(name);
+   mgr->AddTask(task);
+
+       // connecting the input/output containers
+   TString outfile = mgr->GetCommonFileName();
+   // input data feed
+       AliAnalysisDataContainer *cinput0  = mgr->GetCommonInputContainer();
+       mgr->ConnectInput (task, 0, cinput0 );
+   // producer output
+       AliAnalysisDataContainer *coutput1  = mgr->CreateContainer(
+                TString::Format("output_%s", name),
+                TList::Class(), AliAnalysisManager::kOutputContainer,
+                TString::Format("%s:output",outfile.Data()));
+       mgr->ConnectOutput(task, 1, coutput1);
+   // exchange output
+       AliAnalysisDataContainer *coutput2  = mgr->CreateContainer(
+                TString::Format("exchange_%s", name),
+                TObjArray::Class(), AliAnalysisManager::kExchangeContainer);
+   mgr->ConnectOutput(task, 2, coutput2);
+   return task;
+}
+     
+   
diff --git a/ANALYSIS/examples/TaskExchange.cxx b/ANALYSIS/examples/TaskExchange.cxx
new file mode 100644 (file)
index 0000000..6da6f07
--- /dev/null
@@ -0,0 +1,192 @@
+/**************************************************************************
+ * Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
+ *                                                                        *
+ * Author: The ALICE Off-line Project.                                    *
+ * Contributors are mentioned in the code where appropriate.              *
+ *                                                                        *
+ * Permission to use, copy, modify and distribute this software and its   *
+ * documentation strictly for non-commercial purposes is hereby granted   *
+ * without fee, provided that the above copyright notice appears in all   *
+ * copies and that both the copyright notice and this permission notice   *
+ * appear in the supporting documentation. The authors make no claims     *
+ * about the suitability of this software for any purpose. It is          *
+ * provided "as is" without express or implied warranty.                  *
+ **************************************************************************/
+
+//______________________________________________________________________________
+// Producer task that exchanges data with other task using exchange containers
+//______________________________________________________________________________
+// 
+#include "TaskExchange.h"
+
+#include "AliAnalysisManager.h"
+#include "TList.h"
+#include "TObjArray.h"
+
+ClassImp(TaskProducer)
+
+//________________________________________________________________________
+TaskProducer::TaskProducer() // All data members should be initialised here
+   :AliAnalysisTaskSE(),
+    fOutput(0),
+    fExchangedData(0)
+{
+    // Dummy constructor ALWAYS needed for I/O.
+}
+
+//________________________________________________________________________
+TaskProducer::TaskProducer(const char *name) // All data members should be initialised here
+   :AliAnalysisTaskSE(name),
+    fOutput(0),
+    fExchangedData(0)
+{
+   // Constructor
+   // Define input and output slots here (never in the dummy constructor)
+   // Input slot #0 works with a TChain - it is connected to the default input container
+
+   // Output slot #1 used to publish the task private data
+   DefineOutput(1, TList::Class());
+    
+   // Output slot #2 used to publish the exchanged data
+   DefineOutput(2, TObjArray::Class()); // Type can be anything else
+}
+
+//________________________________________________________________________
+TaskProducer::~TaskProducer()
+{
+   // Destructor. Clean-up the output list, but not the histograms that are put inside
+   // (the list is owner and will clean-up these histograms). Protect in PROOF case.
+   if (fOutput && !AliAnalysisManager::GetAnalysisManager()->IsProofMode()) {
+      delete fOutput;
+   }
+   delete fExchangedData;
+}
+
+//________________________________________________________________________
+void TaskProducer::UserCreateOutputObjects()
+{
+   // Create histograms
+   // Called once (on the worker node)
+        
+   fOutput = new TList();
+   fOutput->SetOwner();  // IMPORTANT!
+    
+//    fOutput->Add(fHistPt);
+//    fOutput->Add(fHistEta);
+   // NEW HISTO added to fOutput here
+
+   // Data to be exchanged can be initialized here, but updated in UserExec
+   fExchangedData = new TObjArray();
+   fExchangedData->SetOwner();
+   PostData(1, fOutput); // Post data for ALL output slots >0 here, to get at least an empty histogram
+// Do NOT post data for the exchange containers !
+//   PostData(2, fExchangedData);
+}
+
+//________________________________________________________________________
+void TaskProducer::UserExec(Option_t *) 
+{
+    // Main loop
+    // Called for each event
+
+    //Here we fill the private output
+    // ...
+    PostData(1, fOutput);
+    
+    // And publish the exchanged data
+    fExchangedData->Delete();
+    TNamed *data = new TNamed(Form("<data by %s, ev. %d>", GetName(), fEntry), "");
+    fExchangedData->Add(data);
+    PostData(2, fExchangedData);
+    Printf("    -- %s published: %s", GetName(), data->GetName());
+}
+
+//______________________________________________________________________________
+// Consumer task reading data produced by other task
+//______________________________________________________________________________
+// 
+ClassImp(TaskConsumer)
+
+//________________________________________________________________________
+TaskConsumer::TaskConsumer() // All data members should be initialised here
+   :AliAnalysisTaskSE(),
+    fOutput(0),
+    fImported1(0),
+    fImported2(0)
+{
+    // Dummy constructor ALWAYS needed for I/O.
+}
+
+//________________________________________________________________________
+TaskConsumer::TaskConsumer(const char *name) // All data members should be initialised here
+   :AliAnalysisTaskSE(name),
+    fOutput(0),
+    fImported1(0),
+    fImported2(0)
+{
+   // Constructor
+   // Define input and output slots here (never in the dummy constructor)
+   // Input slot #0 works with a TChain - it is connected to the default input container
+   
+   // Input slot #1 reads from an exchange container
+   DefineInput(1, TObjArray::Class());
+   // Input slot #2 reads from an exchange container
+   DefineInput(2, TObjArray::Class());
+
+   // Output slot #0 is reserved
+   // Output slot #1 used to publish the task private data
+   DefineOutput(1, TList::Class());
+}
+
+//________________________________________________________________________
+TaskConsumer::~TaskConsumer()
+{
+   // Destructor. Clean-up the output list, but not the histograms that are put inside
+   // (the list is owner and will clean-up these histograms). Protect in PROOF case.
+   if (fOutput && !AliAnalysisManager::GetAnalysisManager()->IsProofMode()) {
+      delete fOutput;
+   }
+   // Do not delete the imported data - we are not owners
+}
+
+//________________________________________________________________________
+void TaskConsumer::UserCreateOutputObjects()
+{
+   // Create histograms
+   // Called once (on the worker node)
+        
+   fOutput = new TList();
+   fOutput->SetOwner();  // IMPORTANT!
+    
+//    fOutput->Add(fHistPt);
+//    fOutput->Add(fHistEta);
+   // NEW HISTO added to fOutput here
+
+   PostData(1, fOutput); // Post data for ALL output slots >0 here, to get at least an empty histogram
+}
+
+//________________________________________________________________________
+void TaskConsumer::UserExec(Option_t *) 
+{
+   // Main loop
+   // Called for each event
+   // This is how we get the actual exchange data
+   TObjArray *exchange1 = (TObjArray*)GetInputData(1);
+   if (!exchange1) {
+      Error("UserExec", "Task %s could not read the exchanged data for slot 1", GetName());
+      return;
+   }
+   fImported1 = (TNamed*)exchange1->At(0);
+   TObjArray *exchange2 = (TObjArray*)GetInputData(1);
+   if (!exchange2) {
+      Error("UserExec", "Task %s could not read the exchanged data for slot 2", GetName());
+      return;
+   }
+   fImported2 = (TNamed*)exchange2->At(0);
+   Printf("    -- imported: %s and %s", fImported1->GetName(), fImported2->GetName());
+
+   //Here we fill the private output
+   // ...
+   PostData(1, fOutput);    
+}
diff --git a/ANALYSIS/examples/TaskExchange.h b/ANALYSIS/examples/TaskExchange.h
new file mode 100644 (file)
index 0000000..e7b4ec3
--- /dev/null
@@ -0,0 +1,66 @@
+/* Copyright(c) 1998-1999, ALICE Experiment at CERN, All rights reserved. *
+ * See cxx source for full Copyright notice                               */
+
+//______________________________________________________________________________
+//
+// Task that produces some generic data to be exchanged with some consumers
+//
+//______________________________________________________________________________
+
+#ifndef TASKEXCHANGE_H
+#define TASKEXCHANGE_H
+
+#ifndef ALIANALYSISTASKSE_H
+#include "AliAnalysisTaskSE.h"
+#endif
+
+class TList;
+class TObjArray;
+
+class TaskProducer : public AliAnalysisTaskSE {
+ public:
+    TaskProducer();
+    TaskProducer(const char *name);
+    virtual ~TaskProducer();
+    
+    virtual void     UserCreateOutputObjects();
+    virtual void     UserExec(Option_t *option);
+    
+ private:
+    TList           *fOutput;        //! Output list (do not stream)
+    TObjArray       *fExchangedData; //! Arbitrary data (do not stream)
+    
+    TaskProducer(const TaskProducer&); // not implemented
+    TaskProducer& operator=(const TaskProducer&); // not implemented
+    
+    ClassDef(TaskProducer, 1); // example of producer
+};
+
+//______________________________________________________________________________
+//
+// Task that uses generic data produced by other tasks
+//
+//______________________________________________________________________________
+
+class TaskConsumer : public AliAnalysisTaskSE {
+ public:
+    TaskConsumer();
+    TaskConsumer(const char *name);
+    virtual ~TaskConsumer();
+    
+    virtual void     UserCreateOutputObjects();
+    virtual void     UserExec(Option_t *option);
+    
+ private:
+    TList           *fOutput;        //! Output list (do not stream)
+    TNamed          *fImported1;     //! Arbitrary data (do not stream)
+    TNamed          *fImported2;     //! Arbitrary data (do not stream)
+    
+    TaskConsumer(const TaskConsumer&); // not implemented
+    TaskConsumer& operator=(const TaskConsumer&); // not implemented
+    
+    ClassDef(TaskConsumer, 1); // example of consumer task
+};
+
+#endif
+
diff --git a/ANALYSIS/examples/runEx02.C b/ANALYSIS/examples/runEx02.C
new file mode 100644 (file)
index 0000000..a14092f
--- /dev/null
@@ -0,0 +1,72 @@
+// run.C
+//
+// Template run macro for producer/consumer tasks
+//
+// Author: Andrei Gheata
+//
+void CreateAlienHandler();
+
+//______________________________________________________________________________
+void runEx02()
+{
+    // load libraries
+    gSystem->Load("libCore.so");        
+    gSystem->Load("libGeom.so");
+    gSystem->Load("libVMC.so");
+    gSystem->Load("libPhysics.so");
+    gSystem->Load("libTree.so");
+    gSystem->Load("libSTEERBase.so");
+    gSystem->Load("libESD.so");
+    gSystem->Load("libAOD.so");
+    gSystem->Load("libANALYSIS.so");
+    gSystem->Load("libOADB.so");
+    gSystem->Load("libANALYSISalice.so");
+  
+    // add aliroot indlude path
+    gROOT->ProcessLine(Form(".include %s/include",gSystem->ExpandPathName("$ALICE_ROOT")));
+    gROOT->SetStyle("Plain");
+        
+    // analysis manager
+    AliAnalysisManager* mgr = new AliAnalysisManager("example for using producer/consumer tasks");
+    mgr->SetCommonFileName("output.root");
+
+    // create the alien handler and attach it to the manager
+    AliAnalysisGrid *plugin = CreateAlienHandler(); 
+    mgr->SetGridHandler(plugin);
+    
+    // create the alien handler and attach it to the manager
+    
+    AliVEventHandler* iH = new AliESDInputHandler();
+    mgr->SetInputEventHandler(iH);        
+                
+    // create 2 producer tasks
+    gROOT->LoadMacro("TaskExchange.cxx+g");
+    gROOT->LoadMacro("AddTaskProducer.C");
+    TaskProducer *prod1 = AddTaskProducer("producer1");
+    TaskProducer *prod2 = AddTaskProducer("producer2");
+    if (!prod1 || !prod2) return;
+    
+    // Create 1 consumer task
+    gROOT->LoadMacro("AddTaskConsumer.C");
+    TaskConsumer *task = AddTaskConsumer("consumer", "producer1", "producer2");
+        
+    // enable debug printouts
+    mgr->SetDebugLevel(2);
+    if (!mgr->InitAnalysis()) return;
+    mgr->PrintStatus();
+  
+    // start analysis
+    Printf("Starting analysis example for producer/consumers...");
+    mgr->StartAnalysis("local",3);
+}
+
+//______________________________________________________________________________
+AliAnalysisGrid* CreateAlienHandler()
+{
+    AliAnalysisAlien *plugin = new AliAnalysisAlien();
+    // Set the run mode (can be "full", "test", "offline", "submit" or "terminate")
+    plugin->SetRunMode("test");
+    // Plugin test mode works only providing a file containing test file locations, used in "local" mode also
+    plugin->SetFileForTestMode("files.txt"); // file should contain path name to a local directory containg *ESDs.root etc
+    return plugin;
+}