]> git.uio.no Git - u/mrichter/AliRoot.git/blob - PWGLF/FORWARD/trains/GridWatch.C
Mega commit of many changes to PWGLFforward
[u/mrichter/AliRoot.git] / PWGLF / FORWARD / trains / GridWatch.C
1 /**
2  * @file   GridWatch.C
3  * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4  * @date   Thu Jan 24 23:06:08 2013
5  * 
6  * @brief Script to watch master jobs and automatically submit
7  * terminate jobs
8  * 
9  * 
10  * @ingroup pwglf_forward_trains_helper
11  */
12 #ifndef __CINT__
13 # include <TString.h>
14 # include <TGrid.h>
15 # include <TSystem.h>
16 # include <TObjArray.h>
17 # include <iostream>
18 # include <fstream>
19 # include <TError.h>
20 # include <TDatime.h>
21 # include <TEnv.h>
22 #else 
23 class TString;
24 #endif
25 #include <TArrayI.h>
26
27 /** 
28  * Create token name 
29  * 
30  * @param name   Name
31  * @param ext    Extension
32  * @param merge  Merge state or not 
33  * 
34  * @return Formatted string
35  * @ingroup pwglf_forward_trains_helper
36  */
37 TString TokenName(const TString& name, 
38                    const TString& ext, 
39                    Bool_t merge=false)
40 {
41   return TString::Format("%s%s.%s", name.Data(), 
42                          (merge ? "_merge" : ""), ext.Data());
43 }
44 /** 
45  * Check if we have a particular file 
46  * 
47  * @param name   Base name 
48  * @param ext    Extension 
49  * @param merge  Merging stage or not 
50  * 
51  * @return true if file exits
52  * @ingroup pwglf_forward_trains_helper
53  */
54 Bool_t CheckTokens(const TString& name, 
55                    const TString& ext, 
56                    Bool_t merge=false)
57 {
58   // TSystem::AccessPathName return false if file is there 
59   return !gSystem->AccessPathName(TokenName(name, ext, merge));
60 }
61 /** 
62  * Remove a token file 
63  * 
64  * @param name   Base name 
65  * @param ext    Extension 
66  * @param merge  Merging stage or not 
67  * @ingroup pwglf_forward_trains_helper
68  */
69 void RemoveTokens(const TString& name, 
70                   const TString& ext, 
71                   Bool_t merge=false)
72 {
73   gSystem->Unlink(TokenName(name, ext, merge));
74 }
75
76 /** 
77  * Read one line of text from file and return tokens.
78  * 
79  * @param name   Base name 
80  * @param ext    Extension
81  * @param merge  If true append "_merge" to name
82  * 
83  * @return Array of tokens or null
84  *
85  * @ingroup pwglf_forward_trains_helper
86  */
87 TObjArray* ReadTokens(const TString& name, 
88                       const TString& ext, 
89                       bool merge=false) 
90 {
91   TString fn = TString::Format("%s%s.%s", name.Data(), 
92                                (merge ? "_merge" : ""), ext.Data());
93   std::ifstream in(fn.Data());
94   if (!in) { 
95     Error("ReadTokens", "Failed to open %s", fn.Data());
96     return 0;
97   }
98   TString ln;
99   ln.ReadLine(in);
100   in.close();
101   
102   if (ln.IsNull()) return 0;
103   return ln.Tokenize(" \t");
104 }
105   
106 /** 
107  * Read list of job IDs from file 
108  * 
109  * @param name   Base name 
110  * @param merge  If true append "_merge" to name
111  * 
112  * @return Array of job IDs or null
113  *
114  * @ingroup pwglf_forward_trains_helper
115  */
116 TObjArray* ReadJobIDs(const TString& name, bool merge=false)
117 {
118   return ReadTokens(name, "jobid", merge);
119 }
120
121 /** 
122  * Read list of job stages from file 
123  * 
124  * @param name   Base name 
125  * @param merge  If true append "_merge" to name
126  * 
127  * @return Array of job stages or null
128  *
129  * @ingroup pwglf_forward_trains_helper
130  */
131 TObjArray* ReadStages(const TString& name, bool merge=false)
132 {
133   return ReadTokens(name, "stage", merge);
134 }
135
136 /** 
137  * Parse the job IDs into an array of integers 
138  * 
139  * @param jobIds List of jobs
140  * @param ret    Return array
141  * 
142  * @return true on success
143  *
144  * @ingroup pwglf_forward_trains_helper
145  */
146 Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
147 {
148   if (!jobIds) return false;
149
150   Int_t n = jobIds->GetEntries();
151   ret.Set(n);
152   ret.Reset(-1);
153
154   for (Int_t i = 0; i < n; i++) { 
155     TObjString*    id = static_cast<TObjString*>(jobIds->At(i));
156     const TString& s  = id->String();
157     ret.SetAt(s.Atoi(), i);
158   }
159   return true;
160 }
161
162 /** 
163  * Parse string representing status and return human-readable string 
164  * 
165  * @param status Return from ps command
166  * @param out    Output
167  * 
168  * @return true on success
169  *
170  * @ingroup pwglf_forward_trains_helper
171  */
172 Bool_t ParseState(const TString& status, TString& out)
173 {
174   switch (status[0]) { 
175   case 'D': out = "DONE"; break;
176   case 'E': out = "ERROR"; break;
177   case 'R': out = "RUNNING"; break; 
178   case 'W': out = "WAITING"; break;
179   case 'O': out = "WAITING_QUOTA"; break;
180   case 'A': out = "ASSIGNED"; break;
181   case 'S': out = "STARTED" ; break;
182   case 'I': out = "INSERTING"; break;
183   case 'K': out = "KILLED"; break;
184   default:  out = "UNKNOWN"; return false;
185   }
186   if (status[1] != '\0' && 
187       (status[0] != 'O' || status[0] != 'S')) { 
188     out.Append("_");
189     switch (status[1]) { 
190     case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191     case 'X': out.Append("EXPIRED"); break;
192     case 'A': out.Append("ASSIGNING"); break;
193     case 'E': out.Append("EXECUTING"); break;
194     case 'V': 
195       if (status[0] == 'S') out = "SAVING"; 
196       else out.Append("VALIDATING"); 
197       break;
198     case 'd': 
199       if (status[0] == 'I') {
200         out = "INTERACTIVE_IDLE";
201         break;
202       } // Fall through on else 
203     case 'a': 
204       if (status[0] == 'I') {
205         out = "INTERACTIVE_USED";
206         break;
207       } // Fall through on else
208     default:  out.Append("UNKNOWN"); return false;
209     }
210     if (status[2] != '\0') { 
211       switch (status[2]) {
212       case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
213         break;
214       default: out.Append("_UNKNOWN");
215       }
216     }
217   }
218   return true;
219 }  
220
221 /** 
222  * Get the job state 
223  * 
224  * @param jobId Job status 
225  * @param out   Output status string 
226  * 
227  * @return true on success
228  *
229  * @ingroup pwglf_forward_trains_helper
230  */
231 Bool_t GetJobState(Int_t jobId, TString& out) 
232 {
233   out = "MISSING";
234
235   TString fn("gridMonitor");
236   FILE* fp = gSystem->TempFileName(fn);
237
238   gSystem->RedirectOutput(fn);
239   gGrid->Command("ps -Ax");
240   gGrid->Stdout();
241   gSystem->RedirectOutput(0);
242   gGrid->Stderr();
243
244   fclose(fp);
245
246   std::ifstream in(fn.Data());
247
248   while (!in.eof()) { 
249     TString l;
250     l.ReadLine(in);
251     if (in.bad()) break;
252
253     TObjArray* tokens = l.Tokenize(" \t");
254     if (tokens->GetEntries() < 2) break;
255
256     //TString  user   = tokens->At(0)->GetName(); 
257     TString    sjid   = tokens->At(1)->GetName(); // Job ID
258     TString    stat   = tokens->At(2)->GetName(); // State 
259     Int_t      jid    = sjid.Atoi();
260     
261     if (jid != jobId) continue;
262
263     ParseState(stat, out);
264     break;
265   }
266
267   in.close();
268   gSystem->Unlink(fn);
269
270   return true;
271 }
272
273 /** 
274  * Get the job states
275  * 
276  * @param jobs   List of job IDs
277  * @param states On return the states
278  * 
279  * @return true on success
280  *
281  * @ingroup pwglf_forward_trains_helper
282  */
283 Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
284 {
285   Int_t n = jobs.GetSize();
286   states.Expand(n);
287   for (Int_t i = 0; i < n; i++) {
288     TObjString* s = static_cast<TObjString*>(states.At(i));
289     if (!s) states.AddAt(s = new TObjString(""), i);
290     s->SetString("MISSING");
291   }
292   
293   TString fn("gridMonitor");
294   FILE* fp = gSystem->TempFileName(fn);
295
296   gSystem->RedirectOutput(fn);
297   gGrid->Command("ps -Ax");
298   gGrid->Stdout();
299   gSystem->RedirectOutput(0);
300   gGrid->Stderr();
301
302   fclose(fp);
303
304   std::ifstream in(fn.Data());
305
306   while (!in.eof()) {
307     TString l;
308     l.ReadLine(in);
309     if (in.bad()) break;
310     if (l.IsNull()) continue;
311
312     TObjArray* tokens = l.Tokenize(" \t");
313     if (tokens->GetEntries() < 3) { 
314       Warning("GetJobStates", "Got too few tokens (%d): %s", 
315               tokens->GetEntries(), l.Data());
316       tokens->Print();
317       break;
318     }
319
320     //TString  user   = tokens->At(0)->GetName(); 
321     TString    sjid   = tokens->At(1)->GetName(); // Job ID
322     TString    stat   = tokens->At(2)->GetName(); // State 
323     Int_t      jid    = sjid.Atoi();
324     
325     for (Int_t i = 0; i < n; i++) { 
326       if (jid != jobs.At(i)) continue;
327       TObjString* s = static_cast<TObjString*>(states.At(i));
328       TString out;
329       if (!ParseState(stat, out)) continue;
330       s->SetString(out);
331     }
332   }
333
334   in.close();
335   gSystem->Unlink(fn);
336
337   return true;
338 }
339
340
341 /** 
342  * Wait of jobs to finish 
343  * 
344  * @param jobs    List of jobs
345  * @param stages  Stages
346  * @param delay   Delay for check
347  * 
348  * @return true on success, false otherwise
349  *
350  * @ingroup pwglf_forward_trains_helper
351  */
352 Bool_t WaitForJobs(TArrayI&   jobs, 
353                    TObjArray* stages, 
354                    Int_t      delay,
355                    Bool_t     batch)
356 {
357   Bool_t stopped = false;
358   TFileHandler h(0, 0x1);
359   UInt_t start = 0;
360   do { 
361     Bool_t allDone = true;
362     TDatime t;
363     Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---", 
364            t.GetYear(), t.GetMonth(), t.GetDay(), 
365            t.GetHour(), t.GetMinute(), t.GetSecond());
366     UInt_t now = t.Convert(true);
367     if (start <= 0) start = now;
368
369     TObjArray states;
370     GetJobStates(jobs, states);
371
372     Int_t missing = 0;
373     Int_t total   = jobs.GetSize();
374     // Bool_t allAccounted = false;
375     for (Int_t i = 0; i < total; i++) { 
376       Int_t job = jobs.At(i);
377
378       if (job < 0) continue;
379
380       TObjString* obj = static_cast<TObjString*>(states.At(i));
381       const TString& state = obj->String();
382       
383       if (state.BeginsWith("ERROR_"))
384         jobs.SetAt(-1, i);
385       else if (state.EqualTo("MISSING")) 
386         missing++;
387       else if (!state.EqualTo("DONE")) 
388         allDone = false;
389       
390
391       Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
392       
393     }
394     // Try to refresh token every 6th hour
395     if ((now - start) / 60 / 60 > 6) { 
396       // Reset the start time 
397       start = now;
398       Printf("=== Refreshing AliEn token");
399       gSystem->Exec("alien-token-init");
400       Printf("=== Done refreshing AliEn token");
401     }
402
403     if (allDone) break;
404     if (missing >= total) {
405       Error("GetJobStates", "Info on all jobs missing");
406       break;
407     }
408     if (!batch) {
409       if (gSystem->Select(&h, 1000*delay)) {
410         // Got input on std::cin 
411         std::string l;
412         std::getline(std::cin, l);
413         std::cout << "Do you want to terminate now [yN]? " << std::flush;
414         std::getline(std::cin, l);
415         if (l[0] == 'y' || l[0] == 'Y') { 
416           stopped = true;
417           break;
418         }
419       }
420     }
421     else 
422       gSystem->Sleep(1000*delay);
423
424     // 
425   } while (true);
426
427   return true;
428 }
429 /** 
430  * Watch Grid for termination of main job, and submit merging jobs as needed. 
431  * 
432  * @param name   Name of the job
433  * @param delay  Delay between updates in seconds
434  *
435  * @ingroup pwglf_forward_trains_helper
436  */
437 void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
438 {
439   gEnv->SetValue("XSec.GSI.DelegProxy", "2");
440   TGrid::Connect("alien:///");
441   if (!gGrid) { 
442     Error("GridWatch", "Failed to connect to the Grid");
443     return;
444   }
445
446   TObjArray* jobIDs = ReadJobIDs(name, false);
447   TObjArray* stages = ReadStages(name, false);
448
449   if (!jobIDs || !stages) return;
450
451   TArrayI jobs;
452   if (!ParseJobIDs(jobIDs, jobs)) return;
453
454   if (!(CheckTokens(name, "jobid", true) && 
455         CheckTokens(name, "stage", true))) 
456     WaitForJobs(jobs, stages, delay, batch);
457
458   delete jobIDs;
459   delete stages;
460
461   // return;
462   do {
463     if (!CheckTokens(name, "jobid", true) && 
464         !CheckTokens(name, "stage", true)) {
465       Printf("Now executing terminate");
466       gSystem->Exec("aliroot -l -b -q Terminate.C");
467       gSystem->Sleep(10*1000);
468     }
469     Printf("Reading job ids");
470
471     jobIDs = ReadJobIDs(name, true);
472     stages = ReadStages(name, true);
473     
474     if (!ParseJobIDs(jobIDs, jobs)) {
475       Error("GridWatch", "Failed to parse job ids %s", 
476             TokenName(name,"jobid",true).Data());
477       return;
478     }
479
480     WaitForJobs(jobs, stages, delay, batch);
481     
482     Bool_t allFinal = true;
483     for (Int_t i = 0; i < jobs.GetSize(); i++) {
484       if (jobs.At(i) < 0) continue;
485
486       const TString& s = static_cast<TObjString*>(stages->At(i))->String();
487       if (!s.BeginsWith("final_")) allFinal = false;
488     }
489     
490     delete jobIDs;
491     delete stages;
492
493     Printf("All jobs in final stage");
494     if (allFinal) break;
495
496     RemoveTokens(name, "jobid", true);
497     RemoveTokens(name, "stage", true);
498   } while (true);
499
500   Printf("Finished");
501 }
502 //
503 // EOF
504 //
505