]> git.uio.no Git - u/mrichter/AliRoot.git/blob - TPC/macros/testTPC/AliTPCjobs.cxx
Added the dimuon trigger decision component, its default configuration CDB entries...
[u/mrichter/AliRoot.git] / TPC / macros / testTPC / AliTPCjobs.cxx
1 //#include <fstream>
2 //#include <TFile.h>
3 //#include <TSystem.h>
4
5
6 /*
7
8   Process all jobs specified in the job.list file
9   Concurent agents acces the list
10   Once starting to procces given job the jonb is locked 
11
12 */
13
14
15
16 /*
17
18
19 .L $ALICE_ROOT/TPC/macros/testTPC/AliTPCjobs.cxx+
20  
21
22 AliTPCJobs jobs;
23 jobs.fJobFile="job.list"
24 jobs.ProcessAllJobs();
25 */
26
27
28 class AliTPCJobs : public TNamed{
29 public:
30   AliTPCJobs();
31   void ProcessAllJobs();
32   Bool_t GetNextJob();
33   void ProcessJob(TString jobID, TString inputData, TString outputDir, TString   action);
34   
35   void    SetLock(TString jobID);
36   void    SetDone(TString jobID);
37   void    SetFail(TString jobID);
38
39   Int_t   Stage(TString inputData,TString jobID);
40
41   Bool_t  IsLocked(TString jobID);
42   Bool_t  IsFail(TString jobID);
43   Bool_t  IsStaged(TString inputData);
44   Bool_t  IsCastorFile(TString filename);
45
46   TString  fJobFile;
47   TString  fWorkDir;
48   ClassDef(AliTPCJobs,0)
49 };
50
51  ClassImp(AliTPCJobs)
52
53 AliTPCJobs::AliTPCJobs(){
54   // 
55   //
56   //
57   gSystem->Load("libXrdClient.so");
58   gSystem->Load("libNetx.so");
59 }
60
61 void AliTPCJobs::ProcessAllJobs(){
62   //
63   //
64   //
65   Int_t counter=0;
66   while (GetNextJob()){
67     //
68     printf("PROCESSING JOB\t%d\n",counter);
69     counter++;
70     if (!GetNextJob()) break;   
71   }
72 }
73
74
75 Bool_t AliTPCJobs::GetNextJob(){
76   //
77   // GetNextJob  - get job from the list which is not locked
78   //
79   ifstream ins;
80   ins.open(fJobFile);
81   TString id;
82   TString inputData;
83   TString outputDir;
84   TString action;
85   Bool_t hasJob=kFALSE;
86   while(ins.good()) {
87     ins>>id;
88     ins>>inputData;
89     ins>>outputDir;
90     ins>>action;
91     if (!inputData.Contains(".root")) SetFail(id);
92     if (!IsStaged(inputData)){
93         Stage(inputData,id);
94         continue;
95     }
96     if (IsFail(id)) continue;
97     if (!IsLocked(id)){
98       hasJob=kTRUE;
99       break;
100     }
101   }
102   printf("Process %s\n",id.Data());
103   if (hasJob) ProcessJob(id,inputData,outputDir, action);
104   return hasJob;
105 }
106
107
108 void    AliTPCJobs::SetLock(TString jobID){
109   printf("touch out/%s.lock\n",jobID.Data());
110   gSystem->Exec(Form("touch out/%s.lock",jobID.Data()));
111 }
112
113 void    AliTPCJobs::SetDone(TString jobID){ 
114   printf("touch out/%s.done\n",jobID.Data());
115   gSystem->Exec(Form("touch out/%s.done", jobID.Data()));
116 }
117
118 void    AliTPCJobs::SetFail(TString jobID){
119   printf("touch out/%s.fail\n",jobID.Data());
120   gSystem->Exec(Form("touch out/%s.fail", jobID.Data()));
121 }
122
123 void  AliTPCJobs::Stage(TString inputData, TString jobID){
124   //
125   // stage file
126   //
127   inputData.ReplaceAll("root://voalice04.cern.ch:1094/","");
128   if ( !IsCastorFile(inputData) ) return;
129   char command[1000];
130   sprintf(command,"stager_get -M %s \| grep SUBREQUEST_FAILED ",inputData.Data());
131   FILE *pipe = gSystem->OpenPipe(command,"r");
132   TString result;
133   result.Gets(pipe);
134   gSystem->ClosePipe(pipe);
135   if ( result.Contains("SUBREQUEST_FAILED") ){
136       SetFail(jobID);
137   }
138 }
139
140 Bool_t    AliTPCJobs::IsLocked(TString jobID){
141   TString path = "out/";
142   path+=jobID;
143   path+=".lock";
144   Long_t pid; Long_t psize; Long_t pflags; Long_t pmodtime;
145   Int_t status = gSystem->GetPathInfo(path,&pid,&psize,&pflags,&pmodtime);
146   return (status==0);
147 }
148
149 Bool_t    AliTPCJobs::IsFail(TString jobID){
150   TString path = "out/";
151   path+=jobID;
152   path+=".fail";
153   Long_t pid; Long_t psize; Long_t pflags; Long_t pmodtime;
154   Int_t status = gSystem->GetPathInfo(path,&pid,&psize,&pflags,&pmodtime);
155   return (status==0);
156 }
157
158 Bool_t  AliTPCJobs::IsStaged(TString inputData){
159   //
160   // check if file bname is staged
161   //
162   inputData.ReplaceAll("root://voalice04.cern.ch:1094/","");
163   if ( !IsCastorFile(inputData) ) return kTRUE;
164   char command[1000];
165   sprintf(command,"stager_qry -M %s \| grep /castor \| gawk  \'{ print $3;}\'",inputData.Data());
166   FILE *pipe = gSystem->OpenPipe(command,"r");
167   TString result;
168   result.Gets(pipe);
169   gSystem->ClosePipe(pipe);
170   if ( result.Contains("STAGED") ) return kTRUE;
171
172   return kFALSE;
173 }
174
175 Bool_t  AliTPCJobs::IsCastorFile(TString filename){
176   //
177   // check if filename begins with 'castor'
178   //
179   return filename.BeginsWith("/castor/");
180 }
181
182
183 void AliTPCJobs::ProcessJob(TString jobID, TString inputData, TString outputDir, TString   action){
184   //
185   //
186   // 1. Create lock file
187   // 2. Get Input data
188   // 3. Process data
189   // 4. Create Done file
190   SetLock(jobID);
191   if (action.Contains("COPY")){
192     char command[10000];
193       sprintf(command,"xrdcp  -DIFirstConnectMaxCnt 4 -DIConnectTimeout 4 -DIRequestTimeout 4 -DIMaxRedirectcount 4 -DIRedirCntTimeout 4 %s\t%s\n",inputData.Data(), outputDir.Data());
194     printf("Exec\t%s\n", command);
195     gSystem->Exec(command);
196     //TFile::Cp(inputData.Data(), outputDir.Data());
197   }else{
198     char command[10000];
199     //sprintf(command,"$ALICE_ROOT/TPC/macros/testTPC/action.sh %s %s %s %s", jobID.Data(), inputData.Data(), outputDir.Data(), action.Data());    
200     sprintf(command,"/afs/cern.ch/user/w/wiechula/SOURCE/aliroot/job_agend/action.sh %s %s %s %s", jobID.Data(), inputData.Data(), outputDir.Data(), action.Data());
201     printf("%s\n\n",command);
202     gSystem->Exec(command);
203     printf("\n\n");
204
205     // create temp work dir
206 //    TString processDir=fWorkDir+jobID+action;
207 //    Int_t res = gSystem->mkdir(processDir,kTRUE);
208 //    if ( res==-1 ){
209 //      AliWarning(Form("Cannot create dir/already exists: '%s'",processDir.Data()));
210 //      return;
211 //    }
212   }
213   SetDone(jobID);
214 }
215
216
217
218 void AliTPCjobs(){
219   //
220   //
221   //
222   AliTPCJobs jobs;
223   jobs.fJobFile="job.list";
224   jobs.ProcessAllJobs();
225 }