3 * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4 * @date Thu Jan 24 23:06:08 2013
6 * @brief Script to watch master jobs and automatically submit
10 * @ingroup pwglf_forward_trains_helper
16 # include <TObjArray.h>
31 * @param ext Extension
32 * @param merge Merge state or not
34 * @return Formatted string
35 * @ingroup pwglf_forward_trains_helper
// Builds the cache file name "<name>[_merge].<ext>" with TString::Format.
37 TString CacheFileName(const TString& name,
41 return TString::Format("%s%s.%s", name.Data(),
// "_merge" is placed between the base name and the extension when merge is set.
42 (merge ? "_merge" : ""), ext.Data());
45 * Check if we have a particular file
47 * @param name Base name
48 * @param ext Extension
49 * @param merge Merging stage or not
51 * @return true if file exists
52 * @ingroup pwglf_forward_trains_helper
// Reports whether the file named by CacheFileName(name, ext, merge) is present.
54 Bool_t CheckCacheFile(const TString& name,
58 // TSystem::AccessPathName returns false if the file is there,
// so the result is negated to give true when the file exists.
59 return !gSystem->AccessPathName(CacheFileName(name, ext, merge));
64 * @param name Base name
65 * @param ext Extension
66 * @param merge Merging stage or not
67 * @ingroup pwglf_forward_trains_helper
// Removes the file named by CacheFileName(name, ext, merge).
// The value returned by Unlink is discarded, so a failed removal goes unreported.
69 void RemoveCacheFile(const TString& name,
73 gSystem->Unlink(CacheFileName(name, ext, merge));
77 * Read one line of text from file and return tokens.
79 * @param name Base name
80 * @param ext Extension
81 * @param merge If true append "_merge" to name
83 * @return Array of tokens or null
85 * @ingroup pwglf_forward_trains_helper
// Opens the cache file for (name, ext, merge) and returns its content split
// on spaces and tabs, or null when the text read is empty.
87 TObjArray* ReadCacheFile(const TString& name,
// NOTE(review): this repeats the format used by CacheFileName; calling that
// helper here would keep the two in step.
91 TString fn = TString::Format("%s%s.%s", name.Data(),
92 (merge ? "_merge" : ""), ext.Data());
93 std::ifstream in(fn.Data());
// Reported when the stream could not be opened (the check itself is not visible here).
95 Error("ReadCacheFile", "Failed to open %s", fn.Data());
102 if (ln.IsNull()) return 0;
// NOTE(review): per the ROOT TString::Tokenize documentation the returned
// TObjArray is heap-allocated and must be deleted by the caller -- confirm
// that callers do so.
103 return ln.Tokenize(" \t");
107 * Read list of job IDs from file
109 * @param name Base name
110 * @param merge If true append "_merge" to name
112 * @return Array of job IDs or null
114 * @ingroup pwglf_forward_trains_helper
// Convenience wrapper: reads the tokens of the "jobid" cache file via ReadCacheFile.
116 TObjArray* ReadJobIDs(const TString& name, bool merge=false)
118 return ReadCacheFile(name, "jobid", merge);
122 * Read list of job stages from file
124 * @param name Base name
125 * @param merge If true append "_merge" to name
127 * @return Array of job stages or null
129 * @ingroup pwglf_forward_trains_helper
// Convenience wrapper: reads the tokens of the "stage" cache file via ReadCacheFile.
131 TObjArray* ReadStages(const TString& name, bool merge=false)
133 return ReadCacheFile(name, "stage", merge);
137 * Parse the job IDs into an array of integers
139 * @param jobIds List of jobs
140 * @param ret Return array
142 * @return true on success
144 * @ingroup pwglf_forward_trains_helper
// Converts each string entry of jobIds to an integer and stores it in ret at
// the same index. Returns false immediately when jobIds is null.
146 Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
148 if (!jobIds) return false;
150 Int_t n = jobIds->GetEntries();
154 for (Int_t i = 0; i < n; i++) {
// NOTE(review): the entry is assumed to be a non-null TObjString; neither
// the cast result nor the element type is checked before it is dereferenced.
155 TObjString* id = static_cast<TObjString*>(jobIds->At(i));
156 const TString& s = id->String();
// NOTE(review): Atoi presumably yields 0 for non-numeric text, which would be
// stored as a job ID -- confirm against the TString documentation.
157 ret.SetAt(s.Atoi(), i);
163 * Parse string representing status and return human-readable string
165 * @param status Return from ps command
168 * @return true on success
170 * @ingroup pwglf_forward_trains_helper
// Translates a job status code into a readable name written to out.
// The case labels below presumably belong to switches on successive
// characters of status (the switch headers are not visible here).
172 Bool_t ParseState(const TString& status, TString& out)
// Primary state; an unrecognised code sets "UNKNOWN" and returns false.
175 case 'D': out = "DONE"; break;
176 case 'E': out = "ERROR"; break;
177 case 'R': out = "RUNNING"; break;
178 case 'W': out = "WAITING"; break;
179 case 'O': out = "WAITING_QUOTA"; break;
180 case 'A': out = "ASSIGNED"; break;
181 case 'S': out = "STARTED" ; break;
182 case 'I': out = "INSERTING"; break;
183 case 'K': out = "KILLED"; break;
184 default: out = "UNKNOWN"; return false;
// NOTE(review): the parenthesised test is always true -- a character cannot
// equal both 'O' and 'S' -- so only status[1] != '\0' gates this branch.
// `&&` may have been intended, but the status[0] == 'S' check further down
// lives inside this branch, so confirm the intent before changing it.
186 if (status[1] != '\0' &&
187 (status[0] != 'O' || status[0] != 'S')) {
// Sub-state suffixes; 'S' reads "SUBMIT" after a leading 'E', otherwise "SPLIT".
190 case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191 case 'X': out.Append("EXPIRED"); break;
192 case 'A': out.Append("ASSIGNING"); break;
193 case 'E': out.Append("EXECUTING"); break;
// A leading 'S' replaces the whole name; any other state gets a suffix.
195 if (status[0] == 'S') out = "SAVING";
196 else out.Append("VALIDATING");
// For a leading 'I' the name is replaced outright instead of extended.
199 if (status[0] == 'I') {
200 out = "INTERACTIVE_IDLE";
202 } // Fall through on else
204 if (status[0] == 'I') {
205 out = "INTERACTIVE_USED";
207 } // Fall through on else
208 default: out.Append("UNKNOWN"); return false;
// Optional third character refines the name further.
210 if (status[2] != '\0') {
// Only rewrites the name when the leading character is 'E'.
212 case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
214 default: out.Append("_UNKNOWN");
222 * Do a PS on the grid
224 * @param tmp The file generated
226 * @return true on success
// Writes a grid process listing ("ps -Ax") into a temporary file whose name
// is returned through tmp.
228 Bool_t GridPs(TString& tmp)
// NOTE(review): the visible lines never close fp -- confirm it is released.
231 FILE* fp = gSystem->TempFileName(tmp);
234 // Here, we'd ideally use TGrid::Ps but that doesn't work, so we use
235 // the shell instead.
// NOTE(review): two ways of producing the listing appear below (redirected
// gGrid->Command output and a "gbbox" shell command); presumably they are
// alternatives selected by lines not visible here.
236 gSystem->RedirectOutput(fn);
237 gGrid->Command("ps -Ax");
// Passing 0 stops the redirection started above.
239 gSystem->RedirectOutput(0);
245 // Printf("Using gbbox ps -Ax >> %s", tmp.Data());
// The shell appends (>>) the listing to the temporary file.
246 gSystem->Exec(Form("gbbox ps -Ax >> %s", tmp.Data()));
254 * @param jobId Job ID
255 * @param out Output status string
257 * @return true on success
259 * @ingroup pwglf_forward_trains_helper
// Scans a listing file line by line for the entry whose job ID equals jobId
// and translates its state column into out via ParseState.
261 Bool_t GetJobState(Int_t jobId, TString& out)
268 std::ifstream in(fn.Data());
// NOTE(review): per the ROOT documentation Tokenize returns a heap-allocated
// array; no delete is visible here -- confirm it is released on every path,
// including the break and continue below.
275 TObjArray* tokens = l.Tokenize(" \t");
// NOTE(review): this guard lets a two-token line through, yet At(2) is
// dereferenced below. GetJobStates uses "< 3" for the same layout.
276 if (tokens->GetEntries() < 2) break;
278 //TString user = tokens->At(0)->GetName();
279 TString sjid = tokens->At(1)->GetName(); // Job ID
280 TString stat = tokens->At(2)->GetName(); // State
281 Int_t jid = sjid.Atoi();
283 if (jid != jobId) continue;
// The success flag returned by ParseState is not used here.
285 ParseState(stat, out);
298 * @param jobs List of job IDs
299 * @param states On return the states
301 * @return true on success
303 * @ingroup pwglf_forward_trains_helper
// Fills states with one readable state string per entry of jobs, matching
// job IDs against the lines of a listing file.
305 Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
307 Int_t n = jobs.GetSize();
// Every slot starts as "MISSING"; a TObjString is created where none exists.
309 for (Int_t i = 0; i < n; i++) {
310 TObjString* s = static_cast<TObjString*>(states.At(i));
311 if (!s) states.AddAt(s = new TObjString(""), i);
312 s->SetString("MISSING");
318 std::ifstream in(fn.Data());
// Empty lines are skipped.
324 if (l.IsNull()) continue;
// NOTE(review): per the ROOT documentation Tokenize returns a heap-allocated
// array; no delete is visible here -- confirm it is released per line.
326 TObjArray* tokens = l.Tokenize(" \t");
// At least three columns are needed: user, job ID and state.
327 if (tokens->GetEntries() < 3) {
328 Warning("GetJobStates", "Got too few tokens (%d): %s",
329 tokens->GetEntries(), l.Data());
334 //TString user = tokens->At(0)->GetName();
335 TString sjid = tokens->At(1)->GetName(); // Job ID
336 TString stat = tokens->At(2)->GetName(); // State
337 Int_t jid = sjid.Atoi();
// Linear search for this job ID among the requested jobs.
339 for (Int_t i = 0; i < n; i++) {
340 if (jid != jobs.At(i)) continue;
341 TObjString* s = static_cast<TObjString*>(states.At(i));
// A state that fails to parse leaves the slot untouched.
343 if (!ParseState(stat, out)) continue;
355 * Check if the AliEn token is valid
358 * @return true if it is
// Runs "alien-token-info" through the shell with all output discarded and
// keeps the value returned by Exec in ret.
360 Bool_t CheckAlienToken()
362 Int_t ret = gSystem->Exec("alien-token-info > /dev/null 2>&1");
// Presumably printed when ret signals failure (the test is not visible here).
364 Printf("=== AliEn token not valid");
371 * Refresh the grid token every 6th hour
// Renews the AliEn token when forced, when CheckAlienToken fails, or when the
// expiry time read from the token file is past or under 30 minutes away.
// NOTE(review): two signatures follow, both with a default argument;
// presumably conditional compilation (not visible here) selects one of them.
377 void RefreshAlienToken(UInt_t, Bool_t f=false)
380 void RefreshAlienToken(UInt_t now, Bool_t force=false)
382 Bool_t renew = force;
383 if (!renew && !CheckAlienToken()) renew = true;
// The token file is read through a shell "cat" and split into lines.
386 TString l = gSystem->GetFromPipe(Form("cat /tmp/gclient_token_%d",
// NOTE(review): no delete of lines is visible here -- confirm it is released.
388 TObjArray* lines = l.Tokenize("\n");
389 TObjString* sline = 0;
392 while ((sline = static_cast<TObjString*>(next()))) {
393 TString& line = sline->String();
// Only the line starting with "Expiretime" is of interest.
394 if (!line.BeginsWith("Expiretime")) continue;
// NOTE(review): in ROOT, Size_t is (per RtypesCore.h) a floating-point
// attribute-size typedef while TString::Index returns Ssiz_t; an integer type
// was presumably intended. A missing "=" is not handled either -- confirm.
396 Size_t eq = line.Index("=");
// Takes the text starting two characters past the "=".
397 TString sdatime = line(eq+2, line.Length()-eq-2);
398 expire = sdatime.Atoi();
402 // If the expiration date/time has passed or is less than 30 min
// NOTE(review): now is unsigned; the declared type of expire is not visible
// here, so check this subtraction and the comparison below for
// signed/unsigned surprises.
404 Int_t diff = (expire - now);
405 if (now > expire || diff < 30*60) renew = true;
// NOTE(review): now is a UInt_t but is printed with %d.
407 Printf("=== Now: %d, Expires: %d, in %03d:%02d:%02d -> %s",
408 now, expire, diff/60/60, (diff/60 % 60), (diff % 60),
409 (renew ? "renew" : "nothing"));
415 // Reset the start time
416 Printf("=== Refreshing AliEn token");
// The value returned by Exec is not checked.
417 gSystem->Exec("alien-token-init");
418 Printf("=== Done refreshing AliEn token");
424 * Wait for jobs to finish
426 * @param jobs List of jobs
427 * @param stages Stages
428 * @param delay Delay for check
429 * @param batch If true, do not prompt
431 * @return true on success, false otherwise
433 * @ingroup pwglf_forward_trains_helper
// Polls the states of jobs, printing one line per job each round, and lets
// the user interrupt from standard input between rounds.
435 Bool_t WaitForJobs(TArrayI& jobs,
// Gives up immediately when there is no valid token.
440 if (!CheckAlienToken()) return false;
441 // Bool_t stopped = false;
// Handler on file descriptor 0, used with Select below to detect key presses.
442 TFileHandler h(0, 0x1);
443 // RefreshAlienToken(0, true);
445 Bool_t allDone = true;
447 Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---",
448 t.GetYear(), t.GetMonth(), t.GetDay(),
449 t.GetHour(), t.GetMinute(), t.GetSecond());
450 UInt_t now = t.Convert(true);
// The success flag returned by GetJobStates is not used here.
453 GetJobStates(jobs, states);
456 Int_t total = jobs.GetSize();
457 // Bool_t allAccounted = false;
458 for (Int_t i = 0; i < total; i++) {
459 Int_t job = jobs.At(i);
// Negative IDs are skipped.
461 if (job < 0) continue;
463 TObjString* obj = static_cast<TObjString*>(states.At(i));
464 const TString& state = obj->String();
// Three cases are told apart: error states, missing jobs, and anything that
// is not "DONE" (the actions taken are not visible here).
466 if (state.BeginsWith("ERROR_"))
468 else if (state.EqualTo("MISSING"))
470 else if (!state.EqualTo("DONE"))
// NOTE(review): stages and stages->At(i) are dereferenced unchecked; stages
// may hold fewer entries than jobs -- confirm against callers.
474 Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
477 RefreshAlienToken(now);
480 if (missing >= total) {
// NOTE(review): the location tag says "GetJobStates" although this message is
// issued from WaitForJobs.
481 Error("GetJobStates", "Info on all jobs missing");
// Waits up to delay seconds (1000*delay ms) for input on the handler.
485 if (gSystem->Select(&h, 1000*delay)) {
486 // Got input on std::cin
// The first getline consumes the line that triggered Select; the second
// reads the answer to the prompt.
488 std::getline(std::cin, l);
489 std::cout << "Do you want to terminate now [yN]? " << std::flush;
490 std::getline(std::cin, l);
491 if (l[0] == 'y' || l[0] == 'Y') {
// Plain sleep of delay seconds; presumably the non-interactive (batch) path.
498 gSystem->Sleep(1000*delay);
506 * Watch Grid for termination of main job, and submit merging jobs as needed.
508 * @param name Name of the job
509 * @param batch If true, do not prompt
510 * @param delay Delay between updates in seconds
512 * @ingroup pwglf_forward_trains_helper
// Top-level driver: connects to the grid, waits for the main jobs, runs
// Terminate.C, then waits for the merging jobs and removes the merge cache
// files.
514 void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
517 // We use command line tools instead of ROOT interface - which is
518 // broken so badly that it's hard to believe it ever worked.
519 gEnv->SetValue("XSec.GSI.DelegProxy", "2");
520 TGrid::Connect("alien:///");
522 Error("GridWatch", "Failed to connect to the Grid");
// Job IDs and stages of the main (non-merge) submission.
527 TObjArray* jobIDs = ReadJobIDs(name, false);
528 TObjArray* stages = ReadStages(name, false);
530 if (!jobIDs || !stages) return;
533 if (!ParseJobIDs(jobIDs, jobs)) return;
535 gSystem->Sleep(10*1000);
// The main jobs are waited on unless BOTH merge cache files already exist.
536 if (!(CheckCacheFile(name, "jobid", true) &&
537 CheckCacheFile(name, "stage", true)))
538 if (!WaitForJobs(jobs, stages, delay, batch)) return;
// NOTE(review): this test requires BOTH merge cache files to be absent,
// whereas the test above fires when EITHER is absent. With exactly one file
// present the jobs are waited on but Terminate.C is not run -- confirm this
// asymmetry is intended.
545 if (!CheckCacheFile(name, "jobid", true) &&
546 !CheckCacheFile(name, "stage", true)) {
547 if (!CheckAlienToken()) return;
548 Printf("Now executing terminate");
// The value returned by Exec is not checked.
549 gSystem->Exec("aliroot -l -b -q Terminate.C");
550 gSystem->Sleep(10*1000);
553 Printf("Reading job ids");
// NOTE(review): the previous arrays are overwritten here; no delete is
// visible -- confirm they are released.
554 jobIDs = ReadJobIDs(name, true);
555 stages = ReadStages(name, true);
// ParseJobIDs rejects a null jobIDs. NOTE(review): stages gets no equivalent
// null check in the visible lines before it is dereferenced below.
557 if (!ParseJobIDs(jobIDs, jobs)) {
558 Error("GridWatch", "Failed to parse job ids %s",
559 CacheFileName(name,"jobid",true).Data());
563 if (!WaitForJobs(jobs, stages, delay, batch)) return;
// All jobs with a non-negative ID must be in a stage starting with "final_".
565 Bool_t allFinal = true;
566 for (Int_t i = 0; i < jobs.GetSize(); i++) {
567 if (jobs.At(i) < 0) continue;
569 const TString& s = static_cast<TObjString*>(stages->At(i))->String();
570 if (!s.BeginsWith("final_")) allFinal = false;
576 Printf("All jobs in final stage");
// Removes the merge-stage cache files.
579 RemoveCacheFile(name, "jobid", true);
580 RemoveCacheFile(name, "stage", true);