Fix neccessary for local merge
[u/mrichter/AliRoot.git] / PWGPP / CalibMacros / CPass1 / merge.C
1 /*
2   merge output calib objects on Alien
3   using AliFileMerger functionality
4
5   Directory with runCalibTrain output: outputDir
6   pattern: AliESDfriends_v1.root
7   Output file name: CalibObjects.root
8
9   Example:
10   .L $ALICE_ROOT/ANALYSIS/CalibMacros/MergeCalibration/merge.C
11   merge("alien:///alice/cern.ch/user/j/jotwinow/CalibTrain/output","AliESDfriends_v1.root");
12 */
13 void mergeInChunksTXT(const char* mlist, const char* dest, int maxFiles=700);
14
15
16 void merge(const char* outputDir, const char* pattern, Bool_t copyLocal=kFALSE)
17 {
18   //
19   // load libraries
20   //
21   printf("Merging with chunks copying turned %s\n",copyLocal ? "ON":"OFF");
22   gROOT->Macro("$ALICE_ROOT/PWGPP/CalibMacros/CPass1/LoadLibraries.C");
23   
24   //if pattern is empty and outputDir is a local file assume it contains the list of files to merge
25   //otherwise fall back to old behaviour: search alien and process those files
26   TString listFileName("file.list");
27   TString patternStr(pattern);
28   Long_t id, size, flags, modtime;
29   Bool_t outputDirFailure = gSystem->GetPathInfo(outputDir, &id, &size, &flags, &modtime);
30   printf("st: %i, flags: %i, patt: %s\n",outputDirFailure,flags,patternStr.Data());
31   if (!outputDirFailure && (flags==0)) 
32   { 
33     printf("### processing local fileList: %s\n",outputDir);
34     listFileName=outputDir;
35   }
36   else
37   {
38     cpTimeOut(outputDir, pattern, 10, copyLocal);
39     listFileName="calib.list";
40   }
41
42   //
43   // local
44   mergeInChunksTXT(listFileName.Data(),"CalibObjects.root");
45   //  AliFileMerger merger;
46   //  merger.AddReject("esdFriend"); // do not merge
47   //  merger.SetMaxFilesOpen(700);
48   //  merger.IterTXT("calib.list","CalibObjects.root",kFALSE);
49
50   // alien
51   //merger.IterAlien(outputDir, "CalibObjects.root", pattern);
52
53   return;
54 }
55
56 void cpTimeOut(const char * searchdir, const char* pattern, Int_t timeOut=10, Bool_t copyLocal)
57 {
58
59   gSystem->Setenv("XRDCLIENTMAXWAIT",Form("%d",timeOut));
60   gEnv->SetValue("XNet.RequestTimeout", timeOut);
61   gEnv->SetValue("XNet.ConnectTimeout", timeOut);
62   gEnv->SetValue("XNet.TransactionTimeout", timeOut);
63   TFile::SetOpenTimeout(timeOut);
64
65   TGrid::Connect("alien");
66
67   TString filelist;
68   TString command;
69   command = Form("find %s/ %s", searchdir, pattern);
70   cerr<<"command: "<<command<<endl;
71   TGridResult *res = gGrid->Command(command);
72   if (!res) return;
73   res->Print();
74   TIter nextmap(res);
75   TMap *map = 0;
76
77   ofstream outputFile;
78   outputFile.open(Form("calib.list"));
79   Int_t counter=0;
80
81   while((map=(TMap*)nextmap())) 
82   {
83     TObjString *objs = dynamic_cast<TObjString*>(map->GetValue("turl"));
84     if (!objs || !objs->GetString().Length())
85     {
86       delete res;
87       break;
88     }
89
90     TString src=Form("%s",objs->GetString().Data());
91     TString dst=src;
92     Bool_t result = kTRUE;
93     if (copyLocal) {
94       dst.ReplaceAll("alien:///","");
95       dst.ReplaceAll("/","_");
96       TTimeStamp s1;
97       result = TFile::Cp(src.Data(),dst.Data(),kTRUE);
98       TTimeStamp s2;
99       AliSysInfo::AddStamp(dst.Data(),counter, result);
100     }
101     if (result) {
102       counter++;
103       outputFile << dst.Data()<< endl;
104     }
105   }
106   if (copyLocal) cout<<counter<<" files copied!"<<endl;
107   else           cout<<counter<<" files registerd!"<<endl;
108   outputFile.close();
109   gSystem->Exec("mv syswatch.log syswatch_copy.log");
110   return;
111 }
112
113 void mergeInChunksTXT(const char* mlist, const char* dest, int maxFiles)
114 {
115   TH1::AddDirectory(0);
116   AliFileMerger merger;
117   //  merger.SetMaxFilesOpen(999);
118   merger.AddReject("esdFriend"); // do not merge
119   //
120   if (maxFiles<2) maxFiles = 2;
121   TString filesToMerge = mlist, fileDest = dest;
122   if (filesToMerge.IsNull()) {printf("List to merge is not provided\n"); return;}
123   if (fileDest.IsNull())     {printf("Merging destination is not provided\n"); return;}
124   const char* tmpMerge[3]={"__merge_part0.root","__merge_part1.root","__part_of_calib.list"};
125   //
126   gSystem->ExpandPathName(filesToMerge);
127   ofstream outfile;
128   ifstream infile(filesToMerge.Data());
129   if (!infile) {printf("No %s file\n",filesToMerge.Data()); return;}
130   //
131   int currTmp = 0, nfiles = 0, nparts = 0; // counter for number of merging operations
132   string line;
133   TString lineS;
134   while ( !infile.eof() ) {
135     getline(infile, line); 
136     lineS = line;
137     if (lineS.IsNull() || lineS.BeginsWith("#")) continue;
138     int st = nfiles%maxFiles;
139     if (st==0) { // new chunk should be started
140       if (nfiles) { // merge prev. chunk
141         outfile.close();
142         merger.IterTXT(tmpMerge[2], tmpMerge[currTmp] ,kFALSE);
143         printf("Merging to %s | %d files at step %d\n",tmpMerge[currTmp], nfiles,nparts);
144       }
145       outfile.open(tmpMerge[2], ios::out); // start list for new chunk  
146       if (nparts++) {
147         printf("Adding previous result %s | %d files %d at part\n",tmpMerge[currTmp], nfiles,nparts);
148         outfile << tmpMerge[currTmp] << endl; // result of previous merge goes 1st
149       }
150       currTmp = (currTmp==0) ? 1:0;         // swap tmp files
151     }
152     outfile << line << endl;
153     nfiles++;
154   }
155   // merge the rest
156   merger.IterTXT(tmpMerge[2], dest ,kFALSE);
157   outfile.close();
158   infile.close();
159   for (int i=0;i<3;i++) gSystem->Exec(Form("if [ -e %s ]; then \nrm %s\nfi",tmpMerge[i],tmpMerge[i]));
160   printf("Merged %d files in %d steps\n",nfiles, nparts);
161   //
162 }
163