fMasterResubmitThreshold(0),
fNtestFiles(0),
fNMasterJobs(0),
+ fMaxMergeFiles(0),
fRunNumbers(),
fExecutable(),
fArguments(),
fMasterResubmitThreshold(0),
fNtestFiles(0),
fNMasterJobs(0),
+ fMaxMergeFiles(0),
fRunNumbers(),
fExecutable(),
fArguments(),
fMasterResubmitThreshold(other.fMasterResubmitThreshold),
fNtestFiles(other.fNtestFiles),
fNMasterJobs(other.fNMasterJobs),
+ fMaxMergeFiles(other.fMaxMergeFiles),
fRunNumbers(other.fRunNumbers),
fExecutable(other.fExecutable),
fArguments(other.fArguments),
return kFALSE;
}
+//______________________________________________________________________________
+void AliAnalysisAlien::Print(Option_t *) const
+{
+// Print current plugin settings to stdout.
+// The Option_t argument is ignored. Optional settings are printed only when
+// they hold a value: string members are tested with TString::Length() (the
+// implicit conversion to const char* and Data() never yield a null pointer,
+// so they cannot be used as an emptiness test), numeric members with > 0.
+   printf("### AliEn analysis plugin current settings ###\n");
+   printf("= Version of API requested: ____________________ %s\n", fAPIVersion.Data());
+   printf("= Version of ROOT requested: ___________________ %s\n", fROOTVersion.Data());
+   printf("= Version of AliRoot requested: ________________ %s\n", fAliROOTVersion.Data());
+   if (fUser.Length())
+      printf("= User running the plugin: _____________________ %s\n", fUser.Data());
+   printf("= Grid workdir relative to user $HOME: _________ %s\n", fGridWorkingDir.Data());
+   printf("= Grid output directory relative to workdir: ___ %s\n", fGridOutputDir.Data());
+   printf("= Data base directory path requested: __________ %s\n", fGridDataDir.Data());
+   printf("= Data search pattern: _________________________ %s\n", fDataPattern.Data());
+   printf("= Input data format: ___________________________ %s\n", fInputFormat.Data());
+   if (fRunNumbers.Length())
+      printf("= Run numbers to be processed: _________________ %s\n", fRunNumbers.Data());
+   if (fRunRange[0])
+      printf("= Run range to be processed: ___________________ %d-%d\n", fRunRange[0], fRunRange[1]);
+   if (!fRunRange[0] && !fRunNumbers.Length()) {
+      // Neither a run range nor a run list is set: show the explicit input files.
+      TIter next(fInputFiles);
+      TObject *obj;
+      TString list;
+      while ((obj=next())) {
+         // Separate entries with a blank so consecutive names stay readable.
+         if (list.Length()) list += " ";
+         list += obj->GetName();
+      }
+      printf("= Input files to be processed: _________________ %s\n", list.Data());
+   }
+   if (TestBit(AliAnalysisGrid::kTest))
+      printf("= Number of input files used in test mode: _____ %d\n", fNtestFiles);
+   printf("= List of output files to be registered: _______ %s\n", fOutputFiles.Data());
+   printf("= List of outputs going to be archived: ________ %s\n", fOutputArchive.Data());
+   printf("= List of outputs that should not be merged: ___ %s\n", fMergeExcludes.Data());
+   printf("=====================================================================\n");
+   printf("= Job price: ___________________________________ %d\n", fPrice);
+   printf("= Time to live (TTL): __________________________ %d\n", fTTL);
+   printf("= Max files per subjob: ________________________ %d\n", fSplitMaxInputFileNumber);
+   if (fMaxInitFailed>0)
+      printf("= Max number of subjob fails to kill: __________ %d\n", fMaxInitFailed);
+   if (fMasterResubmitThreshold>0)
+      printf("= Resubmit master job if failed subjobs >_______ %d\n", fMasterResubmitThreshold);
+   if (fNMasterJobs>0)
+      printf("= Number of master jobs to be launched: ________ not used yet\n");
+   printf("= Number of files in one chunk to be merged: ___ %d\n", fMaxMergeFiles);
+   printf("= Name of the generated execution script: ______ %s\n",fExecutable.Data());
+   if (fArguments.Length())
+      printf("= Arguments for the execution script: __________ %s\n",fArguments.Data());
+   printf("= Name of the generated analysis macro: ________ %s\n",fAnalysisMacro.Data());
+   printf("= User analysis files to be deployed: __________ %s\n",fAnalysisSource.Data());
+   printf("= Additional libs to be loaded or souces to be compiled runtime: <%s>\n",fAdditionalLibs.Data());
+   printf("= Master jobs split mode: ______________________ %s\n",fSplitMode.Data());
+   // Test the string length: a TString used directly as a condition is always true.
+   if (fDatasetName.Length())
+      printf("= Custom name for the dataset to be created: ___ %s\n", fDatasetName.Data());
+   printf("= Name of the generated JDL: ___________________ %s\n", fJDLName.Data());
+   // Data() never returns a null pointer, so test the length instead.
+   if (fIncludePath.Length())
+      printf("= Include path for runtime task compilation: ___ %s\n", fIncludePath.Data());
+   if (fCloseSE.Length())
+      printf("= Force job outputs to storage element: ________ %s\n", fCloseSE.Data());
+   if (fFriendChainName.Length())
+      printf("= Open friend chain file on worker: ____________ %s\n", fFriendChainName.Data());
+   if (fPackages) {
+      TIter next(fPackages);
+      TObject *obj;
+      TString list;
+      while ((obj=next())) {
+         // Separate entries with a blank so consecutive names stay readable.
+         if (list.Length()) list += " ";
+         list += obj->GetName();
+      }
+      printf("= Par files to be used: ________________________ %s\n", list.Data());
+   }
+}
+
//______________________________________________________________________________
void AliAnalysisAlien::SetDefaults()
{
fRunRange[0] = 0;
fRunRange[1] = 0;
fNMasterJobs = 0;
+ fMaxMergeFiles = 100;
fRunNumbers = "";
fExecutable = "analysis.sh";
fArguments = "";
TObjString *str;
TString command;
TString output_file;
+ TString output_chunk;
+ TString previous_chunk;
+ Int_t count_chunk = 0;
+ Int_t count_zero = fMaxMergeFiles;
Bool_t merged = kTRUE;
while((str=(TObjString*)next())) {
output_file = str->GetString();
if (index > 0) output_file.Remove(index);
if (fMergeExcludes.Length() &&
fMergeExcludes.Contains(output_file.Data())) continue;
+ // Perform a 'find' command in the output directory, looking for registered outputs
command = Form("find %s/ *%s", output.Data(), output_file.Data());
printf("command: %s\n", command.Data());
TGridResult *res = gGrid->Command(command);
TFileMerger *fm = 0;
TIter nextmap(res);
TMap *map;
+ previous_chunk = "";
+ count_chunk = 0;
+ count_zero = fMaxMergeFiles;
while ((map=(TMap*)nextmap())) {
+ // Loop 'find' results and get next LFN
+ if (count_zero == fMaxMergeFiles) {
+ // First file in chunk - create file merger and add previous chunk if any.
+ fm = new TFileMerger(kFALSE);
+ fm->SetFastMethod(kTRUE);
+ if (previous_chunk.Length()) fm->AddFile(previous_chunk.Data());
+ output_chunk = output_file;
+ output_chunk.ReplaceAll(".root", Form("_%04d.root", count_chunk));
+ }
+ // If last file found, put merged results in the output file
+ if (map == res->Last()) output_chunk = output_file;
TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
if (!objs || !objs->GetString().Length()) {
+ // Nothing found - skip this output
delete res;
- continue;
- }
- if (!fm) {
- fm = new TFileMerger(kFALSE);
- fm->SetFastMethod(kTRUE);
- fm->OutputFile(output_file);
+ delete fm;
+ break;
+ }
+ // Add file to be merged and decrement chunk counter.
+ fm->AddFile(objs->GetString());
+ count_zero--;
+ if (count_zero==0 || map == res->Last()) {
+ fm->OutputFile(output_chunk);
+ if (!fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
+ // Nothing found - skip this output
+ Warning("MergeOutputs", "No <%s> files found.", output_file.Data());
+ delete res;
+ delete fm;
+ break;
+ }
+ // Merge the outputs, then go to next chunk
+ if (!fm->Merge()) {
+ Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data());
+ delete res;
+ delete fm;
+ merged = kFALSE;
+ break;
+ } else {
+ Info("MergeOutputs", "\n##### Merged %d output files to <%s>", fm->GetMergeList()->GetSize(), output_chunk.Data());
+ }
+ if (map == res->Last()) {
+ delete res;
+ delete fm;
+ break;
+ }
+ count_chunk++;
+ count_zero = fMaxMergeFiles;
+ previous_chunk = output_chunk;
}
- fm->AddFile(objs->GetString());
}
- if (!fm || !fm->GetMergeList() || !fm->GetMergeList()->GetSize()) {
- Warning("MergeOutputs", "No <%s> files found.", output_file.Data());
- merged = kFALSE;
- delete res;
- continue;
- }
- if (!fm->Merge()) {
- Error("MergeOutputs", "Could not merge all <%s> files", output_file.Data());
- merged = kFALSE;
- } else {
- Info("MergeOutputs", "\n##### Merged %d output files <%s>", fm->GetMergeList()->GetSize(), output_file.Data());
- }
- delete fm;
- delete res;
}
if (!merged) {
Error("MergeOutputs", "Terminate() will NOT be executed");
if (!Connect()) {
Error("StartAnalysis", "Cannot start grid analysis without grid connection");
return;
- }
+ }
+ Print();
if (!CheckInputData()) {
Error("StartAnalysis", "There was an error in preprocessing your requested input data");
return;
CdWork();
TGridResult *res;
TString jobID = "";
- if (fRunNumbers.Length()) {
+ if (!fRunRange[0]) {
+ // Submit a given xml or a set of runs
res = gGrid->Command(Form("submit %s", fJDLName.Data()));
printf("*************************** %s\n",Form("submit %s", fJDLName.Data()));
if (res) {
delete res;
}
} else {
- if (!fRunRange[0]) {
- Error("StartAnalysis", "No runs defined. Exiting.");
- return;
- }
+ // Submit for a range of runs.
TString sjdl;
for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
if (!fInputFiles->At(irun-fRunRange[0])) break;
Info("StartAnalysis", "\n#### STARTING AN ALIEN SHELL FOR YOU. EXIT WHEN YOUR JOB %s HAS FINISHED. #### \
\n You may exit at any time and terminate the job later using the option <terminate> \
\n ##################################################################################", jobID.Data());
- //gGrid->Shell();
gSystem->Exec("aliensh");
}