fProofCluster(),
fProofDataSet(),
fFileForTestMode(),
- fRootVersionForProof(),
fAliRootMode(),
fProofProcessOpt(),
fMergeDirName(),
fPackages(0),
fModules(0),
fProofParam(),
- fDropToShell(true)
+ fDropToShell(true),
+ fGridJobIDs(""),
+ fGridStages(""),
+ fFriendLibs("")
{
// Dummy ctor.
SetDefaults();
fProofCluster(),
fProofDataSet(),
fFileForTestMode(),
- fRootVersionForProof(),
fAliRootMode(),
fProofProcessOpt(),
fMergeDirName(),
fPackages(0),
fModules(0),
fProofParam(),
- fDropToShell(true)
+ fDropToShell(true),
+ fGridJobIDs(""),
+ fGridStages(""),
+ fFriendLibs("")
{
// Default ctor.
SetDefaults();
fProofCluster(other.fProofCluster),
fProofDataSet(other.fProofDataSet),
fFileForTestMode(other.fFileForTestMode),
- fRootVersionForProof(other.fRootVersionForProof),
fAliRootMode(other.fAliRootMode),
fProofProcessOpt(other.fProofProcessOpt),
fMergeDirName(other.fMergeDirName),
fPackages(0),
fModules(0),
fProofParam(),
- fDropToShell(other.fDropToShell)
+ fDropToShell(other.fDropToShell),
+ fGridJobIDs(other.fGridJobIDs),
+ fGridStages(other.fGridStages),
+ fFriendLibs(other.fFriendLibs)
{
// Copy ctor.
fGridJDL = (TGridJDL*)gROOT->ProcessLine("new TAlienJDL()");
fProofCluster = other.fProofCluster;
fProofDataSet = other.fProofDataSet;
fFileForTestMode = other.fFileForTestMode;
- fRootVersionForProof = other.fRootVersionForProof;
fAliRootMode = other.fAliRootMode;
fProofProcessOpt = other.fProofProcessOpt;
fMergeDirName = other.fMergeDirName;
fDropToShell = other.fDropToShell;
+ fGridJobIDs = other.fGridJobIDs;
+ fGridStages = other.fGridStages;
+ fFriendLibs = other.fFriendLibs;
if (other.fInputFiles) {
fInputFiles = new TObjArray();
TIter next(other.fInputFiles);
return kFALSE;
}
if (!LoadModule(mod)) return kFALSE;
+ if (!LoadFriendLibs()) return kFALSE;
} else if (!LoadModules()) return kFALSE;
AliAnalysisManager *mgr = AliAnalysisManager::GetAnalysisManager();
if (!mgr->InitAnalysis()) return kFALSE;
TString execCommand = fExecutableCommand;
SetAnalysisMacro(Form("%s.C", name));
SetExecutable(Form("%s.sh", name));
+ fOutputFiles = GetListOfFiles("outaod");
+ // Add extra files registered to the analysis manager
+ TString extra = GetListOfFiles("ext");
+ if (!extra.IsNull()) {
+ extra.ReplaceAll(".root", "*.root");
+ if (!fOutputFiles.IsNull()) fOutputFiles += ",";
+ fOutputFiles += extra;
+ }
// SetExecutableCommand("aliroot -b -q ");
SetValidationScript(Form("%s_validation.sh", name));
WriteAnalysisFile();
mod = (AliAnalysisTaskCfg*)fModules->At(imod);
if (!LoadModule(mod)) return kFALSE;
}
- return kTRUE;
+ // Load additional friend libraries
+ return LoadFriendLibs();
}
+//______________________________________________________________________________
+Bool_t AliAnalysisAlien::LoadFriendLibs() const
+{
+// Load libraries required for reading friends.
+ if (fFriendLibs.Length()) {
+ TObjArray *list = 0;
+ TString lib;
+ if (fFriendLibs.Contains(",")) list = fFriendLibs.Tokenize(",");
+ else list = fFriendLibs.Tokenize(" ");
+ for (Int_t ilib=0; ilib<list->GetEntriesFast(); ilib++) {
+ lib = list->At(ilib)->GetName();
+ lib.ReplaceAll(".so","");
+ lib.ReplaceAll(" ","");
+ if (lib.BeginsWith("lib")) lib.Remove(0, 3);
+ lib.Prepend("lib");
+ Int_t loaded = strlen(gSystem->GetLibraries(lib,"",kFALSE));
+ if (!loaded) loaded = gSystem->Load(lib);
+ if (loaded < 0) {
+ Error("LoadModules", "Cannot load library for friends %s", lib.Data());
+ return kFALSE;
+ }
+ }
+ delete list;
+ }
+ return kTRUE;
+}
+
//______________________________________________________________________________
void AliAnalysisAlien::SetRunPrefix(const char *prefix)
{
fMergingJDL->AddToInputSandbox(Form("LF:%s/%s", workdir.Data(), obj->GetName()));
}
}
+ const char *comment = "List of output files and archives";
if (fOutputArchive.Length()) {
TString outputArchive = fOutputArchive;
if (!fRegisterExcludes.IsNull()) {
arr = outputArchive.Tokenize(" ");
TIter next(arr);
Bool_t first = kTRUE;
- const char *comment = "Files to be archived";
- const char *comment1 = comment;
while ((os=(TObjString*)next())) {
- if (!first) comment = NULL;
if (!os->GetString().Contains("@") && fCloseSE.Length())
- fGridJDL->AddToOutputArchive(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
+ fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
else
- fGridJDL->AddToOutputArchive(os->GetString(), comment);
+ fGridJDL->AddToSet("Output", os->GetString());
+ if (first) fGridJDL->AddToSetDescription("Output", comment);
first = kFALSE;
}
delete arr;
files.ReplaceAll(".root", "*.root");
if (mgr->IsCollectThroughput())
- outputArchive += Form("root_archive.zip:%s,*.stat,*%s@disk=%d",files.Data(),mgr->GetFileInfoLog(),fNreplicas);
+ outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",files.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
else
outputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",files.Data(),fNreplicas);
} else {
}
arr = outputArchive.Tokenize(" ");
TIter next2(arr);
- comment = comment1;
first = kTRUE;
while ((os=(TObjString*)next2())) {
- if (!first) comment = NULL;
TString currentfile = os->GetString();
if (!currentfile.Contains("@") && fCloseSE.Length())
- fMergingJDL->AddToOutputArchive(Form("%s@%s",currentfile.Data(), fCloseSE.Data()), comment);
+ fMergingJDL->AddToSet("Output", Form("%s@%s",currentfile.Data(), fCloseSE.Data()));
else
- fMergingJDL->AddToOutputArchive(currentfile, comment);
+ fMergingJDL->AddToSet("Output", currentfile);
+ if (first) fMergingJDL->AddToSetDescription("Output", comment);
first = kFALSE;
}
delete arr;
arr = fOutputFiles.Tokenize(",");
TIter next(arr);
Bool_t first = kTRUE;
- const char *comment = "Files to be saved";
while ((os=(TObjString*)next())) {
// Ignore ouputs in jdl that are also in outputarchive
TString sout = os->GetString();
if (fRegisterExcludes.Contains(sout)) continue;
if (!first) comment = NULL;
if (!os->GetString().Contains("@") && fCloseSE.Length())
- fGridJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
+ fGridJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
else
- fGridJDL->AddToOutputSandbox(os->GetString(), comment);
- first = kFALSE;
+ fGridJDL->AddToSet("Output", os->GetString());
+ if (first) fGridJDL->AddToSetDescription("Output", comment);
if (fMergeExcludes.Contains(sout)) continue;
if (!os->GetString().Contains("@") && fCloseSE.Length())
- fMergingJDL->AddToOutputSandbox(Form("%s@%s",os->GetString().Data(), fCloseSE.Data()), comment);
+ fMergingJDL->AddToSet("Output", Form("%s@%s",os->GetString().Data(), fCloseSE.Data()));
else
- fMergingJDL->AddToOutputSandbox(os->GetString(), comment);
+ fMergingJDL->AddToSet("Output", os->GetString());
+ if (first) fMergingJDL->AddToSetDescription("Output", comment);
+ first = kFALSE;
}
delete arr;
fGridJDL->SetPrice((UInt_t)fPrice, "AliEn price for this job");
}
TString sjdl2 = fMergingJDL->Generate();
Int_t index, index1;
- sjdl.ReplaceAll("\"LF:", "\n \"LF:");
+ sjdl.ReplaceAll("\",\"", "\",\n \"");
sjdl.ReplaceAll("(member", "\n (member");
sjdl.ReplaceAll("\",\"VO_", "\",\n \"VO_");
sjdl.ReplaceAll("{", "{\n ");
sjdl.ReplaceAll("{\n \n", "{\n");
sjdl.ReplaceAll("\n\n", "\n");
sjdl.ReplaceAll("OutputDirectory", "OutputDir");
- sjdl1.ReplaceAll("\"LF:", "\n \"LF:");
+ sjdl1.ReplaceAll("\",\"", "\",\n \"");
sjdl1.ReplaceAll("(member", "\n (member");
sjdl1.ReplaceAll("\",\"VO_", "\",\n \"VO_");
sjdl1.ReplaceAll("{", "{\n ");
sjdl1.ReplaceAll("{\n \n", "{\n");
sjdl1.ReplaceAll("\n\n", "\n");
sjdl1.ReplaceAll("OutputDirectory", "OutputDir");
- sjdl2.ReplaceAll("\"LF:", "\n \"LF:");
+ sjdl2.ReplaceAll("\",\"", "\",\n \"");
sjdl2.ReplaceAll("(member", "\n (member");
sjdl2.ReplaceAll("\",\"VO_", "\",\n \"VO_");
sjdl2.ReplaceAll("{", "{\n ");
printf("= Soft reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
if (fProofReset>1)
printf("= Hard reset signal will be send to master______ CHANGE BEHAVIOR AFTER COMPLETION\n");
- if (!fRootVersionForProof.IsNull())
- printf("= ROOT version requested________________________ %s\n", fRootVersionForProof.Data());
+ if (!fROOTVersion.IsNull())
+ printf("= ROOT version requested________________________ %s\n", fROOTVersion.Data());
else
printf("= ROOT version requested________________________ default\n");
printf("= AliRoot version requested_____________________ %s\n", fAliROOTVersion.Data());
fOverwriteMode = 1;
}
+//______________________________________________________________________________
+void AliAnalysisAlien::SetFriendChainName(const char *name, const char *libnames)
+{
+ // Set file name for the chain of friends and optionally additional libs to be loaded.
+ // Libs should be separated by blancs.
+ fFriendChainName = name;
+ fFriendLibs = libnames;
+ if (fFriendLibs.Length() && !fFriendLibs.Contains(".so")) {
+ Fatal("SetFriendChainName()", "You should provide explicit library names (with extension)");
+ }
+}
+
+//______________________________________________________________________________
+void AliAnalysisAlien::SetRootVersionForProof(const char *version)
+{
+// Obsolete method. Use SetROOTVersion instead
+ Warning("SetRootVersionForProof", "Obsolete. Use SetROOTVersion instead");
+ if (fROOTVersion.IsNull()) SetROOTVersion(version);
+ else Error("SetRootVersionForProof", "ROOT version already set to %s", fROOTVersion.Data());
+}
+
//______________________________________________________________________________
Bool_t AliAnalysisAlien::CheckMergedFiles(const char *filename, const char *aliendir, Int_t nperchunk, const char *jdl)
{
// Check if this is the last stage to be done.
Bool_t laststage = (nfiles<nperchunk);
if (fMaxMergeStages && stage>=fMaxMergeStages) laststage = kTRUE;
+ Int_t jobId = 0;
if (laststage) {
printf("### Submiting final merging stage %d\n", stage);
TString finalJDL = jdl;
finalJDL.ReplaceAll(".jdl", "_final.jdl");
TString query = Form("submit %s %s %d", finalJDL.Data(), aliendir, stage);
- Int_t jobId = SubmitSingleJob(query);
- if (!jobId) return kFALSE;
+ jobId = SubmitSingleJob(query);
} else {
printf("### Submiting merging stage %d\n", stage);
TString query = Form("submit %s %s %d", jdl, aliendir, stage);
- Int_t jobId = SubmitSingleJob(query);
- if (!jobId) return kFALSE;
+ jobId = SubmitSingleJob(query);
}
+ if (!jobId) return kFALSE;
+
+ if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
+ fGridJobIDs.Append(Form("%d", jobId));
+ if (!fGridStages.IsNull()) fGridStages.Append(" ");
+ fGridStages.Append(Form("%s_merge_stage%d",
+ laststage ? "final" : "partial", stage));
+
return kTRUE;
}
::Error("SubmitSingleJob", "Your query %s could not be submitted", query);
return 0;
}
- printf(" Job id: %s\n", jobId.Data());
- return atoi(jobId);
+ Int_t ijobId = jobId.Atoi();
+ printf(" Job id: '%s' (%d)\n", jobId.Data(), ijobId);
+ return ijobId;
}
//______________________________________________________________________________
Bool_t AliAnalysisAlien::MergeInfo(const char *output, const char *collection)
{
// Merges a collection of output files using concatenation.
+ TString scoll(collection);
+ if (!scoll.Contains(".xml")) return kFALSE;
TGridCollection *coll = (TGridCollection*)gROOT->ProcessLine(Form("TAlienCollection::Open(\"%s\");", collection));
if (!coll) {
::Error("MergeInfo", "Input XML %s collection empty.", collection);
Int_t countChunk = 0;
Int_t countZero = nmaxmerge;
Bool_t merged = kTRUE;
+ Bool_t isGrid = kTRUE;
Int_t index = outputFile.Index("@");
if (index > 0) outputFile.Remove(index);
TString inputFile = outputFile;
listoffiles->Add(new TNamed(fname.Data(),""));
}
} else if (sbasedir.Contains(".txt")) {
+ // The file having the .txt extension is expected to contain a list of
+ // folders where the output files will be looked. For alien folders,
+ // the full folder LFN is expected (starting with alien://)
// Assume lfn's on each line
TString line;
ifstream in;
in.open(sbasedir);
+ if (in.fail()) {
+ ::Error("MergeOutput", "File %s cannot be opened. Merging stopped." ,sbasedir.Data());
+ return kTRUE;
+ }
Int_t nfiles = 0;
while (in.good()) {
in >> line;
if (line.IsNull() || line.BeginsWith("#")) continue;
+ line.Strip();
+ if (!line.Contains("alien:")) isGrid = kFALSE;
+ line += "/";
+ line += outputFile;
nfiles++;
listoffiles->Add(new TNamed(line.Data(),""));
}
+ in.close();
if (!nfiles) {
::Error("MergeOutput","Input file %s contains no files to be merged\n", sbasedir.Data());
delete listoffiles;
// Loop 'find' results and get next LFN
if (countZero == nmaxmerge) {
// First file in chunk - create file merger and add previous chunk if any.
- fm = new TFileMerger(kTRUE);
+ fm = new TFileMerger(isGrid);
fm->SetFastMethod(kTRUE);
if (previousChunk.Length()) fm->AddFile(previousChunk.Data());
outputChunk = outputFile;
}
// Merging stage different than 0.
// Move to the begining of the requested chunk.
- fm = new TFileMerger(kTRUE);
+ fm = new TFileMerger(isGrid);
fm->SetFastMethod(kTRUE);
while ((nextfile=next())) fm->AddFile(nextfile->GetName());
delete listoffiles;
merged = MergeOutput(outputFile, fGridOutputDir, fMaxMergeFiles);
if (!merged) {
Error("MergeOutputs", "Terminate() will NOT be executed");
- return kFALSE;
+ delete list;
+ return kFALSE;
}
TFile *fileOpened = (TFile*)gROOT->GetListOfFiles()->FindObject(outputFile);
if (fileOpened) fileOpened->Close();
}
+ delete list;
return kTRUE;
}
}
}
// Do we need to change the ROOT version ? The success of this cannot be checked.
- if (!fRootVersionForProof.IsNull() && !testMode) {
- gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"%s\");",
- fProofCluster.Data(), fRootVersionForProof.Data()));
+ if (!fROOTVersion.IsNull() && !testMode) {
+ gROOT->ProcessLine(Form("TProof::Mgr(\"%s\")->SetROOTVersion(\"VO_ALICE@ROOT::%s\");",
+ fProofCluster.Data(), fROOTVersion.Data()));
}
// Connect to PROOF and check the status
Long_t proof = 0;
if (!alirootMode.IsNull()) extraLibs = "ANALYSIS:OADB:ANALYSISalice";
// Parse the extra libs for .so
if (fAdditionalLibs.Length()) {
- TObjArray *list = fAdditionalLibs.Tokenize(" ");
+ TString additionalLibs = fAdditionalLibs;
+ additionalLibs.Strip();
+ if (additionalLibs.Length() && fFriendLibs.Length())
+ additionalLibs += " ";
+ additionalLibs += fFriendLibs;
+ TObjArray *list = additionalLibs.Tokenize(" ");
TIter next(list);
TObjString *str;
while((str=(TObjString*)next())) {
}
if (list) delete list;
}
- if (!extraLibs.IsNull()) {
- Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
- optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
- }
+ if (!extraLibs.IsNull()) {
+ Info("StartAnalysis", "Adding extra libs: %s",extraLibs.Data());
+ optionsList.Add(new TNamed("ALIROOT_EXTRA_LIBS",extraLibs.Data()));
+ }
// Check extra includes
if (!fIncludePath.IsNull()) {
TString includePath = fIncludePath;
// Compose the output archive.
fOutputArchive = "log_archive.zip:std*@disk=1 ";
if (mgr->IsCollectThroughput())
- fOutputArchive += Form("root_archive.zip:%s,*.stat,*%s@disk=%d",fOutputFiles.Data(),mgr->GetFileInfoLog(),fNreplicas);
+ fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d %s@disk=%d",fOutputFiles.Data(),fNreplicas, mgr->GetFileInfoLog(),fNreplicas);
else
fOutputArchive += Form("root_archive.zip:%s,*.stat@disk=%d",fOutputFiles.Data(),fNreplicas);
}
gGrid->Cd(fGridOutputDir);
TGridResult *res;
TString jobID = "";
+ fGridJobIDs = "";
+ fGridStages = "";
if (!fRunNumbers.Length() && !fRunRange[0]) {
// Submit a given xml or a set of runs
res = gGrid->Command(Form("submit %s", fJDLName.Data()));
\n_______________________________________________________________________",
fJDLName.Data(), cjobId);
jobID = cjobId;
+ if (jobID.Atoi()) {
+ if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
+ fGridJobIDs.Append(jobID);
+ if (!fGridStages.IsNull()) fGridStages.Append(" ");
+ fGridStages.Append("full");
+ }
}
delete res;
} else {
} else {
// Submit for a range of enumeration of runs.
if (!Submit()) return kFALSE;
+ jobID = fGridJobIDs;
}
if (fDropToShell) {
\n##### Your JDL %s submitted (%d to go). \nTHE JOB ID IS: %s \
\n_______________________________________________________________________",
fJDLName.Data(), nmasterjobs-fNsubmitted-1, cjobId1.Data());
+ if (!fGridJobIDs.IsNull()) fGridJobIDs.Append(" ");
+ fGridJobIDs.Append(cjobId1);
+ if (!fGridStages.IsNull()) fGridStages.Append(" ");
+ fGridStages.Append("full");
jobID += cjobId1;
jobID += " ";
lastmaster = cjobId1.Atoi();
out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
if (fAdditionalLibs.Length()) {
out << "// Add aditional AliRoot libraries" << endl;
- TObjArray *list = fAdditionalLibs.Tokenize(" ");
+ TString additionalLibs = fAdditionalLibs;
+ additionalLibs.Strip();
+ if (additionalLibs.Length() && fFriendLibs.Length())
+ additionalLibs += " ";
+ additionalLibs += fFriendLibs;
+ TObjArray *list = additionalLibs.Tokenize(" ");
TIter next(list);
TObjString *str;
while((str=(TObjString*)next())) {
out << " plugin->SetFileForTestMode(\"" << fFileForTestMode << "\");" << endl;
out << " plugin->SetNtestFiles(" << fNtestFiles << ");" << endl;
if (!fFriendChainName.IsNull())
- out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\");" << endl;
+ out << " plugin->SetFriendChainName(\"" << fFriendChainName << "\",\"" << fFriendLibs << "\");" << endl;
if (IsUseMCchain())
out << " plugin->SetUseMCchain();" << endl;
out << " mgr->SetGridHandler(plugin);" << endl;
out << " printf(\"Include path: %s\\n\", gSystem->GetIncludePath());" << endl << endl;
if (fAdditionalLibs.Length()) {
out << "// Add aditional AliRoot libraries" << endl;
- TObjArray *list = fAdditionalLibs.Tokenize(" ");
+ TString additionalLibs = fAdditionalLibs;
+ additionalLibs.Strip();
+ if (additionalLibs.Length() && fFriendLibs.Length())
+ additionalLibs += " ";
+ additionalLibs += fFriendLibs;
+ TObjArray *list = additionalLibs.Tokenize(" ");
TIter next(list);
TObjString *str;
while((str=(TObjString*)next())) {
out << " printf(\"ERROR: Analysis manager could not be extracted from file \");" << endl;
out << " return;" << endl;
out << " }" << endl;
+ if (IsLocalTest()) {
+ out << " printf(\"===================================\n\");" << endl;
+ out << " printf(\"Testing merging...\\n\");" << endl;
+ out << " printf(\"===================================\n\");" << endl;
+ }
out << " while((str=(TObjString*)iter->Next())) {" << endl;
out << " outputFile = str->GetString();" << endl;
out << " if (outputFile.Contains(\"*\")) continue;" << endl;
out << " if (index > 0) outputFile.Remove(index);" << endl;
out << " // Skip already merged outputs" << endl;
out << " if (!gSystem->AccessPathName(outputFile)) {" << endl;
- out << " printf(\"Output file <%s> found. Not merging again.\",outputFile.Data());" << endl;
+ out << " printf(\"Output file <%s> found. Not merging again.\\n\",outputFile.Data());" << endl;
out << " continue;" << endl;
out << " }" << endl;
out << " if (mergeExcludes.Contains(outputFile.Data())) continue;" << endl;
out << " return;" << endl;
out << " }" << endl;
out << " }" << endl;
- if (mgr && mgr->IsCollectThroughput()) {
+ if (mgr && mgr->IsCollectThroughput() && !IsLocalTest()) {
out << " TString infolog = \"" << mgr->GetFileInfoLog() << "\";" << endl;
out << " AliAnalysisAlien::MergeInfo(infolog, outputDir);" << endl;
}
out << " out.open(\"outputs_valid\", ios::out);" << endl;
out << " out.close();" << endl;
out << " // read the analysis manager from file" << endl;
- out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
+ if (IsLocalTest()) {
+ out << " printf(\"===================================\n\");" << endl;
+ out << " printf(\"Testing Terminate()...\\n\");" << endl;
+ out << " printf(\"===================================\n\");" << endl;
+ } else {
+ out << " if (!outputDir.Contains(\"Stage\")) return;" << endl;
+ }
out << " mgr->SetRunFromPath(mgr->GetRunFromAlienPath(dir));" << endl;
out << " mgr->SetSkipTerminate(kFALSE);" << endl;
out << " mgr->PrintStatus();" << endl;