Source code for pycbc.inference.models.tools

""" Common utility functions for calculation of likelihoods
"""

import logging
from distutils.util import strtobool

import numpy
import numpy.random
import tqdm

from scipy.special import logsumexp, i0e
from scipy.interpolate import RectBivariateSpline, interp1d
from pycbc.distributions import JointDistribution


[docs]def str_to_tuple(sval, ftype): """ Convenience parsing to convert str to tuple""" return tuple(ftype(x) for x in sval.split(','))
[docs]def str_to_bool(sval): """ Ensure value is a bool if it can be converted """ if isinstance(sval, str): return strtobool(sval) return sval
[docs]class DistMarg(): """Help class to add bookkeeping for distance marginalization""" marginalize_phase = None distance_marginalization = None distance_interpolator = None
[docs] def setup_distance_marginalization(self, variable_params, marginalize_phase=False, marginalize_distance=False, marginalize_distance_param='distance', marginalize_distance_samples=int(1e4), marginalize_distance_interpolator=False, marginalize_distance_snr_range=None, marginalize_distance_density=None, marginalize_vector=False, marginalize_vector_params=None, **kwargs): """ Setup the model for use with distance marginalization This function sets up precalculations for distance / phase marginalization. For distance margininalization it modifies the model to internally remove distance as a parameter. Parameters ---------- variable_params: list of strings The set of variable parameters marginalize_phase: bool, False Do analytic marginalization (appopriate only for 22 mode waveforms) marginalize_distance: bool, False Marginalize over distance marginalize_distance_param: str Name of the parameter that is used to determine the distance. This might be 'distance' or a parameter which can be converted to distance by a provided univariate transformation. marginalize_distance_interpolator: bool Use a pre-calculated interpolating function for the distance marginalized likelihood. marginalize_distance_snr_range: tuple of floats, (1, 50) The SNR range for the interpolating function to be defined in. If a sampler goes outside this range, the logl will be returned as -numpy.inf. marginalize_distance_density: tuple of intes, (1000, 1000) The dimensions of the interpolation grid over (sh, hh). Returns ------- variable_params: list of strings Set of variable params (missing distance-related parameter). kwags: dict The keyword arguments to the model initialization, may be modified from the original set by this function. """ self.marginalize_vector = marginalize_vector self.marginalize_vector_params = marginalize_vector_params self.reconstructing_distance = False self.marginalize_phase = str_to_bool(marginalize_phase) self.distance_marginalization = False self.distance_interpolator = None marginalize_distance = str_to_bool(marginalize_distance) if not marginalize_distance: return variable_params, kwargs if isinstance(marginalize_distance_snr_range, str): marginalize_distance_snr_range = \ str_to_tuple(marginalize_distance_snr_range, float) if isinstance(marginalize_distance_density, str): marginalize_distance_density = \ str_to_tuple(marginalize_distance_density, int) logging.info('Marginalizing over distance') # Take distance out of the variable params since we'll handle it # manually now variable_params.remove(marginalize_distance_param) old_prior = kwargs['prior'] dists = [d for d in old_prior.distributions if marginalize_distance_param not in d.params] dprior = [d for d in old_prior.distributions if marginalize_distance_param in d.params][0] prior = JointDistribution(variable_params, *dists, **old_prior.kwargs) kwargs['prior'] = prior if len(dprior.params) != 1 or not hasattr(dprior, 'bounds'): raise ValueError('Distance Marginalization requires a ' 'univariate and bounded prior') # Set up distance prior vector and samples # (1) prior is using distance if dprior.params[0] == 'distance': logging.info("Prior is directly on distance, setting up " "%s grid weights", marginalize_distance_samples) dmin, dmax = dprior.bounds['distance'] dist_locs = numpy.linspace(dmin, dmax, int(marginalize_distance_samples)) dist_weights = [dprior.pdf(distance=l) for l in dist_locs] dist_weights = numpy.array(dist_weights) # (2) prior is univariate and can be converted to distance elif marginalize_distance_param != 'distance': waveform_transforms = kwargs['waveform_transforms'] pname = dprior.params[0] logging.info("Settings up transform, prior is in terms of" " %s", pname) wtrans = [d for d in waveform_transforms if 'distance' not in d.outputs] if len(wtrans) == 0: wtrans = None kwargs['waveform_transforms'] = wtrans dtrans = [d for d in waveform_transforms if 'distance' in d.outputs][0] v = dprior.rvs(int(1e8)) d = dtrans.transform({pname: v[pname]})['distance'] d.sort() cdf = numpy.arange(1, len(d)+1) / len(d) i = interp1d(d, cdf) dmin, dmax = d.min(), d.max() logging.info('Distance range %s-%s', dmin, dmax) x = numpy.linspace(dmin, dmax, int(marginalize_distance_samples) + 1) xl, xr = x[:-1], x[1:] dist_locs = 0.5 * (xr + xl) dist_weights = i(xr) - i(xl) else: raise ValueError("No prior seems to determine the distance") dist_weights /= dist_weights.sum() dist_ref = 0.5 * (dmax + dmin) self.dist_locs = dist_locs self.distance_marginalization = dist_ref / dist_locs, dist_weights self.distance_interpolator = None if str_to_bool(marginalize_distance_interpolator): setup_args = {} if marginalize_distance_snr_range: setup_args['snr_range'] = marginalize_distance_snr_range if marginalize_distance_density: setup_args['density'] = marginalize_distance_density i = setup_distance_marg_interpolant(self.distance_marginalization, phase=self.marginalize_phase, **setup_args) self.distance_interpolator = i kwargs['static_params']['distance'] = dist_ref return variable_params, kwargs
[docs] def marginalize_loglr(self, sh_total, hh_total, skip_vector=False, return_peak=False): """ Return the marginal likelihood Parameters ----------- sh_total: float or ndarray The total <s|h> inner product summed over detectors hh_total: float or ndarray The total <h|h> inner product summed over detectors skip_vector: bool, False If true, and input is a vector, do not marginalize over that vector, instead return the likelihood values as a vector. """ interpolator = self.distance_interpolator if self.reconstructing_distance: skip_vector = True interpolator = None return marginalize_likelihood(sh_total, hh_total, phase=self.marginalize_phase, interpolator=interpolator, distance=self.distance_marginalization, skip_vector=skip_vector, return_peak=return_peak)
[docs] def reconstruct(self): """ Reconstruct the distance or vectored marginalized parameter of this class. """ rec = {} def draw_sample(params, loglr): x = numpy.random.uniform() cdf = numpy.exp(loglr).cumsum() cdf /= cdf[-1] xl = numpy.searchsorted(cdf, x) return params[xl], xl if self.distance_marginalization: # turn off interpolator and set to return vector output self.reconstructing_distance = True loglr_full = self.loglr if self.marginalize_vector and self.distance_marginalization: loglr = logsumexp(loglr_full, axis=0) else: loglr = loglr_full if self.distance_marginalization: # call likelihood to get vector output _, weights = self.distance_marginalization loglr += numpy.log(weights) # draw distance sample dist, xl = draw_sample(self.dist_locs, loglr) rec['distance'] = dist if self.marginalize_vector: if self.distance_marginalization: # logl along for the selected distance vlr = loglr_full[:, xl] else: vlr = loglr vec_param, _ = draw_sample(self.marginalize_vector_params, vlr) rec[self.marginalize_vector] = vec_param self.reconstructing_distance = False return rec
[docs]def setup_distance_marg_interpolant(dist_marg, phase=False, snr_range=(1, 50), density=(1000, 1000)): """ Create the interpolant for distance marginalization Parameters ---------- dist_marg: tuple of two arrays The (dist_loc, dist_weight) tuple which defines the grid for integrating over distance snr_range: tuple of (float, float) Tuple of min, max SNR that the interpolant is expected to work for. density: tuple of (float, float) The number of samples in either dimension of the 2d interpolant Returns ------- interp: function Function which returns the precalculated likelihood for a given inner product sh/hh. """ dist_rescale, _ = dist_marg logging.info("Interpolator valid for SNRs in %s", snr_range) logging.info("Interpolator using grid %s", density) # approximate maximum shr and hhr values, assuming the true SNR is # within the indicated range (and neglecting noise fluctuations) snr_min, snr_max = snr_range smax = dist_rescale.max() smin = dist_rescale.min() shr_max = snr_max ** 2.0 / smin hhr_max = snr_max ** 2.0 / smin / smin shr_min = snr_min ** 2.0 / smax hhr_min = snr_min ** 2.0 / smax / smax shr = numpy.geomspace(shr_min, shr_max, density[0]) hhr = numpy.geomspace(hhr_min, hhr_max, density[1]) lvals = numpy.zeros((len(shr), len(hhr))) logging.info('Setup up likelihood interpolator') for i, sh in enumerate(tqdm.tqdm(shr)): for j, hh in enumerate(hhr): lvals[i, j] = marginalize_likelihood(sh, hh, distance=dist_marg, phase=phase) interp = RectBivariateSpline(shr, hhr, lvals) def interp_wrapper(x, y, bounds_check=True): k = None if bounds_check: if isinstance(x, float): if x > shr_max or x < shr_min or y > hhr_max or y < hhr_min: return -numpy.inf else: k = (x > shr_max) | (x < shr_min) k = k | (y > hhr_max) | (y < hhr_min) v = interp(x, y, grid=False) if k is not None: v[k] = -numpy.inf return v return interp_wrapper
[docs]def marginalize_likelihood(sh, hh, phase=False, distance=False, skip_vector=False, interpolator=None, return_peak=False, ): """ Return the marginalized likelihood Parameters ---------- sh: complex float or numpy.ndarray The data-template inner product hh: complex float or numpy.ndarray The template-template inner product phase: bool, False Enable phase marginalization. Only use if orbital phase can be related to just a single overall phase (e.g. not true for waveform with sub-dominant modes) skip_vector: bool, False Don't apply marginalization of vector component of input (i.e. leave as vector). interpolator: function, None If provided, internal calculation is skipped in favor of a precalculated interpolating function which takes in sh/hh and returns the likelihood. Returns ------- loglr: float The marginalized loglikehood ratio """ if isinstance(hh, float): clogweights = 0 else: sh = sh.flatten() hh = hh.flatten() clogweights = numpy.log(len(sh)) if phase: sh = abs(sh) else: sh = sh.real vweights = 1 if interpolator: # pre-calculated result for this function vloglr = interpolator(sh, hh) if skip_vector: return vloglr else: # explicit calculation if distance: # brute force distance path dist_rescale, dist_weights = distance sh = numpy.multiply.outer(sh, dist_rescale) hh = numpy.multiply.outer(hh, dist_rescale ** 2.0) if len(sh.shape) == 2: vweights = numpy.resize(dist_weights, (sh.shape[1], sh.shape[0])).T else: vweights = dist_weights if phase: sh = numpy.log(i0e(sh)) + sh else: sh = sh.real # Calculate loglikelihood ratio vloglr = sh - 0.5 * hh if return_peak: maxv = vloglr.argmax() maxl = vloglr[maxv] # Do brute-force marginalization if loglr is a vector if isinstance(vloglr, float): vloglr = float(vloglr) elif not skip_vector: vloglr = float(logsumexp(vloglr, b=vweights)) - clogweights if return_peak: return vloglr, maxv, maxl return vloglr