import logging
import os
import pycbc
import numpy
import lal
import json
from six import u as unicode
from glue.ligolw import ligolw
from glue.ligolw import lsctables
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import process as ligolw_process
from glue.ligolw import param as ligolw_param
from pycbc import version as pycbc_version
from pycbc import pnutils
from pycbc.tmpltbank import return_empty_sngl
from pycbc.results import ifo_color
from pycbc.results import source_color
from pycbc.mchirp_area import calc_probabilities
#FIXME Legacy build PSD xml helpers, delete me when we move away entirely from
# xml formats
def _build_series(series, dim_names, comment, delta_name, delta_unit):
from glue.ligolw import array as ligolw_array
Attributes = ligolw.sax.xmlreader.AttributesImpl
elem = ligolw.LIGO_LW(
Attributes({u"Name": unicode(series.__class__.__name__)}))
if comment is not None:
elem.appendChild(ligolw.Comment()).pcdata = comment
elem.appendChild(ligolw.Time.from_gps(series.epoch, u"epoch"))
elem.appendChild(ligolw_param.Param.from_pyvalue(u"f0", series.f0,
unit=u"s^-1"))
delta = getattr(series, delta_name)
if numpy.iscomplexobj(series.data.data):
data = numpy.row_stack((numpy.arange(len(series.data.data)) * delta,
series.data.data.real, series.data.data.imag))
else:
data = numpy.row_stack((numpy.arange(len(series.data.data)) * delta,
series.data.data))
a = ligolw_array.Array.build(series.name, data, dim_names=dim_names)
a.Unit = str(series.sampleUnits)
dim0 = a.getElementsByTagName(ligolw.Dim.tagName)[0]
dim0.Unit = delta_unit
dim0.Start = series.f0
dim0.Scale = delta
elem.appendChild(a)
return elem
[docs]def snr_series_to_xml(snr_series, document, sngl_inspiral_id):
"""Save an SNR time series into an XML document, in a format compatible
with BAYESTAR.
"""
snr_lal = snr_series.lal()
snr_lal.name = 'snr'
snr_lal.sampleUnits = ''
snr_xml = _build_series(snr_lal, (u'Time', u'Time,Real,Imaginary'), None,
'deltaT', 's')
snr_node = document.childNodes[-1].appendChild(snr_xml)
eid_param = ligolw_param.Param.build(u'event_id', u'ilwd:char',
sngl_inspiral_id)
snr_node.appendChild(eid_param)
[docs]def make_psd_xmldoc(psddict, xmldoc=None):
"""Add a set of PSDs to a LIGOLW XML document. If the document is not
given, a new one is created first.
"""
xmldoc = ligolw.Document() if xmldoc is None else xmldoc.childNodes[0]
# the PSDs must be children of a LIGO_LW with name "psd"
root_name = u"psd"
Attributes = ligolw.sax.xmlreader.AttributesImpl
lw = xmldoc.appendChild(
ligolw.LIGO_LW(Attributes({u"Name": root_name})))
for instrument, psd in psddict.items():
xmlseries = _build_series(psd, (u"Frequency,Real", u"Frequency"),
None, 'deltaF', 's^-1')
fs = lw.appendChild(xmlseries)
fs.appendChild(ligolw_param.Param.from_pyvalue(u"instrument",
instrument))
return xmldoc
[docs]class SingleCoincForGraceDB(object):
"""Create xml files and submit them to gracedb from PyCBC Live"""
def __init__(self, ifos, coinc_results, **kwargs):
"""Initialize a ligolw xml representation of a zerolag trigger
for upload from pycbc live to gracedb.
Parameters
----------
ifos: list of strs
A list of the ifos participating in this trigger.
coinc_results: dict of values
A dictionary of values. The format is defined in
pycbc/events/coinc.py and matches the on disk representation
in the hdf file for this time.
psds: dict of FrequencySeries
Dictionary providing PSD estimates for all involved detectors.
low_frequency_cutoff: float
Minimum valid frequency for the PSD estimates.
high_frequency_cutoff: float, optional
Maximum frequency considered for the PSD estimates. Default None.
followup_data: dict of dicts, optional
Dictionary providing SNR time series for each detector,
to be used in sky localization with BAYESTAR. The format should
be `followup_data['H1']['snr_series']`. More detectors can be
present than given in `ifos`. If so, the extra detectors will only
be used for sky localization.
channel_names: dict of strings, optional
Strain channel names for each detector.
Will be recorded in the sngl_inspiral table.
mc_area_args: dict of dicts, optional
Dictionary providing arguments to be used in source probability
estimation with pycbc/mchirp_area.py
"""
self.template_id = coinc_results['foreground/%s/template_id' % ifos[0]]
self.coinc_results = coinc_results
self.ifos = ifos
# remember if this should be marked as HWINJ
self.is_hardware_injection = ('HWINJ' in coinc_results
and coinc_results['HWINJ'])
# Check if we need to apply a time offset (this may be permerger)
self.time_offset = 0
rtoff = 'foreground/{}/time_offset'.format(ifos[0])
if rtoff in coinc_results:
self.time_offset = coinc_results[rtoff]
if 'followup_data' in kwargs:
fud = kwargs['followup_data']
assert len({fud[ifo]['snr_series'].delta_t for ifo in fud}) == 1, \
"delta_t for all ifos do not match"
self.snr_series = {ifo: fud[ifo]['snr_series'] for ifo in fud}
usable_ifos = fud.keys()
followup_ifos = list(set(usable_ifos) - set(ifos))
for ifo in self.snr_series:
self.snr_series[ifo].start_time += self.time_offset
else:
self.snr_series = None
usable_ifos = ifos
followup_ifos = []
# Set up the bare structure of the xml document
outdoc = ligolw.Document()
outdoc.appendChild(ligolw.LIGO_LW())
proc_id = ligolw_process.register_to_xmldoc(
outdoc, 'pycbc', {}, ifos=usable_ifos, comment='',
version=pycbc_version.version,
cvs_repository='pycbc/'+pycbc_version.git_branch,
cvs_entry_time=pycbc_version.date).process_id
# Set up coinc_definer table
coinc_def_table = lsctables.New(lsctables.CoincDefTable)
coinc_def_id = lsctables.CoincDefID(0)
coinc_def_row = lsctables.CoincDef()
coinc_def_row.search = "inspiral"
coinc_def_row.description = "sngl_inspiral<-->sngl_inspiral coincs"
coinc_def_row.coinc_def_id = coinc_def_id
coinc_def_row.search_coinc_type = 0
coinc_def_table.append(coinc_def_row)
outdoc.childNodes[0].appendChild(coinc_def_table)
# Set up coinc inspiral and coinc event tables
coinc_id = lsctables.CoincID(0)
coinc_event_table = lsctables.New(lsctables.CoincTable)
coinc_event_row = lsctables.Coinc()
coinc_event_row.coinc_def_id = coinc_def_id
coinc_event_row.nevents = len(usable_ifos)
coinc_event_row.instruments = ','.join(usable_ifos)
coinc_event_row.time_slide_id = lsctables.TimeSlideID(0)
coinc_event_row.process_id = proc_id
coinc_event_row.coinc_event_id = coinc_id
coinc_event_row.likelihood = 0.
coinc_event_table.append(coinc_event_row)
outdoc.childNodes[0].appendChild(coinc_event_table)
# Set up sngls
sngl_inspiral_table = lsctables.New(lsctables.SnglInspiralTable)
coinc_event_map_table = lsctables.New(lsctables.CoincMapTable)
sngl_populated = None
network_snrsq = 0
for sngl_id, ifo in enumerate(usable_ifos):
sngl = return_empty_sngl(nones=True)
sngl.event_id = lsctables.SnglInspiralID(sngl_id)
sngl.process_id = proc_id
sngl.ifo = ifo
names = [n.split('/')[-1] for n in coinc_results
if 'foreground/%s' % ifo in n]
for name in names:
val = coinc_results['foreground/%s/%s' % (ifo, name)]
if name == 'end_time':
val += self.time_offset
sngl.set_end(lal.LIGOTimeGPS(val))
else:
try:
setattr(sngl, name, val)
except AttributeError:
pass
if sngl.mass1 and sngl.mass2:
sngl.mtotal, sngl.eta = pnutils.mass1_mass2_to_mtotal_eta(
sngl.mass1, sngl.mass2)
sngl.mchirp, _ = pnutils.mass1_mass2_to_mchirp_eta(
sngl.mass1, sngl.mass2)
sngl_populated = sngl
if sngl.snr:
sngl.eff_distance = (sngl.sigmasq)**0.5 / sngl.snr
network_snrsq += sngl.snr ** 2.0
if 'channel_names' in kwargs and ifo in kwargs['channel_names']:
sngl.channel = kwargs['channel_names'][ifo]
sngl_inspiral_table.append(sngl)
# Set up coinc_map entry
coinc_map_row = lsctables.CoincMap()
coinc_map_row.table_name = 'sngl_inspiral'
coinc_map_row.coinc_event_id = coinc_id
coinc_map_row.event_id = sngl.event_id
coinc_event_map_table.append(coinc_map_row)
if self.snr_series is not None:
snr_series_to_xml(self.snr_series[ifo], outdoc, sngl.event_id)
# set merger time to the average of the ifo peaks
self.merger_time = numpy.mean(
[coinc_results['foreground/{}/end_time'.format(ifo)]
for ifo in ifos]) + self.time_offset
# for subthreshold detectors, respect BAYESTAR's assumptions and checks
bayestar_check_fields = ('mass1 mass2 mtotal mchirp eta spin1x '
'spin1y spin1z spin2x spin2y spin2z').split()
for sngl in sngl_inspiral_table:
if sngl.ifo in followup_ifos:
for bcf in bayestar_check_fields:
setattr(sngl, bcf, getattr(sngl_populated, bcf))
sngl.set_end(lal.LIGOTimeGPS(self.merger_time))
outdoc.childNodes[0].appendChild(coinc_event_map_table)
outdoc.childNodes[0].appendChild(sngl_inspiral_table)
# Set up the coinc inspiral table
coinc_inspiral_table = lsctables.New(lsctables.CoincInspiralTable)
coinc_inspiral_row = lsctables.CoincInspiral()
# This seems to be used as FAP, which should not be in gracedb
coinc_inspiral_row.false_alarm_rate = 0
coinc_inspiral_row.minimum_duration = 0.
coinc_inspiral_row.set_ifos(usable_ifos)
coinc_inspiral_row.coinc_event_id = coinc_id
coinc_inspiral_row.mchirp = sngl_populated.mchirp
coinc_inspiral_row.mass = sngl_populated.mtotal
coinc_inspiral_row.end_time = sngl_populated.end_time
coinc_inspiral_row.end_time_ns = sngl_populated.end_time_ns
coinc_inspiral_row.snr = network_snrsq ** 0.5
far = 1.0 / (lal.YRJUL_SI * coinc_results['foreground/ifar'])
coinc_inspiral_row.combined_far = far
coinc_inspiral_table.append(coinc_inspiral_row)
outdoc.childNodes[0].appendChild(coinc_inspiral_table)
# append the PSDs
self.psds = kwargs['psds']
psds_lal = {}
for ifo in self.psds:
psd = self.psds[ifo]
kmin = int(kwargs['low_frequency_cutoff'] / psd.delta_f)
fseries = lal.CreateREAL8FrequencySeries(
"psd", psd.epoch, kwargs['low_frequency_cutoff'], psd.delta_f,
lal.StrainUnit**2 / lal.HertzUnit, len(psd) - kmin)
fseries.data.data = psd.numpy()[kmin:] / pycbc.DYN_RANGE_FAC ** 2.0
psds_lal[ifo] = fseries
make_psd_xmldoc(psds_lal, outdoc)
# source probabilities estimation
if 'mc_area_args' in kwargs:
eff_distances = [sngl.eff_distance for sngl in sngl_inspiral_table]
probabilities = calc_probabilities(coinc_inspiral_row.mchirp,
coinc_inspiral_row.snr,
min(eff_distances),
kwargs['mc_area_args'])
self.probabilities = probabilities
else:
self.probabilities = None
self.outdoc = outdoc
self.time = sngl_populated.get_end()
[docs] def save(self, filename):
"""Write this trigger to gracedb compatible xml format
Parameters
----------
filename: str
Name of file to write to disk.
"""
gz = filename.endswith('.gz')
ligolw_utils.write_filename(self.outdoc, filename, gz=gz)
# save source probabilities in a json file
if self.probabilities is not None:
prob_fname = filename.replace('.xml.gz', '_probs.json')
with open(prob_fname, 'w') as prob_outfile:
json.dump(self.probabilities, prob_outfile)
logging.info('Source probabilities file saved as %s', prob_fname)
[docs] def upload(self, fname, gracedb_server=None, testing=True,
extra_strings=None, search='AllSky'):
"""Upload this trigger to gracedb
Parameters
----------
fname: str
The name to give the xml file associated with this trigger
gracedb_server: string, optional
URL to the GraceDB web API service for uploading the event.
If omitted, the default will be used.
testing: bool
Switch to determine if the upload should be sent to gracedb as a
test trigger (True) or a production trigger (False).
search: str
String going into the "search" field of the GraceDB event.
"""
from ligo.gracedb.rest import GraceDb
import matplotlib
matplotlib.use('Agg')
import pylab as pl
# first of all, make sure the event is saved on disk
# as GraceDB operations can fail later
self.save(fname)
if self.snr_series is not None:
if fname.endswith('.xml.gz'):
snr_series_fname = fname.replace('.xml.gz', '.hdf')
else:
snr_series_fname = fname.replace('.xml', '.hdf')
snr_series_plot_fname = snr_series_fname.replace('.hdf',
'_snr.png')
psd_series_plot_fname = snr_series_fname.replace('.hdf',
'_psd.png')
pl.figure()
ref_time = int(self.merger_time)
for ifo in sorted(self.snr_series):
curr_snrs = self.snr_series[ifo]
curr_snrs.save(snr_series_fname, group='%s/snr' % ifo)
pl.plot(curr_snrs.sample_times - ref_time, abs(curr_snrs),
c=ifo_color(ifo), label=ifo)
if ifo in self.ifos:
base = 'foreground/{}/'.format(ifo)
snr = self.coinc_results[base + 'snr']
mt = (self.coinc_results[base + 'end_time']
+ self.time_offset)
pl.plot([mt - ref_time], [snr], c=ifo_color(ifo),
marker='x')
pl.legend()
pl.xlabel('GPS time from {:d} (s)'.format(ref_time))
pl.ylabel('SNR')
pl.savefig(snr_series_plot_fname)
pl.close()
pl.figure()
for ifo in sorted(self.snr_series):
# Undo dynamic range factor
curr_psd = self.psds[ifo].astype(numpy.float64)
curr_psd /= pycbc.DYN_RANGE_FAC ** 2.0
curr_psd.save(snr_series_fname, group='%s/psd' % ifo)
# Can't plot log(0) so start from point 1
pl.loglog(curr_psd.sample_frequencies[1:],
curr_psd[1:]**0.5, c=ifo_color(ifo), label=ifo)
pl.legend()
pl.xlim([10, 1300])
pl.ylim([3E-24, 1E-20])
pl.xlabel('Frequency (Hz)')
pl.ylabel('ASD')
pl.savefig(psd_series_plot_fname)
pl.close()
if self.probabilities is not None:
prob_fname = fname.replace('.xml.gz', '_probs.json')
prob_plot_fname = prob_fname.replace('.json', '.png')
prob_plot = {k: v for (k, v) in self.probabilities.items()
if v != 0.0}
labels, sizes = zip(*prob_plot.items())
colors = [source_color(label) for label in labels]
fig, ax = pl.subplots()
ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%',
textprops={'fontsize': 15})
ax.axis('equal')
fig.savefig(prob_plot_fname)
pl.close()
gid = None
try:
# try connecting to GraceDB
gracedb = GraceDb(gracedb_server) \
if gracedb_server is not None else GraceDb()
# create GraceDB event
group = 'Test' if testing else 'CBC'
r = gracedb.createEvent(group, "pycbc", fname, search).json()
gid = r["graceid"]
logging.info("Uploaded event %s", gid)
if self.is_hardware_injection:
gracedb.writeLabel(gid, 'INJ')
logging.info("Tagging event %s as an injection", gid)
# upload PSDs. Note that the PSDs are already stored in the
# original event file and we just upload a copy of that same file
# here. This keeps things as they were in O2 and can be removed
# after updating the follow-up infrastructure
psd_fname = 'psd.xml.gz' if fname.endswith('.gz') else 'psd.xml'
gracedb.writeLog(gid, "PyCBC PSD estimate from the time of event",
psd_fname, open(fname, "rb").read(), "psd")
logging.info("Uploaded PSDs for event %s", gid)
# add info for tracking code version
gracedb_tag_with_version(gracedb, gid)
extra_strings = [] if extra_strings is None else extra_strings
for text in extra_strings:
gracedb.writeLog(gid, text, tag_name=['analyst_comments'])
# upload SNR series in HDF format and plots
if self.snr_series is not None:
gracedb.writeLog(gid, 'SNR timeseries HDF file upload',
filename=snr_series_fname)
gracedb.writeLog(gid, 'SNR timeseries plot upload',
filename=snr_series_plot_fname,
tag_name=['background'],
displayName=['SNR timeseries'])
gracedb.writeLog(gid, 'PSD plot upload',
filename=psd_series_plot_fname,
tag_name=['psd'], displayName=['PSDs'])
# upload source probabilities in json format and plot
if self.probabilities is not None:
gracedb.writeLog(gid, 'source probabilities JSON file upload',
filename=prob_fname, tag_name=['em_follow'])
logging.info('Uploaded source probabilities for event %s', gid)
gracedb.writeLog(gid, 'source probabilities plot upload',
filename=prob_plot_fname,
tag_name=['em_follow'])
logging.info('Uploaded source probabilities pie chart for '
'event %s', gid)
except Exception as exc:
logging.error('Something failed during the upload/annotation of '
'event %s on GraceDB. The event may not have been '
'uploaded!', fname)
logging.error(str(exc))
return gid
[docs]def gracedb_tag_with_version(gracedb, event_id):
"""Add a GraceDB log entry reporting PyCBC's version and install location.
"""
version_str = 'Using PyCBC version {}{} at {}'
version_str = version_str.format(
pycbc_version.version,
' (release)' if pycbc_version.release else '',
os.path.dirname(pycbc.__file__))
gracedb.writeLog(event_id, version_str)
__all__ = ['SingleCoincForGraceDB', 'make_psd_xmldoc', 'snr_series_to_xml',
'gracedb_tag_with_version']