]> git.uio.no Git - u/mrichter/AliRoot.git/blame - PWGLF/FORWARD/trains/GridWatch.C
Modified QA script to allow creating QA results (Sans energy loss)
[u/mrichter/AliRoot.git] / PWGLF / FORWARD / trains / GridWatch.C
CommitLineData
08275d05 1/**
2 * @file GridWatch.C
3 * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4 * @date Thu Jan 24 23:06:08 2013
5 *
6 * @brief Script to watch master jobs and automatically submit
7 * terminate jobs
8 *
9 *
33438b4c 10 * @ingroup pwglf_forward_trains_helper
08275d05 11 */
12#ifndef __CINT__
13# include <TString.h>
14# include <TGrid.h>
15# include <TSystem.h>
16# include <TObjArray.h>
17# include <iostream>
18# include <fstream>
19# include <TError.h>
20# include <TDatime.h>
21# include <TEnv.h>
22#else
23class TString;
24#endif
25#include <TArrayI.h>
26
33438b4c 27/**
28 * Create token name
29 *
30 * @param name Name
31 * @param ext Extension
32 * @param merge Merge state or not
33 *
34 * @return Formatted string
35 * @ingroup pwglf_forward_trains_helper
36 */
aeab3a4e 37TString CacheFileName(const TString& name,
38 const TString& ext,
39 Bool_t merge=false)
08275d05 40{
41 return TString::Format("%s%s.%s", name.Data(),
42 (merge ? "_merge" : ""), ext.Data());
43}
33438b4c 44/**
45 * Check if we have a particular file
46 *
47 * @param name Base name
48 * @param ext Extension
49 * @param merge Merging stage or not
50 *
51 * @return true if file exits
52 * @ingroup pwglf_forward_trains_helper
53 */
aeab3a4e 54Bool_t CheckCacheFile(const TString& name,
55 const TString& ext,
56 Bool_t merge=false)
08275d05 57{
58 // TSystem::AccessPathName return false if file is there
aeab3a4e 59 return !gSystem->AccessPathName(CacheFileName(name, ext, merge));
08275d05 60}
33438b4c 61/**
62 * Remove a token file
63 *
64 * @param name Base name
65 * @param ext Extension
66 * @param merge Merging stage or not
67 * @ingroup pwglf_forward_trains_helper
68 */
aeab3a4e 69void RemoveCacheFile(const TString& name,
08275d05 70 const TString& ext,
71 Bool_t merge=false)
72{
aeab3a4e 73 gSystem->Unlink(CacheFileName(name, ext, merge));
08275d05 74}
75
76/**
77 * Read one line of text from file and return tokens.
78 *
79 * @param name Base name
80 * @param ext Extension
81 * @param merge If true append "_merge" to name
82 *
83 * @return Array of tokens or null
33438b4c 84 *
85 * @ingroup pwglf_forward_trains_helper
08275d05 86 */
aeab3a4e 87TObjArray* ReadCacheFile(const TString& name,
08275d05 88 const TString& ext,
89 bool merge=false)
90{
91 TString fn = TString::Format("%s%s.%s", name.Data(),
92 (merge ? "_merge" : ""), ext.Data());
93 std::ifstream in(fn.Data());
94 if (!in) {
aeab3a4e 95 Error("ReadCacheFile", "Failed to open %s", fn.Data());
08275d05 96 return 0;
97 }
98 TString ln;
99 ln.ReadLine(in);
100 in.close();
101
102 if (ln.IsNull()) return 0;
103 return ln.Tokenize(" \t");
104}
105
106/**
107 * Read list of job IDs from file
108 *
109 * @param name Base name
110 * @param merge If true append "_merge" to name
111 *
112 * @return Array of job IDs or null
33438b4c 113 *
114 * @ingroup pwglf_forward_trains_helper
08275d05 115 */
116TObjArray* ReadJobIDs(const TString& name, bool merge=false)
117{
aeab3a4e 118 return ReadCacheFile(name, "jobid", merge);
08275d05 119}
120
121/**
122 * Read list of job stages from file
123 *
124 * @param name Base name
125 * @param merge If true append "_merge" to name
126 *
127 * @return Array of job stages or null
33438b4c 128 *
129 * @ingroup pwglf_forward_trains_helper
08275d05 130 */
131TObjArray* ReadStages(const TString& name, bool merge=false)
132{
aeab3a4e 133 return ReadCacheFile(name, "stage", merge);
08275d05 134}
135
136/**
137 * Parse the job IDs into an array of integers
138 *
139 * @param jobIds List of jobs
140 * @param ret Return array
141 *
142 * @return true on success
33438b4c 143 *
144 * @ingroup pwglf_forward_trains_helper
08275d05 145 */
146Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
147{
148 if (!jobIds) return false;
149
150 Int_t n = jobIds->GetEntries();
151 ret.Set(n);
152 ret.Reset(-1);
153
154 for (Int_t i = 0; i < n; i++) {
155 TObjString* id = static_cast<TObjString*>(jobIds->At(i));
156 const TString& s = id->String();
157 ret.SetAt(s.Atoi(), i);
158 }
159 return true;
160}
161
162/**
163 * Parse string representing status and return human-readable string
164 *
165 * @param status Return from ps command
166 * @param out Output
167 *
168 * @return true on success
33438b4c 169 *
170 * @ingroup pwglf_forward_trains_helper
08275d05 171 */
172Bool_t ParseState(const TString& status, TString& out)
173{
174 switch (status[0]) {
175 case 'D': out = "DONE"; break;
176 case 'E': out = "ERROR"; break;
177 case 'R': out = "RUNNING"; break;
178 case 'W': out = "WAITING"; break;
179 case 'O': out = "WAITING_QUOTA"; break;
180 case 'A': out = "ASSIGNED"; break;
181 case 'S': out = "STARTED" ; break;
182 case 'I': out = "INSERTING"; break;
183 case 'K': out = "KILLED"; break;
184 default: out = "UNKNOWN"; return false;
185 }
186 if (status[1] != '\0' &&
187 (status[0] != 'O' || status[0] != 'S')) {
188 out.Append("_");
189 switch (status[1]) {
190 case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191 case 'X': out.Append("EXPIRED"); break;
192 case 'A': out.Append("ASSIGNING"); break;
193 case 'E': out.Append("EXECUTING"); break;
194 case 'V':
195 if (status[0] == 'S') out = "SAVING";
196 else out.Append("VALIDATING");
197 break;
198 case 'd':
199 if (status[0] == 'I') {
200 out = "INTERACTIVE_IDLE";
201 break;
202 } // Fall through on else
203 case 'a':
204 if (status[0] == 'I') {
205 out = "INTERACTIVE_USED";
206 break;
207 } // Fall through on else
208 default: out.Append("UNKNOWN"); return false;
209 }
210 if (status[2] != '\0') {
211 switch (status[2]) {
212 case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
213 break;
214 default: out.Append("_UNKNOWN");
215 }
216 }
217 }
218 return true;
219}
220
635bda19 221/**
222 * Do a PS on the grid
223 *
224 * @param tmp The file generated
225 *
226 * @return true on success
227 */
228Bool_t GridPs(TString& tmp)
229{
230 tmp = "gridMonitor";
231 FILE* fp = gSystem->TempFileName(tmp);
232
233#if 0
234 // Here, we'd ideally use TGrid::Ps but that doesn't work, so we use
235 // the shell instead.
236 gSystem->RedirectOutput(fn);
237 gGrid->Command("ps -Ax");
238 gGrid->Stdout();
239 gSystem->RedirectOutput(0);
240 gGrid->Stderr();
241 fclose(fp);
242#else
243 fclose(fp);
244
245 // Printf("Using gbbox ps -Ax >> %s", tmp.Data());
246 gSystem->Exec(Form("gbbox ps -Ax >> %s", tmp.Data()));
247#endif
248 return true;
249}
250
08275d05 251/**
252 * Get the job state
253 *
254 * @param jobId Job status
255 * @param out Output status string
256 *
257 * @return true on success
33438b4c 258 *
259 * @ingroup pwglf_forward_trains_helper
08275d05 260 */
261Bool_t GetJobState(Int_t jobId, TString& out)
262{
263 out = "MISSING";
264
635bda19 265 TString fn;
266 GridPs(fn);
08275d05 267
268 std::ifstream in(fn.Data());
269
270 while (!in.eof()) {
271 TString l;
272 l.ReadLine(in);
273 if (in.bad()) break;
274
275 TObjArray* tokens = l.Tokenize(" \t");
276 if (tokens->GetEntries() < 2) break;
277
278 //TString user = tokens->At(0)->GetName();
279 TString sjid = tokens->At(1)->GetName(); // Job ID
280 TString stat = tokens->At(2)->GetName(); // State
281 Int_t jid = sjid.Atoi();
282
283 if (jid != jobId) continue;
284
285 ParseState(stat, out);
286 break;
287 }
288
289 in.close();
290 gSystem->Unlink(fn);
291
292 return true;
293}
294
33438b4c 295/**
296 * Get the job states
297 *
298 * @param jobs List of job IDs
299 * @param states On return the states
300 *
301 * @return true on success
302 *
303 * @ingroup pwglf_forward_trains_helper
304 */
08275d05 305Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
306{
307 Int_t n = jobs.GetSize();
308 states.Expand(n);
309 for (Int_t i = 0; i < n; i++) {
310 TObjString* s = static_cast<TObjString*>(states.At(i));
311 if (!s) states.AddAt(s = new TObjString(""), i);
312 s->SetString("MISSING");
313 }
bfab35d9 314
635bda19 315 TString fn;
316 GridPs(fn);
08275d05 317
318 std::ifstream in(fn.Data());
319
320 while (!in.eof()) {
321 TString l;
322 l.ReadLine(in);
323 if (in.bad()) break;
324 if (l.IsNull()) continue;
325
326 TObjArray* tokens = l.Tokenize(" \t");
327 if (tokens->GetEntries() < 3) {
328 Warning("GetJobStates", "Got too few tokens (%d): %s",
329 tokens->GetEntries(), l.Data());
330 tokens->Print();
331 break;
332 }
333
334 //TString user = tokens->At(0)->GetName();
335 TString sjid = tokens->At(1)->GetName(); // Job ID
336 TString stat = tokens->At(2)->GetName(); // State
337 Int_t jid = sjid.Atoi();
338
339 for (Int_t i = 0; i < n; i++) {
340 if (jid != jobs.At(i)) continue;
341 TObjString* s = static_cast<TObjString*>(states.At(i));
342 TString out;
343 if (!ParseState(stat, out)) continue;
344 s->SetString(out);
345 }
346 }
347
348 in.close();
349 gSystem->Unlink(fn);
350
351 return true;
352}
353
aeab3a4e 354/**
355 * Check if the AliEn token is valid
356 *
357 *
358 * @return true if it is
359 */
360Bool_t CheckAlienToken()
361{
362 Int_t ret = gSystem->Exec("alien-token-info > /dev/null 2>&1");
363 if (ret != 0) {
364 Printf("=== AliEn token not valid");
365 return false;
366 }
367 return true;
368}
635bda19 369
bfab35d9 370/**
371 * Refersh the grid token every 6th hour
372 *
373 * @param now
374 * @param force
375 */
aeab3a4e 376#if 0
377void RefreshAlienToken(UInt_t, Bool_t f=false)
378{}
379#else
380void RefreshAlienToken(UInt_t now, Bool_t force=false)
bfab35d9 381{
aeab3a4e 382 Bool_t renew = force;
383 if (!renew && !CheckAlienToken()) renew = true;
384
385 if (!renew) {
386 TString l = gSystem->GetFromPipe(Form("cat /tmp/gclient_token_%d",
387 gSystem->GetUid()));
388 TObjArray* lines = l.Tokenize("\n");
389 TObjString* sline = 0;
390 UInt_t expire = 0;
391 TIter next(lines);
392 while ((sline = static_cast<TObjString*>(next()))) {
393 TString& line = sline->String();
394 if (!line.BeginsWith("Expiretime")) continue;
395
396 Size_t eq = line.Index("=");
397 TString sdatime = line(eq+2, line.Length()-eq-2);
398 expire = sdatime.Atoi();
399 break;
400 }
401 lines->Delete();
402 // If the expiration date/time has passed or is less than 30 min
403 // away, we refresh
404 Int_t diff = (expire - now);
405 if (now > expire || diff < 30*60) renew = true;
406
407 Printf("=== Now: %d, Expires: %d, in %03d:%02d:%02d -> %s",
408 now, expire, diff/60/60, (diff/60 % 60), (diff % 60),
409 (renew ? "renew" : "nothing"));
410
411 }
412
413 if (!renew) return;
bfab35d9 414
415 // Reset the start time
bfab35d9 416 Printf("=== Refreshing AliEn token");
417 gSystem->Exec("alien-token-init");
418 Printf("=== Done refreshing AliEn token");
419}
aeab3a4e 420#endif
421
422
08275d05 423/**
33438b4c 424 * Wait of jobs to finish
08275d05 425 *
33438b4c 426 * @param jobs List of jobs
427 * @param stages Stages
428 * @param delay Delay for check
1de8812e 429 * @param batch If true, do not prompt
08275d05 430 *
33438b4c 431 * @return true on success, false otherwise
432 *
433 * @ingroup pwglf_forward_trains_helper
08275d05 434 */
8449e3e0 435Bool_t WaitForJobs(TArrayI& jobs,
436 TObjArray* stages,
437 Int_t delay,
438 Bool_t batch)
08275d05 439{
aeab3a4e 440 if (!CheckAlienToken()) return false;
635bda19 441 // Bool_t stopped = false;
08275d05 442 TFileHandler h(0, 0x1);
aeab3a4e 443 // RefreshAlienToken(0, true);
08275d05 444 do {
445 Bool_t allDone = true;
446 TDatime t;
447 Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---",
448 t.GetYear(), t.GetMonth(), t.GetDay(),
449 t.GetHour(), t.GetMinute(), t.GetSecond());
8449e3e0 450 UInt_t now = t.Convert(true);
08275d05 451
452 TObjArray states;
453 GetJobStates(jobs, states);
454
455 Int_t missing = 0;
456 Int_t total = jobs.GetSize();
457 // Bool_t allAccounted = false;
458 for (Int_t i = 0; i < total; i++) {
459 Int_t job = jobs.At(i);
460
461 if (job < 0) continue;
462
463 TObjString* obj = static_cast<TObjString*>(states.At(i));
464 const TString& state = obj->String();
465
466 if (state.BeginsWith("ERROR_"))
467 jobs.SetAt(-1, i);
468 else if (state.EqualTo("MISSING"))
469 missing++;
470 else if (!state.EqualTo("DONE"))
471 allDone = false;
472
473
474 Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
475
476 }
aeab3a4e 477 RefreshAlienToken(now);
8449e3e0 478
08275d05 479 if (allDone) break;
480 if (missing >= total) {
481 Error("GetJobStates", "Info on all jobs missing");
482 break;
483 }
8449e3e0 484 if (!batch) {
485 if (gSystem->Select(&h, 1000*delay)) {
486 // Got input on std::cin
487 std::string l;
488 std::getline(std::cin, l);
489 std::cout << "Do you want to terminate now [yN]? " << std::flush;
490 std::getline(std::cin, l);
491 if (l[0] == 'y' || l[0] == 'Y') {
635bda19 492 // stopped = true;
8449e3e0 493 break;
494 }
08275d05 495 }
496 }
8449e3e0 497 else
498 gSystem->Sleep(1000*delay);
08275d05 499
8449e3e0 500 //
08275d05 501 } while (true);
502
503 return true;
504}
505/**
33438b4c 506 * Watch Grid for termination of main job, and submit merging jobs as needed.
08275d05 507 *
33438b4c 508 * @param name Name of the job
1de8812e 509 * @param batch If true, do not prompt
33438b4c 510 * @param delay Delay between updates in seconds
511 *
512 * @ingroup pwglf_forward_trains_helper
08275d05 513 */
8449e3e0 514void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
08275d05 515{
aeab3a4e 516#if 1
517 // We use command line tools instead of ROOT interface - which is
518 // broken so badly that it's hard to believe it ever worked.
08275d05 519 gEnv->SetValue("XSec.GSI.DelegProxy", "2");
520 TGrid::Connect("alien:///");
521 if (!gGrid) {
522 Error("GridWatch", "Failed to connect to the Grid");
523 return;
524 }
aeab3a4e 525#endif
08275d05 526
527 TObjArray* jobIDs = ReadJobIDs(name, false);
528 TObjArray* stages = ReadStages(name, false);
529
530 if (!jobIDs || !stages) return;
531
532 TArrayI jobs;
533 if (!ParseJobIDs(jobIDs, jobs)) return;
534
635bda19 535 gSystem->Sleep(10*1000);
aeab3a4e 536 if (!(CheckCacheFile(name, "jobid", true) &&
537 CheckCacheFile(name, "stage", true)))
538 if (!WaitForJobs(jobs, stages, delay, batch)) return;
08275d05 539
540 delete jobIDs;
541 delete stages;
542
543 // return;
544 do {
aeab3a4e 545 if (!CheckCacheFile(name, "jobid", true) &&
546 !CheckCacheFile(name, "stage", true)) {
547 if (!CheckAlienToken()) return;
08275d05 548 Printf("Now executing terminate");
549 gSystem->Exec("aliroot -l -b -q Terminate.C");
550 gSystem->Sleep(10*1000);
551 }
08275d05 552
aeab3a4e 553 Printf("Reading job ids");
08275d05 554 jobIDs = ReadJobIDs(name, true);
555 stages = ReadStages(name, true);
556
557 if (!ParseJobIDs(jobIDs, jobs)) {
558 Error("GridWatch", "Failed to parse job ids %s",
aeab3a4e 559 CacheFileName(name,"jobid",true).Data());
08275d05 560 return;
561 }
562
aeab3a4e 563 if (!WaitForJobs(jobs, stages, delay, batch)) return;
08275d05 564
565 Bool_t allFinal = true;
566 for (Int_t i = 0; i < jobs.GetSize(); i++) {
567 if (jobs.At(i) < 0) continue;
568
569 const TString& s = static_cast<TObjString*>(stages->At(i))->String();
570 if (!s.BeginsWith("final_")) allFinal = false;
571 }
572
573 delete jobIDs;
574 delete stages;
575
576 Printf("All jobs in final stage");
577 if (allFinal) break;
578
aeab3a4e 579 RemoveCacheFile(name, "jobid", true);
580 RemoveCacheFile(name, "stage", true);
08275d05 581 } while (true);
582
583 Printf("Finished");
584}
585//
586// EOF
587//
588