Source code for pycbc.workflow.grb_utils

# Copyright (C) 2015  Andrew Williamson
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#

"""
This library code contains functions and classes that are used in the
generation of pygrb workflows. For details about pycbc.workflow see here:
http://pycbc.org/pycbc/latest/html/workflow.html
"""

from __future__ import print_function
import sys
import os
import shutil
from six.moves.urllib.request import pathname2url
from six.moves.urllib.parse import urljoin
from ligo import segments
from glue.ligolw import ligolw, lsctables, utils, ilwd
from pycbc.workflow.core import File, FileList, resolve_url_to_file
from pycbc.workflow.jobsetup import select_generic_executable


[docs]def set_grb_start_end(cp, start, end): """ Function to update analysis boundaries as workflow is generated Parameters ---------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The parsed configuration options of a pycbc.workflow.core.Workflow. start : int The start of the workflow analysis time. end : int The end of the workflow analysis time. Returns -------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The modified WorkflowConfigParser object. """ cp.set("workflow", "start-time", str(start)) cp.set("workflow", "end-time", str(end)) return cp
[docs]def get_coh_PTF_files(cp, ifos, run_dir, bank_veto=False, summary_files=False): """ Retrieve files needed to run coh_PTF jobs within a PyGRB workflow Parameters ---------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The parsed configuration options of a pycbc.workflow.core.Workflow. ifos : str String containing the analysis interferometer IDs. run_dir : str The run directory, destination for retrieved files. bank_veto : Boolean If true, will retrieve the bank_veto_bank.xml file. summary_files : Boolean If true, will retrieve the summary page style files. Returns ------- file_list : pycbc.workflow.FileList object A FileList containing the retrieved files. """ if os.getenv("LAL_SRC") is None: raise ValueError("The environment variable LAL_SRC must be set to a " "location containing the file lalsuite.git") else: lalDir = os.getenv("LAL_SRC") sci_seg = segments.segment(int(cp.get("workflow", "start-time")), int(cp.get("workflow", "end-time"))) file_list = FileList([]) # Bank veto if bank_veto: shutil.copy("%s/lalapps/src/ring/coh_PTF_config_files/" \ "bank_veto_bank.xml" % lalDir, "%s" % run_dir) bank_veto_url = "file://localhost%s/bank_veto_bank.xml" % run_dir bank_veto = File(ifos, "bank_veto_bank", sci_seg, file_url=bank_veto_url) bank_veto.PFN(bank_veto.cache_entry.path, site="local") file_list.extend(FileList([bank_veto])) if summary_files: # summary.js file shutil.copy("%s/lalapps/src/ring/coh_PTF_config_files/" \ "coh_PTF_html_summary.js" % lalDir, "%s" % run_dir) summary_js_url = "file://localhost%s/coh_PTF_html_summary.js" \ % run_dir summary_js = File(ifos, "coh_PTF_html_summary_js", sci_seg, file_url=summary_js_url) summary_js.PFN(summary_js.cache_entry.path, site="local") file_list.extend(FileList([summary_js])) # summary.css file shutil.copy("%s/lalapps/src/ring/coh_PTF_config_files/" \ "coh_PTF_html_summary.css" % lalDir, "%s" % run_dir) summary_css_url = "file://localhost%s/coh_PTF_html_summary.css" \ % run_dir summary_css = File(ifos, "coh_PTF_html_summary_css", sci_seg, file_url=summary_css_url) summary_css.PFN(summary_css.cache_entry.path, site="local") file_list.extend(FileList([summary_css])) return file_list
[docs]def make_exttrig_file(cp, ifos, sci_seg, out_dir): ''' Make an ExtTrig xml file containing information on the external trigger Parameters ---------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The parsed configuration options of a pycbc.workflow.core.Workflow. ifos : str String containing the analysis interferometer IDs. sci_seg : ligo.segments.segment The science segment for the analysis run. out_dir : str The output directory, destination for xml file. Returns ------- xml_file : pycbc.workflow.File object The xml file with external trigger information. ''' # Initialise objects xmldoc = ligolw.Document() xmldoc.appendChild(ligolw.LIGO_LW()) tbl = lsctables.New(lsctables.ExtTriggersTable) cols = tbl.validcolumns xmldoc.childNodes[-1].appendChild(tbl) row = tbl.appendRow() # Add known attributes for this GRB setattr(row, "event_ra", float(cp.get("workflow", "ra"))) setattr(row, "event_dec", float(cp.get("workflow", "dec"))) setattr(row, "start_time", int(cp.get("workflow", "trigger-time"))) setattr(row, "event_number_grb", str(cp.get("workflow", "trigger-name"))) # Fill in all empty rows for entry in cols.keys(): if not hasattr(row, entry): if cols[entry] in ['real_4','real_8']: setattr(row,entry,0.) elif cols[entry] == 'int_4s': setattr(row,entry,0) elif cols[entry] == 'lstring': setattr(row,entry,'') elif entry == 'process_id': row.process_id = ilwd.ilwdchar("external_trigger:process_id:0") elif entry == 'event_id': row.event_id = ilwd.ilwdchar("external_trigger:event_id:0") else: print("Column %s not recognized" %(entry), file=sys.stderr) raise ValueError # Save file xml_file_name = "triggerGRB%s.xml" % str(cp.get("workflow", "trigger-name")) xml_file_path = os.path.join(out_dir, xml_file_name) utils.write_filename(xmldoc, xml_file_path) xml_file_url = urljoin("file:", pathname2url(xml_file_path)) xml_file = File(ifos, xml_file_name, sci_seg, file_url=xml_file_url) xml_file.PFN(xml_file_url, site="local") return xml_file
[docs]def get_ipn_sky_files(workflow, file_url, tags=None): ''' Retreive the sky point files for searching over the IPN error box and populating it with injections. Parameters ---------- workflow: pycbc.workflow.core.Workflow An instanced class that manages the constructed workflow. file_url : string The URL of the IPN sky points file. tags : list of strings If given these tags are used to uniquely name and identify output files that would be produced in multiple calls to this function. Returns -------- sky_points_file : pycbc.workflow.core.File File object representing the IPN sky points file. ''' tags = tags or [] file_attrs = { 'ifos': workflow.ifos, 'segs': workflow.analysis_time, 'exe_name': "IPN_SKY_POINTS", 'tags': tags } sky_points_file = resolve_url_to_file(file_url, attrs=file_attrs) return sky_points_file
[docs]def make_gating_node(workflow, datafind_files, outdir=None, tags=None): ''' Generate jobs for autogating the data for PyGRB runs. Parameters ---------- workflow: pycbc.workflow.core.Workflow An instanced class that manages the constructed workflow. datafind_files : pycbc.workflow.core.FileList A FileList containing the frame files to be gated. outdir : string Path of the output directory tags : list of strings If given these tags are used to uniquely name and identify output files that would be produced in multiple calls to this function. Returns -------- condition_strain_nodes : list List containing the pycbc.workflow.core.Node objects representing the autogating jobs. condition_strain_outs : pycbc.workflow.core.FileList FileList containing the pycbc.workflow.core.File objects representing the gated frame files. ''' cp = workflow.cp if tags is None: tags = [] condition_strain_class = select_generic_executable(workflow, "condition_strain") condition_strain_nodes = [] condition_strain_outs = FileList([]) for ifo in workflow.ifos: input_files = FileList([datafind_file for datafind_file in \ datafind_files if datafind_file.ifo == ifo]) condition_strain_jobs = condition_strain_class(cp, "condition_strain", ifo=ifo, out_dir=outdir, tags=tags) condition_strain_node, condition_strain_out = \ condition_strain_jobs.create_node(input_files, tags=tags) condition_strain_nodes.append(condition_strain_node) condition_strain_outs.extend(FileList([condition_strain_out])) return condition_strain_nodes, condition_strain_outs
[docs]def get_sky_grid_scale(sky_error, sigma_sys=6.8359): """ Calculate suitable 3-sigma radius of the search patch, incorporating Fermi GBM systematic if necessary. """ return 1.65 * (sky_error**2 + sigma_sys**2)**0.5