]> git.uio.no Git - u/mrichter/AliRoot.git/blob - PWGLF/FORWARD/trains/GridWatch.C
Merge branch 'master' of https://git.cern.ch/reps/AliRoot
[u/mrichter/AliRoot.git] / PWGLF / FORWARD / trains / GridWatch.C
1 /**
2  * @file   GridWatch.C
3  * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4  * @date   Thu Jan 24 23:06:08 2013
5  * 
6  * @brief Script to watch master jobs and automatically submit
7  * terminate jobs
8  * 
9  * 
10  * @ingroup pwglf_forward_trains_helper
11  */
12 #ifndef __CINT__
13 # include <TString.h>
14 # include <TGrid.h>
15 # include <TSystem.h>
16 # include <TObjArray.h>
17 # include <iostream>
18 # include <fstream>
19 # include <TError.h>
20 # include <TDatime.h>
21 # include <TEnv.h>
22 #else 
23 class TString;
24 #endif
25 #include <TArrayI.h>
26
27 /** 
28  * Create token name 
29  * 
30  * @param name   Name
31  * @param ext    Extension
32  * @param merge  Merge state or not 
33  * 
34  * @return Formatted string
35  * @ingroup pwglf_forward_trains_helper
36  */
37 TString CacheFileName(const TString& name, 
38                       const TString& ext, 
39                       Bool_t merge=false)
40 {
41   return TString::Format("%s%s.%s", name.Data(), 
42                          (merge ? "_merge" : ""), ext.Data());
43 }
44 /** 
45  * Check if we have a particular file 
46  * 
47  * @param name   Base name 
48  * @param ext    Extension 
49  * @param merge  Merging stage or not 
50  * 
51  * @return true if file exits
52  * @ingroup pwglf_forward_trains_helper
53  */
54 Bool_t CheckCacheFile(const TString& name, 
55                       const TString& ext, 
56                       Bool_t merge=false)
57 {
58   // TSystem::AccessPathName return false if file is there 
59   return !gSystem->AccessPathName(CacheFileName(name, ext, merge));
60 }
61 /** 
62  * Remove a token file 
63  * 
64  * @param name   Base name 
65  * @param ext    Extension 
66  * @param merge  Merging stage or not 
67  * @ingroup pwglf_forward_trains_helper
68  */
69 void RemoveCacheFile(const TString& name, 
70                   const TString& ext, 
71                   Bool_t merge=false)
72 {
73   gSystem->Unlink(CacheFileName(name, ext, merge));
74 }
75
76 /** 
77  * Read one line of text from file and return tokens.
78  * 
79  * @param name   Base name 
80  * @param ext    Extension
81  * @param merge  If true append "_merge" to name
82  * 
83  * @return Array of tokens or null
84  *
85  * @ingroup pwglf_forward_trains_helper
86  */
87 TObjArray* ReadCacheFile(const TString& name, 
88                       const TString& ext, 
89                       bool merge=false) 
90 {
91   TString fn = TString::Format("%s%s.%s", name.Data(), 
92                                (merge ? "_merge" : ""), ext.Data());
93   std::ifstream in(fn.Data());
94   if (!in) { 
95     Error("ReadCacheFile", "Failed to open %s", fn.Data());
96     return 0;
97   }
98   TString ln;
99   ln.ReadLine(in);
100   in.close();
101   
102   if (ln.IsNull()) return 0;
103   return ln.Tokenize(" \t");
104 }
105   
106 /** 
107  * Read list of job IDs from file 
108  * 
109  * @param name   Base name 
110  * @param merge  If true append "_merge" to name
111  * 
112  * @return Array of job IDs or null
113  *
114  * @ingroup pwglf_forward_trains_helper
115  */
116 TObjArray* ReadJobIDs(const TString& name, bool merge=false)
117 {
118   return ReadCacheFile(name, "jobid", merge);
119 }
120
121 /** 
122  * Read list of job stages from file 
123  * 
124  * @param name   Base name 
125  * @param merge  If true append "_merge" to name
126  * 
127  * @return Array of job stages or null
128  *
129  * @ingroup pwglf_forward_trains_helper
130  */
131 TObjArray* ReadStages(const TString& name, bool merge=false)
132 {
133   return ReadCacheFile(name, "stage", merge);
134 }
135
136 /** 
137  * Parse the job IDs into an array of integers 
138  * 
139  * @param jobIds List of jobs
140  * @param ret    Return array
141  * 
142  * @return true on success
143  *
144  * @ingroup pwglf_forward_trains_helper
145  */
146 Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
147 {
148   if (!jobIds) return false;
149
150   Int_t n = jobIds->GetEntries();
151   ret.Set(n);
152   ret.Reset(-1);
153
154   for (Int_t i = 0; i < n; i++) { 
155     TObjString*    id = static_cast<TObjString*>(jobIds->At(i));
156     const TString& s  = id->String();
157     ret.SetAt(s.Atoi(), i);
158   }
159   return true;
160 }
161
162 /** 
163  * Parse string representing status and return human-readable string 
164  * 
165  * @param status Return from ps command
166  * @param out    Output
167  * 
168  * @return true on success
169  *
170  * @ingroup pwglf_forward_trains_helper
171  */
172 Bool_t ParseState(const TString& status, TString& out)
173 {
174   switch (status[0]) { 
175   case 'D': out = "DONE"; break;
176   case 'E': out = "ERROR"; break;
177   case 'R': out = "RUNNING"; break; 
178   case 'W': out = "WAITING"; break;
179   case 'O': out = "WAITING_QUOTA"; break;
180   case 'A': out = "ASSIGNED"; break;
181   case 'S': out = "STARTED" ; break;
182   case 'I': out = "INSERTING"; break;
183   case 'K': out = "KILLED"; break;
184   default:  out = "UNKNOWN"; return false;
185   }
186   if (status[1] != '\0' && 
187       (status[0] != 'O' || status[0] != 'S')) { 
188     out.Append("_");
189     switch (status[1]) { 
190     case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191     case 'X': out.Append("EXPIRED"); break;
192     case 'A': out.Append("ASSIGNING"); break;
193     case 'E': out.Append("EXECUTING"); break;
194     case 'V': 
195       if (status[0] == 'S') out = "SAVING"; 
196       else out.Append("VALIDATING"); 
197       break;
198     case 'd': 
199       if (status[0] == 'I') {
200         out = "INTERACTIVE_IDLE";
201         break;
202       } // Fall through on else 
203     case 'a': 
204       if (status[0] == 'I') {
205         out = "INTERACTIVE_USED";
206         break;
207       } // Fall through on else
208     default:  out.Append("UNKNOWN"); return false;
209     }
210     if (status[2] != '\0') { 
211       switch (status[2]) {
212       case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
213         break;
214       default: out.Append("_UNKNOWN");
215       }
216     }
217   }
218   return true;
219 }  
220
221 /** 
222  * Do a PS on the grid 
223  * 
224  * @param tmp The file generated 
225  * 
226  * @return true on success
227  */
228 Bool_t GridPs(TString& tmp)
229 {
230   tmp = "gridMonitor";
231   FILE* fp = gSystem->TempFileName(tmp);
232
233 #if 0
234   // Here, we'd ideally use TGrid::Ps but that doesn't work, so we use
235   // the shell instead. 
236   gSystem->RedirectOutput(fn);
237   gGrid->Command("ps -Ax");
238   gGrid->Stdout();
239   gSystem->RedirectOutput(0);
240   gGrid->Stderr();
241   fclose(fp);
242 #else
243   fclose(fp);
244   
245   // Printf("Using gbbox ps -Ax >> %s", tmp.Data());
246   gSystem->Exec(Form("gbbox ps -Ax >> %s", tmp.Data()));
247 #endif
248   return true;
249 }
250
251 /** 
252  * Get the job state 
253  * 
254  * @param jobId Job status 
255  * @param out   Output status string 
256  * 
257  * @return true on success
258  *
259  * @ingroup pwglf_forward_trains_helper
260  */
261 Bool_t GetJobState(Int_t jobId, TString& out) 
262 {
263   out = "MISSING";
264
265   TString fn;
266   GridPs(fn);
267
268   std::ifstream in(fn.Data());
269
270   while (!in.eof()) { 
271     TString l;
272     l.ReadLine(in);
273     if (in.bad()) break;
274
275     TObjArray* tokens = l.Tokenize(" \t");
276     if (tokens->GetEntries() < 2) break;
277
278     //TString  user   = tokens->At(0)->GetName(); 
279     TString    sjid   = tokens->At(1)->GetName(); // Job ID
280     TString    stat   = tokens->At(2)->GetName(); // State 
281     Int_t      jid    = sjid.Atoi();
282     
283     if (jid != jobId) continue;
284
285     ParseState(stat, out);
286     break;
287   }
288
289   in.close();
290   gSystem->Unlink(fn);
291
292   return true;
293 }
294
295 /** 
296  * Get the job states
297  * 
298  * @param jobs   List of job IDs
299  * @param states On return the states
300  * 
301  * @return true on success
302  *
303  * @ingroup pwglf_forward_trains_helper
304  */
305 Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
306 {
307   Int_t n = jobs.GetSize();
308   states.Expand(n);
309   for (Int_t i = 0; i < n; i++) {
310     TObjString* s = static_cast<TObjString*>(states.At(i));
311     if (!s) states.AddAt(s = new TObjString(""), i);
312     s->SetString("MISSING");
313   }
314
315   TString fn;
316   GridPs(fn);
317
318   std::ifstream in(fn.Data());
319
320   while (!in.eof()) {
321     TString l;
322     l.ReadLine(in);
323     if (in.bad()) break;
324     if (l.IsNull()) continue;
325
326     TObjArray* tokens = l.Tokenize(" \t");
327     if (tokens->GetEntries() < 3) { 
328       Warning("GetJobStates", "Got too few tokens (%d): %s", 
329               tokens->GetEntries(), l.Data());
330       tokens->Print();
331       break;
332     }
333
334     //TString  user   = tokens->At(0)->GetName(); 
335     TString    sjid   = tokens->At(1)->GetName(); // Job ID
336     TString    stat   = tokens->At(2)->GetName(); // State 
337     Int_t      jid    = sjid.Atoi();
338     
339     for (Int_t i = 0; i < n; i++) { 
340       if (jid != jobs.At(i)) continue;
341       TObjString* s = static_cast<TObjString*>(states.At(i));
342       TString out;
343       if (!ParseState(stat, out)) continue;
344       s->SetString(out);
345     }
346   }
347
348   in.close();
349   gSystem->Unlink(fn);
350
351   return true;
352 }
353
354 /** 
355  * Check if the AliEn token is valid 
356  * 
357  * 
358  * @return true if it is 
359  */
360 Bool_t CheckAlienToken()
361 {
362   Int_t ret = gSystem->Exec("alien-token-info > /dev/null 2>&1");
363   if (ret != 0) {
364     Printf("=== AliEn token not valid");
365     return false;
366   }
367   return true;
368 }
369
370 /** 
371  * Refersh the grid token every 6th hour
372  * 
373  * @param now 
374  * @param force 
375  */
376 #if 0
377 void RefreshAlienToken(UInt_t, Bool_t f=false)
378 {}
379 #else
380 void RefreshAlienToken(UInt_t now, Bool_t force=false)
381 {
382   Bool_t renew = force;
383   if (!renew && !CheckAlienToken()) renew = true;
384
385   if (!renew) {
386     TString l = gSystem->GetFromPipe(Form("cat /tmp/gclient_token_%d",
387                                           gSystem->GetUid()));
388     TObjArray*  lines  = l.Tokenize("\n");
389     TObjString* sline  = 0;
390     UInt_t      expire = 0;
391     TIter       next(lines);
392     while ((sline = static_cast<TObjString*>(next()))) {
393       TString& line = sline->String();
394       if (!line.BeginsWith("Expiretime")) continue;
395
396       Size_t  eq      = line.Index("=");
397       TString sdatime = line(eq+2, line.Length()-eq-2);
398       expire          = sdatime.Atoi();
399       break;
400     }
401     lines->Delete();
402     // If the expiration date/time has passed or is less than 30 min
403     // away, we refresh
404     Int_t diff = (expire - now);
405     if (now > expire || diff < 30*60) renew = true;
406
407     Printf("=== Now: %d, Expires: %d, in %03d:%02d:%02d -> %s", 
408            now, expire, diff/60/60, (diff/60 % 60), (diff % 60), 
409             (renew ? "renew" : "nothing"));
410            
411   }
412
413   if (!renew) return;
414
415   // Reset the start time 
416   Printf("=== Refreshing AliEn token");
417   gSystem->Exec("alien-token-init");
418   Printf("=== Done refreshing AliEn token");
419 }
420 #endif
421
422
423 /** 
424  * Wait of jobs to finish 
425  * 
426  * @param jobs    List of jobs
427  * @param stages  Stages
428  * @param delay   Delay for check
429  * @param batch   If true, do not prompt 
430  * 
431  * @return true on success, false otherwise
432  *
433  * @ingroup pwglf_forward_trains_helper
434  */
435 Bool_t WaitForJobs(TArrayI&   jobs, 
436                    TObjArray* stages, 
437                    Int_t      delay,
438                    Bool_t     batch)
439 {
440   if (!CheckAlienToken()) return false;
441   // Bool_t stopped = false;
442   TFileHandler h(0, 0x1);
443   // RefreshAlienToken(0, true);
444   do { 
445     Bool_t allDone = true;
446     TDatime t;
447     Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---", 
448            t.GetYear(), t.GetMonth(), t.GetDay(), 
449            t.GetHour(), t.GetMinute(), t.GetSecond());
450     UInt_t now = t.Convert(true);
451
452     TObjArray states;
453     GetJobStates(jobs, states);
454
455     Int_t missing = 0;
456     Int_t total   = jobs.GetSize();
457     // Bool_t allAccounted = false;
458     for (Int_t i = 0; i < total; i++) { 
459       Int_t job = jobs.At(i);
460
461       if (job < 0) continue;
462
463       TObjString* obj = static_cast<TObjString*>(states.At(i));
464       const TString& state = obj->String();
465       
466       if (state.BeginsWith("ERROR_"))
467         jobs.SetAt(-1, i);
468       else if (state.EqualTo("MISSING")) 
469         missing++;
470       else if (!state.EqualTo("DONE")) 
471         allDone = false;
472       
473
474       Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
475       
476     }
477     RefreshAlienToken(now);
478
479     if (allDone) break;
480     if (missing >= total) {
481       Error("GetJobStates", "Info on all jobs missing");
482       break;
483     }
484     if (!batch) {
485       if (gSystem->Select(&h, 1000*delay)) {
486         // Got input on std::cin 
487         std::string l;
488         std::getline(std::cin, l);
489         std::cout << "Do you want to terminate now [yN]? " << std::flush;
490         std::getline(std::cin, l);
491         if (l[0] == 'y' || l[0] == 'Y') { 
492           // stopped = true;
493           break;
494         }
495       }
496     }
497     else 
498       gSystem->Sleep(1000*delay);
499
500     // 
501   } while (true);
502
503   return true;
504 }
505 /** 
506  * Watch Grid for termination of main job, and submit merging jobs as needed. 
507  * 
508  * @param name   Name of the job
509  * @param batch  If true, do not prompt 
510  * @param delay  Delay between updates in seconds
511  *
512  * @ingroup pwglf_forward_trains_helper
513  */
514 void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
515 {
516 #if 1
517   // We use command line tools instead of ROOT interface - which is
518   // broken so badly that it's hard to believe it ever worked.
519   gEnv->SetValue("XSec.GSI.DelegProxy", "2");
520   TGrid::Connect("alien:///");
521   if (!gGrid) { 
522     Error("GridWatch", "Failed to connect to the Grid");
523     return;
524   }
525 #endif
526
527   TObjArray* jobIDs = ReadJobIDs(name, false);
528   TObjArray* stages = ReadStages(name, false);
529
530   if (!jobIDs || !stages) return;
531
532   TArrayI jobs;
533   if (!ParseJobIDs(jobIDs, jobs)) return;
534
535   gSystem->Sleep(10*1000);
536   if (!(CheckCacheFile(name, "jobid", true) && 
537         CheckCacheFile(name, "stage", true))) 
538     if (!WaitForJobs(jobs, stages, delay, batch)) return;
539
540   delete jobIDs;
541   delete stages;
542
543   // return;
544   do {
545     if (!CheckCacheFile(name, "jobid", true) && 
546         !CheckCacheFile(name, "stage", true)) {
547       if (!CheckAlienToken()) return;
548       Printf("Now executing terminate");
549       gSystem->Exec("aliroot -l -b -q Terminate.C");
550       gSystem->Sleep(10*1000);
551     }
552
553     Printf("Reading job ids");
554     jobIDs = ReadJobIDs(name, true);
555     stages = ReadStages(name, true);
556     
557     if (!ParseJobIDs(jobIDs, jobs)) {
558       Error("GridWatch", "Failed to parse job ids %s", 
559             CacheFileName(name,"jobid",true).Data());
560       return;
561     }
562
563     if (!WaitForJobs(jobs, stages, delay, batch)) return;
564     
565     Bool_t allFinal = true;
566     for (Int_t i = 0; i < jobs.GetSize(); i++) {
567       if (jobs.At(i) < 0) continue;
568
569       const TString& s = static_cast<TObjString*>(stages->At(i))->String();
570       if (!s.BeginsWith("final_")) allFinal = false;
571     }
572     
573     delete jobIDs;
574     delete stages;
575
576     Printf("All jobs in final stage");
577     if (allFinal) break;
578
579     RemoveCacheFile(name, "jobid", true);
580     RemoveCacheFile(name, "stage", true);
581   } while (true);
582
583   Printf("Finished");
584 }
585 //
586 // EOF
587 //
588