Logo Coherent WaveBurst  
Reference Guide
Logo
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cwb_condor_recovery.C
Go to the documentation of this file.
1 // cwb_condor_recovery.C : create the recovery DAG "<condor_dir>/<data_label>.dag.recovery.<N>"
// containing the jobs of the main DAG that have not yet reached the requested stage.
// A job's progress is taken from the stage files (init_, strain_, cstrain_, coherence_,
// supercluster_, wave_) found in the stage input directory; the older history-based
// check is commented out in this macro. With _USE_LSF the DAG is converted to an LSF file,
// with _USE_PEGASUS it is passed to cwb_pegasus_create.sh, otherwise a condor .sub file
// is created. Used by the cwb_condor command.
2 
3 {
4  #include <vector>
5 
// NOTE(review): original source line 6 is missing from this listing; presumably it
// declares the CWB::Toolbox instance TB used below (TODO confirm against the real file).
7 
// check the rootlogon, parameters and user-parameters files named by these environment
// variables (behaviour on a missing file is defined by Toolbox::checkFile — confirm there)
8  TB.checkFile(gSystem->Getenv("CWB_ROOTLOGON_FILE"));
9  TB.checkFile(gSystem->Getenv("CWB_PARAMETERS_FILE"));
10  TB.checkFile(gSystem->Getenv("CWB_UPARAMETERS_FILE"));
11 
// condor_tag (the condor accounting group) must be set in user_parameters.C;
// an empty tag is a fatal error, the special value "disabled" is cleared to "".
12  if(TString(condor_tag)=="") {
13  cout << endl;
14  cout << "cwb_condor_recovery.C : Error - the accounting_group is not defined !!!" << endl;
15  cout << "The accounting_group must be defined in the user_parameters.C file" << endl;
16  cout << "See the following link:" << endl;
17  cout <<" https://ldas-gridmon.ligo.caltech.edu/accounting/condor_groups/determine_condor_account_group.html" << endl;
18  cout << "Examples : " << endl;
19  cout << "strcpy(condor_tag,\"ligo.dev.o2.burst.allsky.cwboffline\");" << endl;
20  cout << "strcpy(condor_tag,\"ligo.prod.o2.burst.allsky.cwboffline\");" << endl;
21  cout << "If you don't need it set : strcpy(condor_tag,\"disabled\");" << endl << endl;
22  exit(1);
23  }
24  if(TString(condor_tag)=="disabled") strcpy(condor_tag,"");
25 
// NOTE(review): original source line 26 is missing from this listing; presumably it
// declares TString cwb_uparameters_file (TODO confirm).
// The user parameters file is mandatory: it is the default CWB_UFILE of every recovered job.
27  if(gSystem->Getenv("CWB_UPARAMETERS_FILE")==NULL) {
28  cout << "Error : environment CWB_UPARAMETERS_FILE is not defined!!!" << endl;exit(1);
29  } else {
30  cwb_uparameters_file=TString(gSystem->Getenv("CWB_UPARAMETERS_FILE"));
31  }
32 
33  // get cwb stage name
// Stage requested for the recovered jobs: $CWB_STAGE_NAME, default "CWB_STAGE_FULL".
// FULL is treated as LIKELIHOOD (the last stage) when cwb_stage is chosen below.
34  TString cwb_stage_name="CWB_STAGE_FULL";
35  if(gSystem->Getenv("CWB_STAGE_NAME")!=NULL) {
36  cwb_stage_name=TString(gSystem->Getenv("CWB_STAGE_NAME"));
37  }
38  if(cwb_stage_name=="CWB_STAGE_FULL") cwb_stage_name="CWB_STAGE_LIKELIHOOD";
39  // convert stage name to value
// cwb_resume_label = "<output_dir>/<prefix of the files written by the previous stage>";
// in resume mode only jobs whose input file starts with this label are recovered.
40  TString cwb_resume_label=output_dir; // used with resume
// NOTE(review): original source line 41 is missing from this listing; presumably it
// declares CWB_STAGE cwb_stage (TODO confirm). A CWB_STAGE_NAME matching none of the
// tests below leaves cwb_stage at whatever that declaration sets.
// (the FULL test below can no longer match: FULL was renamed to LIKELIHOOD above)
42  if(cwb_stage_name=="CWB_STAGE_FULL") {cwb_stage=CWB_STAGE_LIKELIHOOD; cwb_resume_label+="/supercluster_";}
43  if(cwb_stage_name=="CWB_STAGE_INIT") {cwb_stage=CWB_STAGE_INIT; cwb_resume_label+="";}
44  if(cwb_stage_name=="CWB_STAGE_STRAIN") {cwb_stage=CWB_STAGE_STRAIN; cwb_resume_label+="/init_";}
45  if(cwb_stage_name=="CWB_STAGE_CSTRAIN") {cwb_stage=CWB_STAGE_CSTRAIN; cwb_resume_label+="/strain_";}
46  if(cwb_stage_name=="CWB_STAGE_COHERENCE") {cwb_stage=CWB_STAGE_COHERENCE; cwb_resume_label+="/cstrain_";}
47  if(cwb_stage_name=="CWB_STAGE_SUPERCLUSTER") {cwb_stage=CWB_STAGE_SUPERCLUSTER;cwb_resume_label+="/coherence_";}
48  if(cwb_stage_name=="CWB_STAGE_LIKELIHOOD") {cwb_stage=CWB_STAGE_LIKELIHOOD; cwb_resume_label+="/supercluster_";}
// Restore the name exactly as given in the environment: it is written into the CWB_STAGE
// variable of each DAG job and used in the log/LSF labels. An explicit "CWB_STAGE_FULL"
// is therefore kept, while the unset default stays "CWB_STAGE_LIKELIHOOD".
49  if(gSystem->Getenv("CWB_STAGE_NAME")!=NULL) {
50  cwb_stage_name=TString(gSystem->Getenv("CWB_STAGE_NAME"));
51  }
52 
53  char full_condor_dir[1024];
54  sprintf(full_condor_dir,"%s/%s",work_dir,condor_dir);
55 
56  // check if condor dag file is present, otherwise it is created
57  // the dag file could be not present in the second stage analysis
58  bool exists = TB.isFileExisting(TString::Format("%s/%s.dag",full_condor_dir,data_label));
59  if(!exists) {
60  TString cwb_scripts = TString(gSystem->Getenv("CWB_SCRIPTS"));
61  TString exec_cmd = TString::Format("%s/cwb_condor.csh create",cwb_scripts.Data());
62  int ret=gSystem->Exec(exec_cmd);
63  if(ret) {cout << "Error while executing cwb_condor create !!!" << endl;exit(1);}
64  }
65 
66  // read condor job list
// job IDs listed in the main DAG (1-based, as the jobStage[jobList[i]-1] indexing shows)
67  vector<int> jobList=TB.getCondorJobList(full_condor_dir, data_label);
68 
// max_jobs = largest job ID in the DAG; the per-job arrays are indexed by ID-1
69  int max_jobs = 0;
70  for(int i=0;i<jobList.size();i++) if(jobList[i]>max_jobs) max_jobs=jobList[i];
71 
// NOTE(review): jobStart/jobStop are never used in this listing, and runtime-sized
// (variable-length) arrays are a compiler extension, not standard C++.
72  int jobStart[max_jobs];
73  int jobStop[max_jobs];
74 
// NOTE(review): original source line 75 is missing from this listing; presumably it
// declares jobStage[max_jobs] (TODO confirm). jobStage[i] tracks job i+1:
// -1 = not listed in the DAG, 0 = listed (initial value before the output scan),
// otherwise the most advanced stage whose output file was found.
76  for (int i=0;i<max_jobs;i++) jobStage[i]=-1; // excluded jobs
77  for (int i=0;i<jobList.size();i++) jobStage[jobList[i]-1]=0; // jobs in the dag file
78 
79  char tag[256];sprintf(tag,"%s.dag.recovery.",data_label);
80  vector<TString> fileList = TB.getFileListFromDir(condor_dir, "", tag);
81  int iversion=0;
82  for(int i=0;i<fileList.size();i++) {
83  //cout << i << " " << fileList[i].Data() << endl;
84  TObjArray* token = TString(fileList[i]).Tokenize(TString("."));
85  TObjString* srecoveryID = (TObjString*)token->At(token->GetEntries()-1);
86  if(srecoveryID->GetString().IsDigit()) {
87  cout << i << " " << fileList[i].Data() << endl;
88  int recoveryID = srecoveryID->GetString().Atoi();
89  if(iversion<recoveryID) iversion=recoveryID;
90  }
91  }
92  iversion++;
93 
94  char dagfile[1024];
95  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
96 
97  // Check if dag file already exist
98  Long_t id,size,flags,mt;
99  int estat = gSystem->GetPathInfo(dagfile,&id,&size,&flags,&mt);
100  if (estat==0) {
101  char answer[256];
102  strcpy(answer,"");
103  do {
104  cout << "File \"" << dagfile << "\" already exist" << endl;
105  cout << "Do you want to overwrite the file ? (y/n) ";
106  cin >> answer;
107  cout << endl << endl;
108  } while ((strcmp(answer,"y")!=0)&&(strcmp(answer,"n")!=0));
109  if (strcmp(answer,"n")==0) {
110  exit(0);
111  }
112  }
113 
114  // get cwb_stage_resume
115  // resume mode (only when $CWB_STAGE_RESUME is exactly "TRUE"): a job is recovered only if its input file was produced by the previous cwb_stage
// NOTE(review): original source line 116 is missing from this listing; presumably it
// declares TString cwb_stage_resume (TODO confirm).
117  if(gSystem->Getenv("CWB_STAGE_RESUME")!=NULL) {
118  cwb_stage_resume=TString(gSystem->Getenv("CWB_STAGE_RESUME"));
119  }
120 
121  // get cwb stage input dir (input files produced by the previous stage)
// NOTE(review): original source line 122 is missing from this listing; presumably it
// declares TString cwb_stage_input (TODO confirm). Falls back to output_dir when empty.
123  if(gSystem->Getenv("CWB_STAGE_INPUT")!=NULL) {
124  cwb_stage_input=TString(gSystem->Getenv("CWB_STAGE_INPUT"));
125  }
126  if(cwb_stage_input=="") cwb_stage_input=output_dir;
127  TB.checkFile(cwb_stage_input);
128 
129  // factor label : extract the last factor value
// With f = factors[nfactor-1], sfactor is:
//   simulation==3 : "_n<|f|>", "_z<f>" or "_p<f>" according to the sign of `factor`
//   simulation==4 : "_<offset>", offset = (int(factors[0]) if > 0, else 1) + nfactor - 1
//   other nonzero : "_<f>"
// job_label = data_label + sfactor; final (wave_) output files must contain it to count.
// NOTE(review): for simulation==3 the sign is taken from `factor` while the printed value
// is factors[nfactor-1]; confirm both refer to the same factor here.
130  char sfactor[32]="";
131  if(simulation) {
132  if(simulation==3) {
133  if(factor<0) sprintf(sfactor,"_n%g",fabs(factors[nfactor-1]));
134  if(factor==0) sprintf(sfactor,"_z%g",factors[nfactor-1]);
135  if(factor>0) sprintf(sfactor,"_p%g",factors[nfactor-1]);
136  } else if(simulation==4) {
137  int ioffset = int(factors[0])<=0 ? 1 : int(factors[0]);
138  ioffset+=nfactor-1;
139  sprintf(sfactor,"_%i",ioffset);
140  } else sprintf(sfactor,"_%g",factors[nfactor-1]);
141  }
142  char job_label[512];sprintf(job_label,"%s%s",data_label,sfactor);
143 
144  cout << "Starting reading output directory ..." << endl;
145  vector<TString> jobFiles(max_jobs);
146  for(int i=0;i<max_jobs;i++) jobFiles[i]=cwb_uparameters_file;
147  vector<TString> fileList = TB.getFileListFromDir(cwb_stage_input,".root","",data_label,true);
148  for(int n=0;n<fileList.size();n++) {
149 
150  int jobId = TB.getJobId(fileList[n]); // Get JOB ID
151  jobId-=1;
152 
153  if(fileList[n].BeginsWith(cwb_stage_input+"/init_"))
154  if(CWB_STAGE_INIT>jobStage[jobId])
155  {jobStage[jobId]=CWB_STAGE_INIT;jobFiles[jobId]=fileList[n];continue;}
156  if(fileList[n].BeginsWith(cwb_stage_input+"/strain_"))
157  if(CWB_STAGE_STRAIN>jobStage[jobId])
158  {jobStage[jobId]=CWB_STAGE_STRAIN;jobFiles[jobId]=fileList[n];continue;}
159  if(fileList[n].BeginsWith(cwb_stage_input+"/cstrain_"))
160  if(CWB_STAGE_CSTRAIN>jobStage[jobId])
161  {jobStage[jobId]=CWB_STAGE_CSTRAIN;jobFiles[jobId]=fileList[n];continue;}
162  if(fileList[n].BeginsWith(cwb_stage_input+"/coherence_"))
163  if(CWB_STAGE_COHERENCE>jobStage[jobId])
164  {jobStage[jobId]=CWB_STAGE_COHERENCE;jobFiles[jobId]=fileList[n];continue;}
165  if(fileList[n].BeginsWith(cwb_stage_input+"/supercluster_"))
166  if(CWB_STAGE_SUPERCLUSTER>jobStage[jobId])
167  {jobStage[jobId]=CWB_STAGE_SUPERCLUSTER;jobFiles[jobId]=fileList[n];continue;}
168  if(fileList[n].BeginsWith(cwb_stage_input+"/wave_")&&fileList[n].Contains(job_label))
169  if(CWB_STAGE_LIKELIHOOD>jobStage[jobId])
170  {jobStage[jobId]=CWB_STAGE_LIKELIHOOD;jobFiles[jobId]=fileList[n];continue;}
171 
172 /*
173  // Get STOP JOB info from history
174  TFile *ifile = TFile::Open(fileList[n]);
175  if(ifile==NULL) {cout << "Failed to open " << fileList[n].Data() << endl;exit(-1);}
176  CWB::History* ihistory = (CWB::History*)ifile->Get("history");
177  if(ihistory==NULL) { cout << "Error : history is not present!!!" << endl;exit(1); }
178  int log_size = ihistory->GetLogSize("FULL");
179  TString log = ihistory->GetLog("FULL",log_size-1);
180  ifile->Close();
181  if(log!="STOP JOB") nrecovery++;
182 */
183  }
184  //for (int i=0;i<max_jobs;i++) cout << i << " " << jobStage[i] << " " << jobFiles[i].Data() << " " << cwb_stage << endl;
185 
186  int nrecovery=0;
187  for (int i=0;i<max_jobs;i++)
188  if ((jobStage[i]>=cwb_stage)||(jobStage[i]>=CWB_STAGE_LIKELIHOOD)) nrecovery++;
189  nrecovery=jobList.size()-nrecovery;
190  if(nrecovery==0) {
191  cout << "No Jobs to be recovered" << endl;
192  gSystem->Exit(0);
193  }
194  cout << endl;
195  cout << "New Recovey File " << endl;
196  cout << dagfile << endl;
197 
198  // condor log dirs
199  char full_condor_out_dir[1024];
200  char full_condor_err_dir[1024];
201  sprintf(full_condor_out_dir,"%s/%s",work_dir,log_dir);
202  sprintf(full_condor_err_dir,"%s/%s",work_dir,log_dir);
203 
204  // create dag condor file
205  char full_condor_dir[1024];
206  sprintf(full_condor_dir,"%s/%s",work_dir,condor_dir);
207 
208  ofstream out;
209  out.open(dagfile,ios::out);
210  int cnt = 0;
211  for (int i=0;i<max_jobs;i++) {
212  if (i%1000==0) cout << i << "/" << max_jobs << endl;
213  if ((jobStage[i]!=-1)&&(jobStage[i]<cwb_stage)&&(jobStage[i]<CWB_STAGE_LIKELIHOOD)) {
214  if(cwb_stage_resume=="TRUE") if(!jobFiles[i].BeginsWith(cwb_resume_label)) continue;
215  cnt++;
216  char ostring[256];
217  int jobID=i+1;
218  sprintf(ostring,"JOB A%i %s/%s.sub.recovery.%d",jobID,full_condor_dir,data_label,iversion);
219  out << ostring << endl;
220  sprintf(ostring,"VARS A%i PID=\"%i\" CWB_UFILE=\"%s\" CWB_STAGE=\"%s\"",
221  jobID,jobID,jobFiles[i].Data(),cwb_stage_name.Data());
222  out << ostring << endl;
223  sprintf(ostring,"RETRY A%i 3000",jobID);
224  out << ostring << endl;
225  // remove broken symbolic links of condor log files (avoid init condor failure)
226  TString path;
227  char symlink[1024];
228  Long_t id,size,flags,mt;
229  sprintf(symlink,"%s/%d_%s_%s.out",full_condor_out_dir,jobID,data_label,cwb_stage_name.Data());
230  path = CWB::Toolbox::getFileName(symlink);
231  if(path!="") {
232  int estat = gSystem->GetPathInfo(path.Data(),&id,&size,&flags,&mt);
233  if(estat!=0) { // condor log out symbolic link is broken
234  char cmd[1024]; sprintf(cmd,"rm -f %s",symlink);
235  gSystem->Exec(cmd);
236  }
237  }
238  sprintf(symlink,"%s/%d_%s_%s.err",full_condor_err_dir,jobID,data_label,cwb_stage_name.Data());
239  path = CWB::Toolbox::getFileName(symlink);
240  if(path!="") {
241  int estat = gSystem->GetPathInfo(symlink,&id,&size,&flags,&mt);
242  if(estat!=0) { // condor log err symbolic link is broken
243  char cmd[1024]; sprintf(cmd,"rm -f %s",symlink);
244  gSystem->Exec(cmd);
245  }
246  }
247  }
248  }
249  out.close();
250 
// LSF mode: build a tgz of the working files, convert the recovery DAG into an LSF job
// file and terminate here, so the Pegasus/condor steps that follow are never reached.
251  if(gSystem->Getenv("_USE_LSF")!=NULL) {
252 
253  // make lsf label
// data_label alone when CWB_STAGE_NAME was explicitly "CWB_STAGE_FULL",
// otherwise "<data_label>_<stage name>"
254  char lsf_label[1024];
255  if(cwb_stage_name=="CWB_STAGE_FULL") {
256  sprintf(lsf_label,"%s",data_label);
257  } else {
258  sprintf(lsf_label,"%s_%s",data_label,cwb_stage_name.Data());
259  }
260 
// NOTE(review): original source line 263 (the arguments of the Format call below) is
// missing from this listing. The exit status of tar is not checked.
261  // create tgz of the working dir
262  TString exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s/*.sh --exclude='*/.svn'",
264  gSystem->Exec(exec_cmd);
265  cout << endl << "Created tgz file : " << condor_dir<<"/"<<lsf_label<<".tgz" << endl;
266 
// NOTE(review): original source line 268 is missing from this listing; presumably it sets
// TString lsfFile from TB.DAG2LSF(...) (TODO confirm). An empty lsfFile means no job to submit.
267  // create LSF file from the DAG file
269  if(lsfFile!="") {
270  cout << endl << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
271  cout << endl << "Created LSF file : " << lsfFile << endl << endl;
272  cout << "To submit LSF recovered jobs, type :" << endl;
273  cout << "cwb_lsf submit " << lsfFile << endl << endl;
274  } else {
275  cout << endl << "No jobs to be submitted !!!" << endl << endl;
276  }
277  gSystem->Exit(0);
278  }
279 
280  if(gSystem->Getenv("_USE_PEGASUS")!=NULL) {
281  // create in tgz file
282  ofstream out;
283  char infile[1024];
284  sprintf(infile,"%s/%s.in.recovery.%d",condor_dir,data_label,iversion);
285  out.open(infile,ios::out);
286  out << "../" << config_dir << "/" << endl;
287  out << "../" << input_dir << "/" << endl;
288  out << "../" << macro_dir << "/" << endl;
289 /*
290  for (int i=0;i<max_jobs;i++) {
291  if ((jobStage[i]!=-1)&&(jobStage[i]<cwb_stage)&&(jobStage[i]<CWB_STAGE_LIKELIHOOD)) {
292  if(jobStage[i]!=CWB_STAGE_FULL) out << "../" << jobFiles[i] << endl;
293  }
294  }
295 */
296  out.close();
297  // execute cwb_pegasus_create.sh
298  sprintf(dagfile,"%s.dag.recovery.%d",data_label,iversion);
299  TString cwb_scripts = TString(gSystem->Getenv("CWB_SCRIPTS"));
300  TString exec_cmd = TString::Format("cd %s;%s/cwb_pegasus_create.sh %s",
301  condor_dir,cwb_scripts.Data(),dagfile);
302  int ret=gSystem->Exec(exec_cmd);
303  if(ret) {cout << "Error while executing cwb_pegasus_create !!!" << endl;exit(1);}
304 
305  cout << endl;
306  cout << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
307  cout << endl;
308  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
309  cout << "To submit pegasus recovered jobs, type :" << endl;
310  cout << "cwb_pegasus submit " << dagfile << endl;
311  } else {
312  // create sub condor file
313  char extention[1024];
314  sprintf(extention,"recovery.%d",iversion);
315  TB.createSubFile(data_label, full_condor_dir, full_condor_out_dir,
316  full_condor_err_dir, condor_log, extention, condor_tag);
317  cout << endl;
318  cout << "Unfinished Jobs : " << cnt << "/" << jobList.size() << endl;
319  cout << endl;
320  sprintf(dagfile,"%s/%s.dag.recovery.%d",condor_dir,data_label,iversion);
321  cout << "To submit condor recovered jobs, type :" << endl;
322  cout << "cwb_condor submit " << dagfile << endl;
323  }
324  cout << endl;
325 
326  if(gSystem->Getenv("_USE_LSF")!=NULL) {
327 
328  TString cwb_stage_label="supercluster_";
329  if( cwb_stage_input=="FULL") cwb_stage_label="wave_";
330  if( cwb_stage_input=="INIT") cwb_stage_label="init_";
331  if( cwb_stage_input=="STRAIN") cwb_stage_label="strain_";
332  if( cwb_stage_input=="CSTRAIN") cwb_stage_label="cstrain_";
333  if( cwb_stage_input=="COHERENCE") cwb_stage_label="coherence_";
334  if( cwb_stage_input=="SUPERCLUSTER") cwb_stage_label="supercluster_";
335  if( cwb_stage_input=="LIKELIHOOD") cwb_stage_label="wave_";
336 
337  TString exec_cmd =TString::Format("export file_n_st=""$(ls %s*_job%i.root)""",cwb_stage_label.Data(),jobID);
338  gSystem->Exec(exec_cmd);
339  gSystem->Exec("echo $file_n_st");
340  if(gSystem->Getenv("file_n_st")!=NULL) {
341  char *file_tmp = gSystem->ExpandPathName("output/$file_n_st");
342  //cout<<file_tmp<<endl;
343  exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s %s %s --exclude='*/.svn' --exclude='%s/*' --exclude='%s/*'",
345  log_dir, macro_dir, output_dir, log_dir, (&file_tmp));
346  } else {
347  // create tgz of the working dir
348  exec_cmd = TString::Format("tar -czf %s/%s.tgz %s %s %s %s %s --exclude='*/.svn' --exclude='%s/*' --exclude='%s/*'",
351  gSystem->Exec(exec_cmd);
352  cout << endl << "Created tgz file : " << condor_dir<<"/"<<data_label<<".tgz" << endl;
353  }
354  }
355 
356  gSystem->Exit(0);
357 }
TString cwb_scripts
Long_t size
TString cwb_stage_name
static vector< TString > getFileListFromDir(TString dir_name, TString endString="", TString beginString="", TString containString="", bool fast=false)
Definition: Toolbox.cc:4333
sprintf(full_condor_dir,"%s/%s", work_dir, condor_dir)
char cmd[1024]
TString cwb_stage_input
TString cwb_stage_label
Definition: cwb_clonedir.C:146
vector< int > jobList
int n
Definition: cwb_net.C:10
Long_t flags
char dagfile[1024]
TString("c")
Long_t mt
char full_condor_dir[1024]
int max_jobs
char macro_dir[512]
Definition: test_config1.C:150
i drho i
static bool checkFile(TString fName, bool question=false, TString message="")
Definition: Toolbox.cc:3956
int jobID
Definition: cwb_net.C:177
CWB::Toolbox TB
Definition: ComputeSNR.C:5
CWB_STAGE jobStage[max_jobs]
TString symlink
ofstream out
Definition: cwb_merge.C:196
char data_label[512]
Definition: test_config1.C:160
double factor
char input_dir[512]
Definition: test_config1.C:145
int nrecovery
static int createSubFile(TString label, TString condor_dir, TString out_dir, TString err_dir, TString log_dir, TString ext="", TString condor_tag="")
Definition: Toolbox.cc:1317
int estat
Long_t id
i() int(T_cor *100))
char job_label[512]
int jobStop[max_jobs+1]
char data_dir[512]
Definition: test_config1.C:152
TObjArray * token
char log_dir[512]
Definition: test_config1.C:151
char config_dir[512]
Definition: test_config1.C:144
static int getJobId(TString file, TString fext="root")
Definition: Toolbox.cc:5913
char tag[256]
Definition: cwb_merge.C:74
double factors[100]
Definition: test_config1.C:84
int jobStart[max_jobs+1]
char answer[256]
static vector< int > getCondorJobList(TString condor_dir, TString label)
Definition: Toolbox.cc:1378
cout<< "Starting reading output directory ..."<< endl;vector< TString > fileList
double fabs(const Complex &x)
Definition: numpy.cc:37
static TString DAG2LSF(char *dagFile, char *data_label, char *nodedir, char *data_dir, char *condor_dir, char *log_dir, char *output_dir, char *work_dir)
Definition: Toolbox.cc:1541
strcpy(RunLabel, RUN_LABEL)
int cnt
nfactor[0]
Definition: cwb_eced.C:10
TString cwb_stage_resume
cout<< "Starting reading output directory ..."<< endl;vector< TString > jobFiles(max_jobs)
bool exists
char condor_log[512]
Definition: test_config1.C:163
static TString getFileName(FILE *fp)
Definition: Toolbox.cc:5996
TString cwb_uparameters_file
int jobId
char nodedir[1024]
Definition: test_config1.C:187
char condor_dir[512]
Definition: test_config1.C:148
char work_dir[512]
Definition: test_config1.C:143
char full_condor_out_dir[512]
static bool isFileExisting(TString fName)
Definition: Toolbox.cc:3937
CWB_STAGE cwb_stage
simulation
Definition: cwb_eced.C:9
char output_dir[512]
Definition: test_config1.C:146
CWB_STAGE
Definition: cwb.hh:102
char full_condor_err_dir[512]
exit(0)