Added SetNworkersPerSlave() and SetProofConnectGrid() methods for the proof mode...
authoragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Wed, 15 Sep 2010 09:57:16 +0000 (09:57 +0000)
committeragheata <agheata@f7af4fe6-9843-0410-8265-dc069ae4e863>
Wed, 15 Sep 2010 09:57:16 +0000 (09:57 +0000)
SetFileForTestMode in the plugin.

ANALYSIS/AliAnalysisAlien.cxx
ANALYSIS/AliAnalysisAlien.h
ANALYSIS/AliAnalysisGrid.h
ANALYSIS/AliAnalysisManager.cxx
ANALYSIS/AliAnalysisManager.h

index 825d31e..6003707 100644 (file)
@@ -66,6 +66,7 @@ AliAnalysisAlien::AliAnalysisAlien()
                   fOverwriteMode(1),
                   fNreplicas(2),
                   fNproofWorkers(0),
+                  fNproofWorkersPerSlave(0),
                   fProofReset(0),
                   fRunNumbers(),
                   fExecutable(),
@@ -131,6 +132,7 @@ AliAnalysisAlien::AliAnalysisAlien(const char *name)
                   fOverwriteMode(1),
                   fNreplicas(2),
                   fNproofWorkers(0),
+                  fNproofWorkersPerSlave(0),
                   fProofReset(0),
                   fRunNumbers(),
                   fExecutable(),
@@ -196,6 +198,7 @@ AliAnalysisAlien::AliAnalysisAlien(const AliAnalysisAlien& other)
                   fOverwriteMode(other.fOverwriteMode),
                   fNreplicas(other.fNreplicas),
                   fNproofWorkers(other.fNproofWorkers),
+                  fNproofWorkersPerSlave(other.fNproofWorkersPerSlave),
                   fProofReset(other.fProofReset),
                   fRunNumbers(other.fRunNumbers),
                   fExecutable(other.fExecutable),
@@ -291,6 +294,7 @@ AliAnalysisAlien &AliAnalysisAlien::operator=(const AliAnalysisAlien& other)
       fOverwriteMode           = other.fOverwriteMode;
       fNreplicas               = other.fNreplicas;
       fNproofWorkers           = other.fNproofWorkers;
+      fNproofWorkersPerSlave   = other.fNproofWorkersPerSlave;
       fProofReset              = other.fProofReset;
       fRunNumbers              = other.fRunNumbers;
       fExecutable              = other.fExecutable;
@@ -2192,9 +2196,12 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
       }
       // Connect to PROOF and check the status
       Long_t proof = 0;
+      TString sworkers;
+      if (fNproofWorkersPerSlave) sworkers = Form("workers=%dx", fNproofWorkersPerSlave);
+      else if (fNproofWorkers) sworkers = Form("workers=%d", fNproofWorkers);
       if (!testMode) {
-         if (fNproofWorkers) 
-            proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"workers=%d\");", fProofCluster.Data(), fNproofWorkers));
+         if (!sworkers.IsNull()) 
+            proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\", \"%s\");", fProofCluster.Data(), sworkers.Data()));
          else   
             proof = gROOT->ProcessLine(Form("TProof::Open(\"%s\");", fProofCluster.Data()));
       } else {
@@ -2208,6 +2215,8 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
          Error("StartAnalysis", "Could not connect to PROOF cluster <%s>", fProofCluster.Data());
          return kFALSE;
       }   
+      if (fNproofWorkersPerSlave*fNproofWorkers > 0)
+         gROOT->ProcessLine(Form("gProof->SetParallel(%d);", fNproofWorkers));
       // Is dataset existing ?
       if (!testMode) {
          TString dataset = fProofDataSet;
@@ -2258,6 +2267,9 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
             Info("StartAnalysis", "Adding extra includes: %s",includePath.Data()); 
             optionsList.Add(new TNamed("ALIROOT_EXTRA_INCLUDES",includePath.Data()));
          }
+         // Check if connection to grid is requested
+         if (TestSpecialBit(kProofConnectGrid)) 
+            optionsList.Add(new TNamed("ALIROOT_ENABLE_ALIEN", "1"));
          // Enable AliRoot par
          if (testMode) {
          // Enable proof lite package
@@ -2279,7 +2291,7 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
                Error("StartAnalysis", "There was an error trying to enable package VO_ALICE@AliRoot::%s", fAliROOTVersion.Data());
                return kFALSE;
             }         
-         }      
+         }
       } else {
          if (fAdditionalLibs.Contains(".so") && !testMode) {
             Error("StartAnalysis", "You request additional libs to be loaded but did not enabled any AliRoot mode. Please refer to: \
@@ -2293,7 +2305,7 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
          TObject *package;
          while ((package=next())) {
             if (gROOT->ProcessLine(Form("gProof->UploadPackage(\"%s\");", package->GetName()))) {
-               if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\");", package->GetName()))) {
+               if (gROOT->ProcessLine(Form("gProof->EnablePackage(\"%s\",kTRUE);", package->GetName()))) {
                   Error("StartAnalysis", "There was an error trying to enable package %s", package->GetName());
                   return kFALSE;
                }
@@ -2326,11 +2338,8 @@ Bool_t AliAnalysisAlien::StartAnalysis(Long64_t /*nentries*/, Long64_t /*firstEn
          }   
          TFileCollection *coll = new TFileCollection();
          coll->AddFromFile(fFileForTestMode);
-         gROOT->ProcessLine("if (gProof->ExistsDataSet(\"test_collection\")) gProof->RemoveDataSet(\"test_collection\");");
-         gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx);", (ULong_t)coll));
-         gROOT->ProcessLine("gProof->VerifyDataSet(\"test_collection\");");
-         gROOT->ProcessLine("gProof->ShowDataSets();");
-         
+         gROOT->ProcessLine(Form("gProof->RegisterDataSet(\"test_collection\", (TFileCollection*)0x%lx, \"OV\");", (ULong_t)coll));
+         gROOT->ProcessLine("gProof->ShowDataSets()");
       }
       return kTRUE;
    }
index 759e708..46c862f 100644 (file)
@@ -118,6 +118,7 @@ public:
    virtual const char *GetProofDataSet() const                           {return fProofDataSet.Data();}
    virtual void        SetProofReset(Int_t mode)                         {fProofReset = mode;}
    virtual void        SetNproofWorkers(Int_t nworkers)                  {fNproofWorkers = nworkers;}
+   virtual void        SetNproofWorkersPerSlave(Int_t nworkers)          {fNproofWorkersPerSlave = nworkers;}
    virtual void        SetRootVersionForProof(const char *version)       {fRootVersionForProof = version;}
    virtual void        SetAliRootMode(const char *mode)                  {fAliRootMode = mode;}
    // .txt file containing the list of files to be chained in test mode
@@ -156,6 +157,7 @@ private:
    Int_t            fOverwriteMode;   // Overwrite existing files if any
    Int_t            fNreplicas;       // Number of replicas for the output files
    Int_t            fNproofWorkers;   // Number of workers in proof mode
+   Int_t            fNproofWorkersPerSlave; // Max number of workers per slave in proof mode
    Int_t            fProofReset;      // Proof reset mode: 0=no reset, 1=soft, 2=hard
    TString          fRunNumbers;      // List of runs to be processed
    TString          fExecutable;      // Executable script for AliEn job
index 56120b3..e6cb395 100644 (file)
@@ -39,7 +39,8 @@ enum EPluginBits {
    kCheckCopy  = BIT(1),
    kKeepLogs   = BIT(2),
    kClearPackages = BIT(3),
-   kUseSubmitPolicy = BIT(4)
+   kUseSubmitPolicy = BIT(4),
+   kProofConnectGrid = BIT(5)
 };
 
    AliAnalysisGrid() : TNamed(), fSpecialBits(0) {}
@@ -120,7 +121,9 @@ enum EPluginBits {
    virtual const char *GetProofDataSet() const                           = 0;
    virtual void        SetProofReset(Int_t mode)                         = 0;
    virtual void        SetClearPackages(Bool_t flag=kTRUE) {SetSpecialBit(kClearPackages,flag);}
+   virtual void        SetProofConnectGrid(Bool_t flag=kTRUE) {SetSpecialBit(kProofConnectGrid,flag);}
    virtual void        SetNproofWorkers(Int_t nworkers)                  = 0;
+   virtual void        SetNproofWorkersPerSlave(Int_t nworkers)          = 0;
    virtual void        SetRootVersionForProof(const char *version)       = 0;
    virtual void        SetAliRootMode(const char *mode)                  = 0;
    // .txt file containing the list of files to be chained in test mode
index 1abdcb5..a466507 100644 (file)
@@ -1326,6 +1326,19 @@ void AliAnalysisManager::ResetAnalysis()
 }
 
 //______________________________________________________________________________
+Long64_t AliAnalysisManager::StartAnalysis(const char *type, Long64_t nentries, Long64_t firstentry)
+{
+// Start analysis having a grid handler.
+   if (!fGridHandler) {
+      Error("StartAnalysis", "Cannot start analysis providing just the analysis type without a grid handler.");
+      Info("===", "Add an AliAnalysisAlien object as plugin for this manager and configure it.");
+      return -1;
+   }
+   TTree *tree = NULL;
+   return StartAnalysis(type, tree, nentries, firstentry);
+}
+
+//______________________________________________________________________________
 Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree, Long64_t nentries, Long64_t firstentry)
 {
 // Start analysis for this manager. Analysis task can be: LOCAL, PROOF, GRID or
@@ -1397,7 +1410,7 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree,
    TString line;
    SetEventLoop(kFALSE);
    // Enable event loop mode if a tree was provided
-   if (tree || fMode==kMixingAnalysis) SetEventLoop(kTRUE);
+   if (tree || fGridHandler || fMode==kMixingAnalysis) SetEventLoop(kTRUE);
 
    TChain *chain = 0;
    TString ttype = "TTree";
@@ -1426,7 +1439,7 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree,
    
    switch (fMode) {
       case kLocalAnalysis:
-         if (!tree) {
+         if (!tree && !fGridHandler) {
             TIter nextT(fTasks);
             // Call CreateOutputObjects for all tasks
             Int_t itask = 0;
@@ -1448,9 +1461,27 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree,
             Terminate();
             return 0;
          } 
+         fSelector = new AliAnalysisSelector(this);
+         // Check if a plugin handler is used
+         if (fGridHandler) {
+            // Get the chain from the plugin
+            TString dataType = "esdTree";
+            if (fInputEventHandler) {
+               dataType = fInputEventHandler->GetDataType();
+               dataType.ToLower();
+               dataType += "Tree";
+            }   
+            chain = fGridHandler->GetChainForTestMode(dataType);
+            if (!chain) {
+               Error("StartAnalysis", "No chain for test mode. Aborting.");
+               return -1;
+            }
+            cout << "===== RUNNING LOCAL ANALYSIS" << GetName() << " ON CHAIN " << chain->GetName() << endl;
+            retv = chain->Process(fSelector, "", nentries, firstentry);
+            break;
+         }
          // Run tree-based analysis via AliAnalysisSelector  
          cout << "===== RUNNING LOCAL ANALYSIS " << GetName() << " ON TREE " << tree->GetName() << endl;
-         fSelector = new AliAnalysisSelector(this);
          retv = tree->Process(fSelector, "", nentries, firstentry);
          break;
       case kProofAnalysis:
@@ -1477,8 +1508,39 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, TTree * const tree,
          }      
          break;
       case kGridAnalysis:
-         Warning("StartAnalysis", "GRID analysis mode not implemented. Running local.");
-         break;
+         fIsRemote = kTRUE;
+         if (!anaType.Contains("terminate")) {
+            if (!fGridHandler) {
+               Error("StartAnalysis", "Cannot start grid analysis without a grid handler.");
+               Info("===", "Add an AliAnalysisAlien object as plugin for this manager and configure it.");
+               cdir->cd();
+               return -1;
+            }
+            // Write analysis manager in the analysis file
+            cout << "===== RUNNING GRID ANALYSIS: " << GetName() << endl;
+            // Start the analysis via the handler
+            if (!fGridHandler->StartAnalysis(nentries, firstentry)) {
+               Info("StartAnalysis", "Grid analysis was stopped and cannot be terminated");
+               cdir->cd();
+               return -1;
+            }   
+
+            // Terminate grid analysis
+            if (fSelector && fSelector->GetStatus() == -1) {cdir->cd(); return -1;}
+            if (fGridHandler->GetRunMode() == AliAnalysisGrid::kOffline) {cdir->cd(); return 0;}
+            cout << "===== MERGING OUTPUTS REGISTERED BY YOUR ANALYSIS JOB: " << GetName() << endl;
+            if (!fGridHandler->MergeOutputs()) {
+               // Return if outputs could not be merged or if it alien handler
+               // was configured for offline mode or local testing.
+               cdir->cd();
+               return 0;
+            }
+         }   
+         cout << "===== TERMINATING GRID ANALYSIS JOB: " << GetName() << endl;
+         ImportWrappers(NULL);
+         Terminate();
+         cdir->cd();
+         return 0;
       case kMixingAnalysis:   
          // Run event mixing analysis
          if (!fEventPool) {
@@ -1540,21 +1602,6 @@ Long64_t AliAnalysisManager::StartAnalysis(const char *type, const char *dataset
       // Check if the plugin is in test mode
       if (fGridHandler->GetRunMode() == AliAnalysisGrid::kTest) {
          dataset = "test_collection";
-         // Get the chain to be used for test mode
-/*
-         TString dataType = "esdTree";
-         if (fInputEventHandler) {
-            dataType = fInputEventHandler->GetDataType();
-            dataType.ToLower();
-            dataType += "Tree";
-         }   
-         chain = fGridHandler->GetChainForTestMode(dataType);
-         if (!chain) {
-            Error("StartAnalysis", "No chain for test mode. Aborting.");
-            return -1;
-         }
-         fTree = chain;
-*/         
       } else {
          dataset = fGridHandler->GetProofDataSet();
       }
index ede0513..8cf3847 100644 (file)
@@ -76,8 +76,9 @@ enum EAliAnalysisFlags {
    static TFile       *OpenFile(AliAnalysisDataContainer *cont, const char *option, Bool_t ignoreProof=kFALSE);
    void                PackOutput(TList *target);
    void                RegisterExtraFile(const char *fname);
-   Long64_t            StartAnalysis(const char *type="local", TTree * const tree=0, Long64_t nentries=1234567890, Long64_t firstentry=0);
+   Long64_t            StartAnalysis(const char *type, TTree * const tree, Long64_t nentries=1234567890, Long64_t firstentry=0);
    Long64_t            StartAnalysis(const char *type, const char *dataset, Long64_t nentries=1234567890, Long64_t firstentry=0);
+   Long64_t            StartAnalysis(const char *type, Long64_t nentries=1234567890, Long64_t firstentry=0);
    virtual void        SlaveBegin(TTree *tree);
    virtual void        Terminate();
    void                UnpackOutput(TList *source);