Bool_t AliAnalysisAlien::CreateDataset(const char *pattern)
{
// Create dataset for the grid data directory + run number.
+ const Int_t gMaxEntries = 15000;
if (fProductionMode || TestBit(AliAnalysisGrid::kOffline)) return kTRUE;
if (!Connect()) {
Error("CreateDataset", "Cannot create dataset with no grid connection");
TString command;
TString options = "-x collection ";
if (TestBit(AliAnalysisGrid::kTest)) options += Form("-l %d ", fNtestFiles);
+ else options += Form("-l %d ", gMaxEntries); // Protection for the find command
TString conditions = "";
-
+ Int_t nstart = 0;
+ Int_t ncount = 0;
+ Int_t stage = 0;
TString file;
TString path;
Int_t nruns = 0;
// CdWork();
if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
else file = Form("%s.xml", gSystem->BaseName(path));
- if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
- command = "find ";
- command += options;
- command += path;
- command += " ";
- command += pattern;
- command += conditions;
- printf("command: %s\n", command.Data());
- TGridResult *res = gGrid->Command(command);
- if (res) delete res;
- // Write standard output to file
- gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
- Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
- Bool_t nullFile = kFALSE;
- if (!hasGrep) {
- Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
- } else {
- nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
- if (nullFile) {
- Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
- return kFALSE;
+ while (1) {
+ ncount = 0;
+ stage++;
+ if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
+ command = "find ";
+ command += Form("%s -o %d ",options.Data(), nstart);
+ command += path;
+ command += " ";
+ command += pattern;
+ command += conditions;
+ printf("command: %s\n", command.Data());
+ TGridResult *res = gGrid->Command(command);
+ if (res) delete res;
+ // Write standard output to file
+ gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage, file.Data()));
+ Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
+ Bool_t nullFile = kFALSE;
+ if (!hasGrep) {
+ Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
+ } else {
+ nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
+ if (nullFile) {
+ Error("CreateDataset","Dataset %s produced by the previous find command is empty !", file.Data());
+ gSystem->Exec("rm -f __tmp*");
+ return kFALSE;
+ }
+ TString line;
+ ifstream in;
+ in.open("__tmp__");
+ in >> line;
+ in.close();
+ gSystem->Exec("rm -f __tmp__");
+ ncount = line.Atoi();
+ }
+ }
+ if (ncount == gMaxEntries) {
+ Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ if (!cbase) cbase = cadd;
+ else {
+ cbase->Add(cadd);
+ delete cadd;
}
- }
+ nstart += ncount;
+ } else {
+ if (cbase) {
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ printf("... please wait - TAlienCollection::Add() scales badly...\n");
+ cbase->Add(cadd);
+ delete cadd;
+ cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
+ delete cbase; cbase = 0;
+ } else {
+ TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
+ }
+ gSystem->Exec("rm -f __tmp*");
+ Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
+ break;
+ }
}
Bool_t fileExists = FileExists(file);
if (!TestBit(AliAnalysisGrid::kTest) && (!fileExists || fOverwriteMode)) {
TObjString *os;
TIter next(arr);
while ((os=(TObjString*)next())) {
+ nstart = 0;
+ stage = 0;
path = Form("%s/%s/ ", fGridDataDir.Data(), os->GetString().Data());
if (!DirectoryExists(path)) continue;
// CdWork();
if (TestBit(AliAnalysisGrid::kTest)) file = "wn.xml";
else file = Form("%s.xml", os->GetString().Data());
// If local collection file does not exist, create it via 'find' command.
- if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
- command = "find ";
- command += options;
- command += path;
- command += pattern;
- command += conditions;
- TGridResult *res = gGrid->Command(command);
- if (res) delete res;
- // Write standard output to file
- gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
- Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
- Bool_t nullFile = kFALSE;
- if (!hasGrep) {
- Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
- } else {
- nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
- if (nullFile) {
- Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
- fRunNumbers.ReplaceAll(os->GetString().Data(), "");
- continue;
+ while (1) {
+ ncount = 0;
+ stage++;
+ if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
+ command = "find ";
+ command += Form("%s -o %d ",options.Data(), nstart);
+ command += path;
+ command += pattern;
+ command += conditions;
+ TGridResult *res = gGrid->Command(command);
+ if (res) delete res;
+ // Write standard output to file
+ gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
+ Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
+ Bool_t nullFile = kFALSE;
+ if (!hasGrep) {
+ Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
+ } else {
+ nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
+ if (nullFile) {
+ Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
+ gSystem->Exec("rm -f __tmp*");
+ fRunNumbers.ReplaceAll(os->GetString().Data(), "");
+ continue;
+ }
+ TString line;
+ ifstream in;
+ in.open("__tmp__");
+ in >> line;
+ in.close();
+ gSystem->Exec("rm -f __tmp__");
+ ncount = line.Atoi();
+ }
+ nullResult = kFALSE;
+ }
+ if (ncount == gMaxEntries) {
+ Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
+ if (fNrunsPerMaster > 1) {
+ Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
+ file.Data(),gMaxEntries);
+ return kFALSE;
+ }
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ if (!cbase) cbase = cadd;
+ else {
+ cbase->Add(cadd);
+ delete cadd;
}
+ nstart += ncount;
+ } else {
+ if (cbase) {
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ printf("... please wait - TAlienCollection::Add() scales badly...\n");
+ cbase->Add(cadd);
+ delete cadd;
+ cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
+ delete cbase; cbase = 0;
+ } else {
+ TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
+ }
+ gSystem->Exec("rm -f __tmp*");
+ Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
+ break;
}
- nullResult = kFALSE;
- }
+ }
if (TestBit(AliAnalysisGrid::kTest)) break;
// Check if there is one run per master job.
if (fNrunsPerMaster<2) {
// Process a full run range.
for (Int_t irun=fRunRange[0]; irun<=fRunRange[1]; irun++) {
format = Form("%%s/%s ", fRunPrefix.Data());
+ nstart = 0;
+ stage = 0;
path = Form(format.Data(), fGridDataDir.Data(), irun);
if (!DirectoryExists(path)) continue;
// CdWork();
}
}
// If local collection file does not exist, create it via 'find' command.
- if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
- command = "find ";
- command += options;
- command += path;
- command += pattern;
- command += conditions;
- TGridResult *res = gGrid->Command(command);
- if (res) delete res;
- // Write standard output to file
- gROOT->ProcessLine(Form("gGrid->Stdout(); > %s", file.Data()));
- Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
- Bool_t nullFile = kFALSE;
- if (!hasGrep) {
- Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
- } else {
- nullFile = (gSystem->Exec(Form("grep /event %s 2>/dev/null > /dev/null",file.Data()))==0)?kFALSE:kTRUE;
- if (nullFile) {
- Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
- continue;
+ while (1) {
+ ncount = 0;
+ stage++;
+ if (gSystem->AccessPathName(file) || TestBit(AliAnalysisGrid::kTest) || fOverwriteMode) {
+ command = "find ";
+ command += Form("%s -o %d ",options.Data(), nstart);
+ command += path;
+ command += pattern;
+ command += conditions;
+ TGridResult *res = gGrid->Command(command);
+ if (res) delete res;
+ // Write standard output to file
+ gROOT->ProcessLine(Form("gGrid->Stdout(); > __tmp%d__%s", stage,file.Data()));
+ Bool_t hasGrep = (gSystem->Exec("grep --version 2>/dev/null > /dev/null")==0)?kTRUE:kFALSE;
+ Bool_t nullFile = kFALSE;
+ if (!hasGrep) {
+ Warning("CreateDataset", "'grep' command not available on this system - cannot validate the result of the grid 'find' command");
+ } else {
+ nullFile = (gSystem->Exec(Form("grep -c /event __tmp%d__%s 2>/dev/null > __tmp__",stage,file.Data()))==0)?kFALSE:kTRUE;
+ if (nullFile) {
+ Warning("CreateDataset","Dataset %s produced by: <%s> is empty !", file.Data(), command.Data());
+ gSystem->Exec("rm -f __tmp*");
+ continue;
+ }
+ TString line;
+ ifstream in;
+ in.open("__tmp__");
+ in >> line;
+ in.close();
+ gSystem->Exec("rm -f __tmp__");
+ ncount = line.Atoi();
+ }
+ nullResult = kFALSE;
+ }
+ if (ncount == gMaxEntries) {
+ Info("CreateDataset", "Dataset %s has more than 15K entries. Trying to merge...", file.Data());
+ if (fNrunsPerMaster > 1) {
+ Error("CreateDataset", "File %s has more than %d entries. Please set the number of runs per master to 1 !",
+ file.Data(),gMaxEntries);
+ return kFALSE;
+ }
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ if (!cbase) cbase = cadd;
+ else {
+ cbase->Add(cadd);
+ delete cadd;
}
+ nstart += ncount;
+ } else {
+ if (cbase) {
+ cadd = (TGridCollection*)gROOT->ProcessLine(Form("new TAlienCollection(\"__tmp%d__%s\", 1000000);",stage,file.Data()));
+ printf("... please wait - TAlienCollection::Add() scales badly...\n");
+ cbase->Add(cadd);
+ delete cadd;
+ cbase->ExportXML(Form("file://%s", file.Data()),kFALSE,kFALSE, file, "Merged entries for a run");
+ delete cbase; cbase = 0;
+ } else {
+ TFile::Cp(Form("__tmp%d__%s",stage, file.Data()), file.Data());
+ }
+ Info("CreateDataset", "Created dataset %s with %d files", file.Data(), nstart+ncount);
+ break;
}
- nullResult = kFALSE;
}
if (TestBit(AliAnalysisGrid::kTest)) break;
// Check if there is one run per master job.