3 * @author Christian Holm Christensen <cholm@master.hehi.nbi.dk>
4 * @date Thu Jan 24 23:06:08 2013
6 * @brief Script to watch master jobs and automatically submit
10 * @ingroup pwglf_forward_trains_helper
16 # include <TObjArray.h>
31 * @param ext Extension
32 * @param merge Merging stage or not
34 * @return Formatted string
35 * @ingroup pwglf_forward_trains_helper
// Builds the token file name "<name>[_merge].<ext>"; the "_merge" infix is
// inserted only when `merge` is true.
37 TString TokenName(const TString& name,
41 return TString::Format("%s%s.%s", name.Data(),
42 (merge ? "_merge" : ""), ext.Data());
45 * Check if we have a particular file
47 * @param name Base name
48 * @param ext Extension
49 * @param merge Merging stage or not
51 * @return true if file exists
52 * @ingroup pwglf_forward_trains_helper
// Reports whether the token file named by TokenName(name, ext, merge) is
// accessible.
54 Bool_t CheckTokens(const TString& name,
58 // TSystem::AccessPathName returns false if the file is there, so negate it
59 return !gSystem->AccessPathName(TokenName(name, ext, merge));
64 * @param name Base name
65 * @param ext Extension
66 * @param merge Merging stage or not
67 * @ingroup pwglf_forward_trains_helper
// Deletes the token file named by TokenName(name, ext, merge).
69 void RemoveTokens(const TString& name,
// The result of Unlink is not inspected, so a failed removal goes unreported.
73 gSystem->Unlink(TokenName(name, ext, merge));
77 * Read one line of text from file and return tokens.
79 * @param name Base name
80 * @param ext Extension
81 * @param merge If true append "_merge" to name
83 * @return Array of tokens or null
85 * @ingroup pwglf_forward_trains_helper
// Opens the file "<name>[_merge].<ext>" and returns the content of `ln`
// split on spaces and tabs, or null when `ln` is empty.
87 TObjArray* ReadTokens(const TString& name,
// NOTE(review): this repeats the format used by TokenName(); calling
// TokenName(name, ext, merge) would keep the two in sync.
91 TString fn = TString::Format("%s%s.%s", name.Data(),
92 (merge ? "_merge" : ""), ext.Data());
93 std::ifstream in(fn.Data());
// Reported when the stream could not be opened (presumably; the guarding
// condition sits on a line not shown here).
95 Error("ReadTokens", "Failed to open %s", fn.Data());
// An empty line yields a null pointer, not an empty array.
102 if (ln.IsNull()) return 0;
// TString::Tokenize allocates the returned array; the caller owns it and
// must delete it.
103 return ln.Tokenize(" \t");
107 * Read list of job IDs from file
109 * @param name Base name
110 * @param merge If true append "_merge" to name
112 * @return Array of job IDs or null
114 * @ingroup pwglf_forward_trains_helper
// Reads the tokens of the "jobid" token file for `name`; thin wrapper around
// ReadTokens(), whose result (possibly null) is returned unchanged.
116 TObjArray* ReadJobIDs(const TString& name, bool merge=false)
118 return ReadTokens(name, "jobid", merge);
122 * Read list of job stages from file
124 * @param name Base name
125 * @param merge If true append "_merge" to name
127 * @return Array of job stages or null
129 * @ingroup pwglf_forward_trains_helper
// Reads the tokens of the "stage" token file for `name`; thin wrapper around
// ReadTokens(), whose result (possibly null) is returned unchanged.
131 TObjArray* ReadStages(const TString& name, bool merge=false)
133 return ReadTokens(name, "stage", merge);
137 * Parse the job IDs into an array of integers
139 * @param jobIds List of jobs
140 * @param ret Return array
142 * @return true on success
144 * @ingroup pwglf_forward_trains_helper
// Converts every entry of `jobIds` to an integer and stores it at the same
// index of `ret`.
146 Bool_t ParseJobIDs(const TObjArray* jobIds, TArrayI& ret)
// A null list is reported as failure.
148 if (!jobIds) return false;
150 Int_t n = jobIds->GetEntries();
154 for (Int_t i = 0; i < n; i++) {
// NOTE(review): the cast is unchecked and At(i) is dereferenced without a
// null test; entries are assumed to be TObjString (as Tokenize produces).
155 TObjString* id = static_cast<TObjString*>(jobIds->At(i));
156 const TString& s = id->String();
// NOTE(review): presumably Atoi() gives 0 for non-numeric text, so a
// malformed ID would not be detected here - confirm.
157 ret.SetAt(s.Atoi(), i);
163 * Parse string representing status and return human-readable string
165 * @param status Return from ps command
168 * @return true on success
170 * @ingroup pwglf_forward_trains_helper
// Translates a status code string into a readable state name written to
// `out`. The characters status[0], status[1] and status[2] are inspected;
// several paths below return false after writing an "UNKNOWN" text.
172 Bool_t ParseState(const TString& status, TString& out)
// Base state names, one per single-character code.
175 case 'D': out = "DONE"; break;
176 case 'E': out = "ERROR"; break;
177 case 'R': out = "RUNNING"; break;
178 case 'W': out = "WAITING"; break;
179 case 'O': out = "WAITING_QUOTA"; break;
180 case 'A': out = "ASSIGNED"; break;
181 case 'S': out = "STARTED" ; break;
182 case 'I': out = "INSERTING"; break;
183 case 'K': out = "KILLED"; break;
// Unrecognised code: report failure with out == "UNKNOWN".
184 default: out = "UNKNOWN"; return false;
// NOTE(review): (status[0] != 'O' || status[0] != 'S') is true for every
// value of status[0], so this test reduces to status[1] != '\0'. If '&&' was
// intended, note that the "SAVING" branch below needs status[0] == 'S' to get
// in here - confirm the intent before changing it.
186 if (status[1] != '\0' &&
187 (status[0] != 'O' || status[0] != 'S')) {
// Suffix chosen by a second code; 'S' means SUBMIT when the first character
// is 'E' and SPLIT otherwise.
190 case 'S': out.Append(status[0] == 'E' ? "SUBMIT" : "SPLIT"); break;
191 case 'X': out.Append("EXPIRED"); break;
192 case 'A': out.Append("ASSIGNING"); break;
193 case 'E': out.Append("EXECUTING"); break;
// With first character 'S' the whole text is replaced by "SAVING";
// otherwise "VALIDATING" is appended.
195 if (status[0] == 'S') out = "SAVING";
196 else out.Append("VALIDATING");
// The two interactive states replace `out` entirely and apply only when the
// first character is 'I'.
199 if (status[0] == 'I') {
200 out = "INTERACTIVE_IDLE";
202 } // Fall through on else
204 if (status[0] == 'I') {
205 out = "INTERACTIVE_USED";
207 } // Fall through on else
// Unrecognised second code: append "UNKNOWN" and report failure.
208 default: out.Append("UNKNOWN"); return false;
// Optional third character.
210 if (status[2] != '\0') {
// For first character 'E', an appended "SUBMIT" is rewritten to "SAVING".
212 case 'V': if (status[0] == 'E') out.ReplaceAll("SUBMIT", "SAVING");
// Unlike the defaults above, no `return false` is visible on this line.
214 default: out.Append("_UNKNOWN");
224 * @param jobId Job ID to look up
225 * @param out Output status string
227 * @return true on success
229 * @ingroup pwglf_forward_trains_helper
// Looks up the state of a single job: the output of the Grid command
// "ps -Ax" is captured in a temporary file, the file is scanned for a line
// whose second token equals `jobId`, and that line's third token is decoded
// with ParseState() into `out`.
231 Bool_t GetJobState(Int_t jobId, TString& out)
// TempFileName() rewrites `fn` to the generated file name and returns an
// open FILE*.
// NOTE(review): no fclose(fp) or removal of the temporary file is visible in
// these lines - confirm that both happen.
235 TString fn("gridMonitor");
236 FILE* fp = gSystem->TempFileName(fn);
// Capture the command output in the temporary file, then restore the normal
// output streams.
238 gSystem->RedirectOutput(fn);
239 gGrid->Command("ps -Ax");
241 gSystem->RedirectOutput(0);
246 std::ifstream in(fn.Data());
// NOTE(review): Tokenize() returns a caller-owned array; no delete of
// `tokens` is visible in these lines.
253 TObjArray* tokens = l.Tokenize(" \t");
// NOTE(review): this guard lets a line with exactly 2 tokens through, but
// At(2) is dereferenced below; GetJobStates() requires 3 tokens. `break` also
// ends the scan at the first short line rather than skipping that line.
254 if (tokens->GetEntries() < 2) break;
256 //TString user = tokens->At(0)->GetName();
257 TString sjid = tokens->At(1)->GetName(); // Job ID
258 TString stat = tokens->At(2)->GetName(); // State
259 Int_t jid = sjid.Atoi();
// Only the line describing the requested job is of interest.
261 if (jid != jobId) continue;
// NOTE(review): the Bool_t result of ParseState() is not used on this line.
263 ParseState(stat, out);
276 * @param jobs List of job IDs
277 * @param states On return the states
279 * @return true on success
281 * @ingroup pwglf_forward_trains_helper
// Looks up the states of several jobs from one "ps -Ax" listing. Slot i of
// `states` corresponds to jobs.At(i).
283 Bool_t GetJobStates(const TArrayI& jobs, TObjArray& states)
285 Int_t n = jobs.GetSize();
// Start with every slot set to "MISSING", creating the TObjString on first
// use.
287 for (Int_t i = 0; i < n; i++) {
288 TObjString* s = static_cast<TObjString*>(states.At(i));
289 if (!s) states.AddAt(s = new TObjString(""), i);
290 s->SetString("MISSING");
// TempFileName() rewrites `fn` to the generated file name and returns an
// open FILE*.
// NOTE(review): no fclose(fp) or removal of the temporary file is visible in
// these lines - confirm that both happen.
293 TString fn("gridMonitor");
294 FILE* fp = gSystem->TempFileName(fn);
// Capture the command output in the temporary file, then restore the normal
// output streams.
296 gSystem->RedirectOutput(fn);
297 gGrid->Command("ps -Ax");
299 gSystem->RedirectOutput(0);
304 std::ifstream in(fn.Data());
// Empty lines carry no job information.
310 if (l.IsNull()) continue;
// NOTE(review): Tokenize() returns a caller-owned array; no delete of
// `tokens` is visible in these lines.
312 TObjArray* tokens = l.Tokenize(" \t");
// Tokens 1 and 2 are read below, so at least three are required.
313 if (tokens->GetEntries() < 3) {
314 Warning("GetJobStates", "Got too few tokens (%d): %s",
315 tokens->GetEntries(), l.Data());
320 //TString user = tokens->At(0)->GetName();
321 TString sjid = tokens->At(1)->GetName(); // Job ID
322 TString stat = tokens->At(2)->GetName(); // State
323 Int_t jid = sjid.Atoi();
// Find the slot(s) of the requested jobs that match this line's job ID.
325 for (Int_t i = 0; i < n; i++) {
326 if (jid != jobs.At(i)) continue;
327 TObjString* s = static_cast<TObjString*>(states.At(i));
// A status that ParseState() rejects is skipped for this slot.
329 if (!ParseState(stat, out)) continue;
342 * Wait for jobs to finish
344 * @param jobs List of jobs
345 * @param stages Stages
346 * @param delay Delay for check
348 * @return true on success, false otherwise
350 * @ingroup pwglf_forward_trains_helper
// Monitors the given jobs: prints a time stamp, queries the job states with
// GetJobStates(), prints one line per job, refreshes the AliEn token from
// time to time, and waits `delay` seconds (scaled by 1000 for the ROOT calls)
// between rounds. Input on stdin lets the user ask to terminate.
352 Bool_t WaitForJobs(TArrayI& jobs,
357 Bool_t stopped = false;
// Handler on file descriptor 0 (stdin), used with TSystem::Select() below;
// 0x1 is presumably the read-interest mask - confirm.
358 TFileHandler h(0, 0x1);
361 Bool_t allDone = true;
363 Printf("--- %4d/%02d/%02d %02d:%02d:%02d [Press enter to pause] ---",
364 t.GetYear(), t.GetMonth(), t.GetDay(),
365 t.GetHour(), t.GetMinute(), t.GetSecond());
366 UInt_t now = t.Convert(true);
// Remember the first time stamp as the reference for the token refresh.
367 if (start <= 0) start = now;
// The Bool_t result of GetJobStates() is not used on this line.
370 GetJobStates(jobs, states);
373 Int_t total = jobs.GetSize();
374 // Bool_t allAccounted = false;
375 for (Int_t i = 0; i < total; i++) {
376 Int_t job = jobs.At(i);
// Negative job IDs are skipped.
378 if (job < 0) continue;
380 TObjString* obj = static_cast<TObjString*>(states.At(i));
381 const TString& state = obj->String();
// Three classes are distinguished: states starting with "ERROR_", "MISSING",
// and anything other than "DONE".
383 if (state.BeginsWith("ERROR_"))
385 else if (state.EqualTo("MISSING"))
387 else if (!state.EqualTo("DONE"))
// NOTE(review): stages->At(i) is dereferenced without a null check, so
// `stages` must have an entry for every job index.
391 Printf(" %d(%s)=%s", job, stages->At(i)->GetName(), state.Data());
394 // Try to refresh token every 6th hour
// NOTE(review): with integer division and '>', this first becomes true once
// 7 full hours have elapsed, not 6 - confirm whether '>=' was intended.
395 if ((now - start) / 60 / 60 > 6) {
396 // Reset the start time
398 Printf("=== Refreshing AliEn token");
399 gSystem->Exec("alien-token-init");
400 Printf("=== Done refreshing AliEn token");
404 if (missing >= total) {
// NOTE(review): the message is tagged "GetJobStates" although it is issued
// from WaitForJobs.
405 Error("GetJobStates", "Info on all jobs missing");
// Wait for input on stdin, with 1000*delay as the time-out.
409 if (gSystem->Select(&h, 1000*delay)) {
410 // Got input on std::cin
// The first getline consumes the line that woke us up; the second reads the
// answer to the prompt.
412 std::getline(std::cin, l);
413 std::cout << "Do you want to terminate now [yN]? " << std::flush;
414 std::getline(std::cin, l);
415 if (l[0] == 'y' || l[0] == 'Y') {
422 gSystem->Sleep(1000*delay);
430 * Watch Grid for termination of main job, and submit merging jobs as needed.
432 * @param name Name of the job
433 * @param delay Delay between updates in seconds
435 * @ingroup pwglf_forward_trains_helper
// Entry point: connects to the Grid, reads the job IDs and stages recorded
// for `name`, waits for those jobs, runs Terminate.C, and then follows the
// jobs listed in the "_merge" token files, removing those token files
// afterwards.
437 void GridWatch(const TString& name, Bool_t batch=false, UShort_t delay=5*60)
439 gEnv->SetValue("XSec.GSI.DelegProxy", "2");
440 TGrid::Connect("alien:///");
442 Error("GridWatch", "Failed to connect to the Grid");
// Tokens of the main (non-merge) token files.
446 TObjArray* jobIDs = ReadJobIDs(name, false);
447 TObjArray* stages = ReadStages(name, false);
// Nothing to watch unless both token files could be read.
449 if (!jobIDs || !stages) return;
452 if (!ParseJobIDs(jobIDs, jobs)) return;
// Wait for the main jobs unless both merge token files already exist.
454 if (!(CheckTokens(name, "jobid", true) &&
455 CheckTokens(name, "stage", true)))
456 WaitForJobs(jobs, stages, delay, batch);
// NOTE(review): this test requires BOTH merge token files to be absent,
// whereas the one above fires when EITHER is absent. With exactly one file
// present the two tests disagree - confirm which is intended.
463 if (!CheckTokens(name, "jobid", true) &&
464 !CheckTokens(name, "stage", true)) {
465 Printf("Now executing terminate");
466 gSystem->Exec("aliroot -l -b -q Terminate.C");
// Pause 10 seconds after the terminate step.
467 gSystem->Sleep(10*1000);
469 Printf("Reading job ids");
// NOTE(review): the arrays read earlier are overwritten here with no delete
// visible, and `stages` is not null-checked before it is dereferenced below.
471 jobIDs = ReadJobIDs(name, true);
472 stages = ReadStages(name, true);
474 if (!ParseJobIDs(jobIDs, jobs)) {
475 Error("GridWatch", "Failed to parse job ids %s",
476 TokenName(name,"jobid",true).Data());
480 WaitForJobs(jobs, stages, delay, batch);
// allFinal stays true only if every job with a non-negative ID has a stage
// name starting with "final_".
482 Bool_t allFinal = true;
483 for (Int_t i = 0; i < jobs.GetSize(); i++) {
484 if (jobs.At(i) < 0) continue;
486 const TString& s = static_cast<TObjString*>(stages->At(i))->String();
487 if (!s.BeginsWith("final_")) allFinal = false;
493 Printf("All jobs in final stage");
// Remove the merge token files.
496 RemoveTokens(name, "jobid", true);
497 RemoveTokens(name, "stage", true);