]>
Commit | Line | Data |
---|---|---|
08275d05 | 1 | /** |
2 | * @file GridWatch.C | |
3 | * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk> | |
4 | * @date Thu Jan 24 23:06:08 2013 | |
5 | * | |
6 | * @brief Script to watch master jobs and automatically submit | |
7 | * terminate jobs | |
8 | * | |
9 | * | |
33438b4c | 10 | * @ingroup pwglf_forward_trains_helper |
08275d05 | 11 | */ |
12 | #ifndef __CINT__ | |
13 | # include <TString.h> | |
14 | # include <TGrid.h> | |
15 | # include <TSystem.h> | |
16 | # include <TObjArray.h> | |
17 | # include <iostream> | |
18 | # include <fstream> | |
19 | # include <TError.h> | |
20 | # include <TDatime.h> | |
21 | # include <TEnv.h> | |
22 | #else | |
23 | class TString; | |
24 | #endif | |
25 | #include <TArrayI.h> | |
26 | ||
33438b4c | 27 | /** |
28 | * Create token name | |
29 | * | |
30 | * @param name Name | |
31 | * @param ext Extension | |
32 | * @param merge Merge state or not | |
33 | * | |
34 | * @return Formatted string | |
35 | * @ingroup pwglf_forward_trains_helper | |
36 | */ | |
aeab3a4e | 37 | TString CacheFileName(const TString& name, |
38 | const TString& ext, | |
39 | Bool_t merge=false) | |
08275d05 | 40 | { |
41 | return TString::Format("%s%s.%s", name.Data(), | |
42 | (merge ? "_merge" : ""), ext.Data()); | |
43 | } | |
33438b4c | 44 | /** |
45 | * Check if we have a particular file | |
46 | * | |
47 | * @param name Base name | |
48 | * @param ext Extension | |
49 | * @param merge Merging stage or not | |
50 | * | |
51 | * @return true if file exits | |
52 | * @ingroup pwglf_forward_trains_helper | |
53 | */ | |
aeab3a4e | 54 | Bool_t CheckCacheFile(const TString& name, |
55 | const TString& ext, | |
56 | Bool_t merge=false) | |
08275d05 | 57 | { |
58 | // TSystem::AccessPathName return false if file is there | |
aeab3a4e | 59 | return !gSystem->AccessPathName(CacheFileName(name, ext, merge)); |
08275d05 | 60 | } |
33438b4c | 61 | /** |
62 | * Remove a token file | |
63 | * | |
64 | * @param name Base name | |
65 | * @param ext Extension | |
66 | * @param merge Merging stage or not | |
67 | * @ingroup pwglf_forward_trains_helper | |
68 | */ | |
aeab3a4e | 69 | void RemoveCacheFile(const TString& name, |
08275d05 | 70 | const TString& ext, |
71 | Bool_t merge=false) | |
72 | { | |
aeab3a4e | 73 | gSystem->Unlink(CacheFileName(name, ext, merge)); |
08275d05 | 74 | } |
75 | ||
76 | /** | |
77 | * Read one line of text from file and return tokens. | |
78 | * | |
79 | * @param name Base name | |
80 | * @param ext Extension | |
81 | * @param merge If true append "_merge" to name | |
82 | * | |
83 | * @return Array of tokens or null | |
33438b4c | 84 | * |
85 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 86 | */ |
aeab3a4e | 87 | TObjArray* ReadCacheFile(const TString& name, |
08275d05 | 88 | const TString& ext, |
89 | bool merge=false) | |
90 | { | |
91 | TString fn = TString::Format("%s%s.%s", name.Data(), | |
92 | (merge ? "_merge" : ""), ext.Data()); | |
93 | std::ifstream in(fn.Data()); | |
94 | if (!in) { | |
aeab3a4e | 95 | Error("ReadCacheFile", "Failed to open %s", fn.Data()); |
08275d05 | 96 | return 0; |
97 | } | |
98 | TString ln; | |
99 | ln.ReadLine(in); | |
100 | in.close(); | |
101 | ||
102 | if (ln.IsNull()) return 0; | |
103 | return ln.Tokenize(" \t"); | |
104 | } | |
105 | ||
106 | /** | |
107 | * Read list of job IDs from file | |
108 | * | |
109 | * @param name Base name | |
110 | * @param merge If true append "_merge" to name | |
111 | * | |
112 | * @return Array of job IDs or null | |
33438b4c | 113 | * |
114 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 115 | */ |
116 | TObjArray* ReadJobIDs(const TString& name, bool merge=false) | |
117 | { | |
aeab3a4e | 118 | return ReadCacheFile(name, "jobid", merge); |
08275d05 | 119 | } |
120 | ||
121 | /** | |
122 | * Read list of job stages from file | |
123 | * | |
124 | * @param name Base name | |
125 | * @param merge If true append "_merge" to name | |
126 | * | |
127 | * @return Array of job stages or null | |
33438b4c | 128 | * |
129 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 130 | */ |
131 | TObjArray* ReadStages(const TString& name, bool merge=false) | |
132 | { | |
aeab3a4e | 133 | return ReadCacheFile(name, "stage", merge); |
08275d05 | 134 | } |
135 | ||
136 | /** | |
137 | * Parse the job IDs into an array of integers | |
138 | * | |
139 | * @param jobIds List of jobs | |
140 | * @param ret Return array | |
141 | * | |
142 | * @return true on success | |
33438b4c | 143 | * |
144 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 145 | */ |
146 | Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret) | |
147 | { | |
148 | if (!jobIds) return false; | |
149 | ||
150 | Int_t n = jobIds->GetEntries(); | |
151 | ret.Set(n); | |
152 | ret.Reset(-1); | |
153 | ||
154 | for (Int_t i = 0; i < n; i++) { | |
155 | TObjString* id = static_cast<TObjString*>(jobIds->At(i)); | |
156 | const TString& s = id->String(); | |
157 | ret.SetAt(s.Atoi(), i); | |
158 | } | |
159 | return true; | |
160 | } | |
161 | ||
162 | /** | |
163 | * Parse string representing status and return human-readable string | |
164 | * | |
165 | * @param status Return from ps command | |
166 | * @param out Output | |
167 | * | |
168 | * @return true on success | |
33438b4c | 169 | * |
170 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 171 | */ |
172 | Bool_t ParseState(const TString& status, TString& out) | |
173 | { | |
174 | switch (status[0]) { | |
175 | case 'D': out = "DONE"; break; | |
176 | case 'E': out = "ERROR"; break; | |
177 | case 'R': out = "RUNNING"; break; | |
178 | case 'W': out = "WAITING"; break; | |
179 | case 'O': out = "WAITING_QUOTA"; break; | |
180 | case 'A': out = "ASSIGNED"; break; | |
181 | case 'S': out = "STARTED" ; break; | |
182 | case 'I': out = "INSERTING"; break; | |
183 | case 'K': out = "KILLED"; break; | |
184 | default: out = "UNKNOWN"; return false; | |
185 | } | |
186 | if (status[1] != '\0' && | |
187 | (status[0] != 'O' || status[0] != 'S')) { | |
188 | out.Append("_"); | |
189 | switch (status[1]) { | |
190 | case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break; | |
191 | case 'X': out.Append("EXPIRED"); break; | |
192 | case 'A': out.Append("ASSIGNING"); break; | |
193 | case 'E': out.Append("EXECUTING"); break; | |
194 | case 'V': | |
195 | if (status[0] == 'S') out = "SAVING"; | |
196 | else out.Append("VALIDATING"); | |
197 | break; | |
198 | case 'd': | |
199 | if (status[0] == 'I') { | |
200 | out = "INTERACTIVE_IDLE"; | |
201 | break; | |
202 | } // Fall through on else | |
203 | case 'a': | |
204 | if (status[0] == 'I') { | |
205 | out = "INTERACTIVE_USED"; | |
206 | break; | |
207 | } // Fall through on else | |
208 | default: out.Append("UNKNOWN"); return false; | |
209 | } | |
210 | if (status[2] != '\0') { | |
211 | switch (status[2]) { | |
212 | case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING"); | |
213 | break; | |
214 | default: out.Append("_UNKNOWN"); | |
215 | } | |
216 | } | |
217 | } | |
218 | return true; | |
219 | } | |
220 | ||
635bda19 | 221 | /** |
222 | * Do a PS on the grid | |
223 | * | |
224 | * @param tmp The file generated | |
225 | * | |
226 | * @return true on success | |
227 | */ | |
228 | Bool_t GridPs(TString& tmp) | |
229 | { | |
230 | tmp = "gridMonitor"; | |
231 | FILE* fp = gSystem->TempFileName(tmp); | |
232 | ||
233 | #if 0 | |
234 | // Here, we'd ideally use TGrid::Ps but that doesn't work, so we use | |
235 | // the shell instead. | |
236 | gSystem->RedirectOutput(fn); | |
237 | gGrid->Command("ps -Ax"); | |
238 | gGrid->Stdout(); | |
239 | gSystem->RedirectOutput(0); | |
240 | gGrid->Stderr(); | |
241 | fclose(fp); | |
242 | #else | |
243 | fclose(fp); | |
244 | ||
245 | // Printf("Using gbbox ps -Ax >> %s", tmp.Data()); | |
246 | gSystem->Exec(Form("gbbox ps -Ax >> %s", tmp.Data())); | |
247 | #endif | |
248 | return true; | |
249 | } | |
250 | ||
08275d05 | 251 | /** |
252 | * Get the job state | |
253 | * | |
254 | * @param jobId Job status | |
255 | * @param out Output status string | |
256 | * | |
257 | * @return true on success | |
33438b4c | 258 | * |
259 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 260 | */ |
261 | Bool_t GetJobState(Int_t jobId, TString& out) | |
262 | { | |
263 | out = "MISSING"; | |
264 | ||
635bda19 | 265 | TString fn; |
266 | GridPs(fn); | |
08275d05 | 267 | |
268 | std::ifstream in(fn.Data()); | |
269 | ||
270 | while (!in.eof()) { | |
271 | TString l; | |
272 | l.ReadLine(in); | |
273 | if (in.bad()) break; | |
274 | ||
275 | TObjArray* tokens = l.Tokenize(" \t"); | |
276 | if (tokens->GetEntries() < 2) break; | |
277 | ||
278 | //TString user = tokens->At(0)->GetName(); | |
279 | TString sjid = tokens->At(1)->GetName(); // Job ID | |
280 | TString stat = tokens->At(2)->GetName(); // State | |
281 | Int_t jid = sjid.Atoi(); | |
282 | ||
283 | if (jid != jobId) continue; | |
284 | ||
285 | ParseState(stat, out); | |
286 | break; | |
287 | } | |
288 | ||
289 | in.close(); | |
290 | gSystem->Unlink(fn); | |
291 | ||
292 | return true; | |
293 | } | |
294 | ||
33438b4c | 295 | /** |
296 | * Get the job states | |
297 | * | |
298 | * @param jobs List of job IDs | |
299 | * @param states On return the states | |
300 | * | |
301 | * @return true on success | |
302 | * | |
303 | * @ingroup pwglf_forward_trains_helper | |
304 | */ | |
08275d05 | 305 | Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states) |
306 | { | |
307 | Int_t n = jobs.GetSize(); | |
308 | states.Expand(n); | |
309 | for (Int_t i = 0; i < n; i++) { | |
310 | TObjString* s = static_cast<TObjString*>(states.At(i)); | |
311 | if (!s) states.AddAt(s = new TObjString(""), i); | |
312 | s->SetString("MISSING"); | |
313 | } | |
bfab35d9 | 314 | |
635bda19 | 315 | TString fn; |
316 | GridPs(fn); | |
08275d05 | 317 | |
318 | std::ifstream in(fn.Data()); | |
319 | ||
320 | while (!in.eof()) { | |
321 | TString l; | |
322 | l.ReadLine(in); | |
323 | if (in.bad()) break; | |
324 | if (l.IsNull()) continue; | |
325 | ||
326 | TObjArray* tokens = l.Tokenize(" \t"); | |
327 | if (tokens->GetEntries() < 3) { | |
328 | Warning("GetJobStates", "Got too few tokens (%d): %s", | |
329 | tokens->GetEntries(), l.Data()); | |
330 | tokens->Print(); | |
331 | break; | |
332 | } | |
333 | ||
334 | //TString user = tokens->At(0)->GetName(); | |
335 | TString sjid = tokens->At(1)->GetName(); // Job ID | |
336 | TString stat = tokens->At(2)->GetName(); // State | |
337 | Int_t jid = sjid.Atoi(); | |
338 | ||
339 | for (Int_t i = 0; i < n; i++) { | |
340 | if (jid != jobs.At(i)) continue; | |
341 | TObjString* s = static_cast<TObjString*>(states.At(i)); | |
342 | TString out; | |
343 | if (!ParseState(stat, out)) continue; | |
344 | s->SetString(out); | |
345 | } | |
346 | } | |
347 | ||
348 | in.close(); | |
349 | gSystem->Unlink(fn); | |
350 | ||
351 | return true; | |
352 | } | |
353 | ||
aeab3a4e | 354 | /** |
355 | * Check if the AliEn token is valid | |
356 | * | |
357 | * | |
358 | * @return true if it is | |
359 | */ | |
360 | Bool_t CheckAlienToken() | |
361 | { | |
362 | Int_t ret = gSystem->Exec("alien-token-info > /dev/null 2>&1"); | |
363 | if (ret != 0) { | |
364 | Printf("=== AliEn token not valid"); | |
365 | return false; | |
366 | } | |
367 | return true; | |
368 | } | |
635bda19 | 369 | |
bfab35d9 | 370 | /** |
371 | * Refersh the grid token every 6th hour | |
372 | * | |
373 | * @param now | |
374 | * @param force | |
375 | */ | |
aeab3a4e | 376 | #if 0 |
377 | void RefreshAlienToken(UInt_t, Bool_t f=false) | |
378 | {} | |
379 | #else | |
380 | void RefreshAlienToken(UInt_t now, Bool_t force=false) | |
bfab35d9 | 381 | { |
aeab3a4e | 382 | Bool_t renew = force; |
383 | if (!renew && !CheckAlienToken()) renew = true; | |
384 | ||
385 | if (!renew) { | |
386 | TString l = gSystem->GetFromPipe(Form("cat /tmp/gclient_token_%d", | |
387 | gSystem->GetUid())); | |
388 | TObjArray* lines = l.Tokenize("\n"); | |
389 | TObjString* sline = 0; | |
390 | UInt_t expire = 0; | |
391 | TIter next(lines); | |
392 | while ((sline = static_cast<TObjString*>(next()))) { | |
393 | TString& line = sline->String(); | |
394 | if (!line.BeginsWith("Expiretime")) continue; | |
395 | ||
396 | Size_t eq = line.Index("="); | |
397 | TString sdatime = line(eq+2, line.Length()-eq-2); | |
398 | expire = sdatime.Atoi(); | |
399 | break; | |
400 | } | |
401 | lines->Delete(); | |
402 | // If the expiration date/time has passed or is less than 30 min | |
403 | // away, we refresh | |
404 | Int_t diff = (expire - now); | |
405 | if (now > expire || diff < 30*60) renew = true; | |
406 | ||
407 | Printf("=== Now: %d, Expires: %d, in %03d:%02d:%02d -> %s", | |
408 | now, expire, diff/60/60, (diff/60 % 60), (diff % 60), | |
409 | (renew ? "renew" : "nothing")); | |
410 | ||
411 | } | |
412 | ||
413 | if (!renew) return; | |
bfab35d9 | 414 | |
415 | // Reset the start time | |
bfab35d9 | 416 | Printf("=== Refreshing AliEn token"); |
417 | gSystem->Exec("alien-token-init"); | |
418 | Printf("=== Done refreshing AliEn token"); | |
419 | } | |
aeab3a4e | 420 | #endif |
421 | ||
422 | ||
08275d05 | 423 | /** |
33438b4c | 424 | * Wait of jobs to finish |
08275d05 | 425 | * |
33438b4c | 426 | * @param jobs List of jobs |
427 | * @param stages Stages | |
428 | * @param delay Delay for check | |
1de8812e | 429 | * @param batch If true, do not prompt |
08275d05 | 430 | * |
33438b4c | 431 | * @return true on success, false otherwise |
432 | * | |
433 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 434 | */ |
8449e3e0 | 435 | Bool_t WaitForJobs(TArrayI& jobs, |
436 | TObjArray* stages, | |
437 | Int_t delay, | |
438 | Bool_t batch) | |
08275d05 | 439 | { |
aeab3a4e | 440 | if (!CheckAlienToken()) return false; |
635bda19 | 441 | // Bool_t stopped = false; |
08275d05 | 442 | TFileHandler h(0, 0x1); |
aeab3a4e | 443 | // RefreshAlienToken(0, true); |
08275d05 | 444 | do { |
445 | Bool_t allDone = true; | |
446 | TDatime t; | |
447 | Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---", | |
448 | t.GetYear(), t.GetMonth(), t.GetDay(), | |
449 | t.GetHour(), t.GetMinute(), t.GetSecond()); | |
8449e3e0 | 450 | UInt_t now = t.Convert(true); |
08275d05 | 451 | |
452 | TObjArray states; | |
453 | GetJobStates(jobs, states); | |
454 | ||
455 | Int_t missing = 0; | |
456 | Int_t total = jobs.GetSize(); | |
457 | // Bool_t allAccounted = false; | |
458 | for (Int_t i = 0; i < total; i++) { | |
459 | Int_t job = jobs.At(i); | |
460 | ||
461 | if (job < 0) continue; | |
462 | ||
463 | TObjString* obj = static_cast<TObjString*>(states.At(i)); | |
464 | const TString& state = obj->String(); | |
465 | ||
466 | if (state.BeginsWith("ERROR_")) | |
467 | jobs.SetAt(-1, i); | |
468 | else if (state.EqualTo("MISSING")) | |
469 | missing++; | |
470 | else if (!state.EqualTo("DONE")) | |
471 | allDone = false; | |
472 | ||
473 | ||
474 | Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data()); | |
475 | ||
476 | } | |
aeab3a4e | 477 | RefreshAlienToken(now); |
8449e3e0 | 478 | |
08275d05 | 479 | if (allDone) break; |
480 | if (missing >= total) { | |
481 | Error("GetJobStates", "Info on all jobs missing"); | |
482 | break; | |
483 | } | |
8449e3e0 | 484 | if (!batch) { |
485 | if (gSystem->Select(&h, 1000*delay)) { | |
486 | // Got input on std::cin | |
487 | std::string l; | |
488 | std::getline(std::cin, l); | |
489 | std::cout << "Do you want to terminate now [yN]? " << std::flush; | |
490 | std::getline(std::cin, l); | |
491 | if (l[0] == 'y' || l[0] == 'Y') { | |
635bda19 | 492 | // stopped = true; |
8449e3e0 | 493 | break; |
494 | } | |
08275d05 | 495 | } |
496 | } | |
8449e3e0 | 497 | else |
498 | gSystem->Sleep(1000*delay); | |
08275d05 | 499 | |
8449e3e0 | 500 | // |
08275d05 | 501 | } while (true); |
502 | ||
503 | return true; | |
504 | } | |
505 | /** | |
33438b4c | 506 | * Watch Grid for termination of main job, and submit merging jobs as needed. |
08275d05 | 507 | * |
33438b4c | 508 | * @param name Name of the job |
1de8812e | 509 | * @param batch If true, do not prompt |
33438b4c | 510 | * @param delay Delay between updates in seconds |
511 | * | |
512 | * @ingroup pwglf_forward_trains_helper | |
08275d05 | 513 | */ |
8449e3e0 | 514 | void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60) |
08275d05 | 515 | { |
aeab3a4e | 516 | #if 1 |
517 | // We use command line tools instead of ROOT interface - which is | |
518 | // broken so badly that it's hard to believe it ever worked. | |
08275d05 | 519 | gEnv->SetValue("XSec.GSI.DelegProxy", "2"); |
520 | TGrid::Connect("alien:///"); | |
521 | if (!gGrid) { | |
522 | Error("GridWatch", "Failed to connect to the Grid"); | |
523 | return; | |
524 | } | |
aeab3a4e | 525 | #endif |
08275d05 | 526 | |
527 | TObjArray* jobIDs = ReadJobIDs(name, false); | |
528 | TObjArray* stages = ReadStages(name, false); | |
529 | ||
530 | if (!jobIDs || !stages) return; | |
531 | ||
532 | TArrayI jobs; | |
533 | if (!ParseJobIDs(jobIDs, jobs)) return; | |
534 | ||
635bda19 | 535 | gSystem->Sleep(10*1000); |
aeab3a4e | 536 | if (!(CheckCacheFile(name, "jobid", true) && |
537 | CheckCacheFile(name, "stage", true))) | |
538 | if (!WaitForJobs(jobs, stages, delay, batch)) return; | |
08275d05 | 539 | |
540 | delete jobIDs; | |
541 | delete stages; | |
542 | ||
543 | // return; | |
544 | do { | |
aeab3a4e | 545 | if (!CheckCacheFile(name, "jobid", true) && |
546 | !CheckCacheFile(name, "stage", true)) { | |
547 | if (!CheckAlienToken()) return; | |
08275d05 | 548 | Printf("Now executing terminate"); |
549 | gSystem->Exec("aliroot -l -b -q Terminate.C"); | |
550 | gSystem->Sleep(10*1000); | |
551 | } | |
08275d05 | 552 | |
aeab3a4e | 553 | Printf("Reading job ids"); |
08275d05 | 554 | jobIDs = ReadJobIDs(name, true); |
555 | stages = ReadStages(name, true); | |
556 | ||
557 | if (!ParseJobIDs(jobIDs, jobs)) { | |
558 | Error("GridWatch", "Failed to parse job ids %s", | |
aeab3a4e | 559 | CacheFileName(name,"jobid",true).Data()); |
08275d05 | 560 | return; |
561 | } | |
562 | ||
aeab3a4e | 563 | if (!WaitForJobs(jobs, stages, delay, batch)) return; |
08275d05 | 564 | |
565 | Bool_t allFinal = true; | |
566 | for (Int_t i = 0; i < jobs.GetSize(); i++) { | |
567 | if (jobs.At(i) < 0) continue; | |
568 | ||
569 | const TString& s = static_cast<TObjString*>(stages->At(i))->String(); | |
570 | if (!s.BeginsWith("final_")) allFinal = false; | |
571 | } | |
572 | ||
573 | delete jobIDs; | |
574 | delete stages; | |
575 | ||
576 | Printf("All jobs in final stage"); | |
577 | if (allFinal) break; | |
578 | ||
aeab3a4e | 579 | RemoveCacheFile(name, "jobid", true); |
580 | RemoveCacheFile(name, "stage", true); | |
08275d05 | 581 | } while (true); |
582 | ||
583 | Printf("Finished"); | |
584 | } | |
585 | // | |
586 | // EOF | |
587 | // | |
588 |