Source code for pycbc.inference.burn_in

# Copyright (C) 2017  Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#
"""
This modules provides classes and functions for determining when Markov Chains
have burned in.
"""

from __future__ import division

import numpy
from scipy.stats import ks_2samp

from pycbc.io.record import get_vars_from_arg

# The value to use for a burn-in iteration if a chain is not burned in
NOT_BURNED_IN_ITER = -1


#
# =============================================================================
#
#                              Convenience functions
#
# =============================================================================
#


[docs]def ks_test(samples1, samples2, threshold=0.9): """Applies a KS test to determine if two sets of samples are the same. The ks test is applied parameter-by-parameter. If the two-tailed p-value returned by the test is greater than ``threshold``, the samples are considered to be the same. Parameters ---------- samples1 : dict Dictionary of mapping parameters to the first set of samples. samples2 : dict Dictionary of mapping parameters to the second set of samples. threshold : float The thershold to use for the p-value. Default is 0.9. Returns ------- dict : Dictionary mapping parameter names to booleans indicating whether the given parameter passes the KS test. """ is_the_same = {} assert set(samples1.keys()) == set(samples2.keys()), ( "samples1 and 2 must have the same parameters") # iterate over the parameters for param in samples1: s1 = samples1[param] s2 = samples2[param] _, p_value = ks_2samp(s1, s2) is_the_same[param] = p_value > threshold return is_the_same
[docs]def max_posterior(lnps_per_walker, dim): """Burn in based on samples being within dim/2 of maximum posterior. Parameters ---------- lnps_per_walker : 2D array Array of values that are proportional to the log posterior values. Must have shape ``nwalkers x niterations``. dim : int The dimension of the parameter space. Returns ------- burn_in_idx : array of int The burn in indices of each walker. If a walker is not burned in, its index will be be equal to the length of the chain. is_burned_in : array of bool Whether or not a walker is burned in. """ if len(lnps_per_walker.shape) != 2: raise ValueError("lnps_per_walker must have shape " "nwalkers x niterations") # find the value to compare against max_p = lnps_per_walker.max() criteria = max_p - dim/2. nwalkers, _ = lnps_per_walker.shape burn_in_idx = numpy.empty(nwalkers, dtype=int) is_burned_in = numpy.empty(nwalkers, dtype=bool) # find the first iteration in each chain where the logpost has exceeded # max_p - dim/2 for ii in range(nwalkers): chain = lnps_per_walker[ii, :] passedidx = numpy.where(chain >= criteria)[0] is_burned_in[ii] = passedidx.size > 0 if is_burned_in[ii]: burn_in_idx[ii] = passedidx[0] else: burn_in_idx[ii] = NOT_BURNED_IN_ITER return burn_in_idx, is_burned_in
[docs]def posterior_step(logposts, dim): """Finds the last time a chain made a jump > dim/2. Parameters ---------- logposts : array 1D array of values that are proportional to the log posterior values. dim : int The dimension of the parameter space. Returns ------- int The index of the last time the logpost made a jump > dim/2. If that never happened, returns 0. """ if logposts.ndim > 1: raise ValueError("logposts must be a 1D array") criteria = dim/2. dp = numpy.diff(logposts) indices = numpy.where(dp >= criteria)[0] if indices.size > 0: idx = indices[-1] + 1 else: idx = 0 return idx
# # ============================================================================= # # Burn in classes # # ============================================================================= #
[docs]class MCMCBurnInTests(object): """Provides methods for estimating burn-in of an ensemble MCMC.""" available_tests = ('halfchain', 'min_iterations', 'max_posterior', 'posterior_step', 'nacl', 'ks_test', ) def __init__(self, sampler, burn_in_test, **kwargs): self.sampler = sampler # determine the burn-in tests that are going to be done self.do_tests = get_vars_from_arg(burn_in_test) self.burn_in_test = burn_in_test self.burn_in_data = {t: {} for t in self.do_tests} self.is_burned_in = False self.burn_in_iteration = NOT_BURNED_IN_ITER self.burn_in_index = NOT_BURNED_IN_ITER # Arguments specific to each test... # for nacl: self._nacls = int(kwargs.pop('nacls', 5)) # for kstest: self._ksthreshold = float(kwargs.pop('ks_threshold', 0.9)) # for max_posterior and posterior_step self._ndim = int(kwargs.pop('ndim', len(sampler.variable_params))) # for min iterations self._min_iterations = int(kwargs.pop('min_iterations', 0)) def _getniters(self, filename): """Convenience function to get the number of iterations in the file. If `niterations` hasn't been written to the file yet, just returns 0. """ with self.sampler.io(filename, 'r') as fp: try: niters = fp.niterations except KeyError: niters = 0 return niters def _getnsamples(self, filename): """Convenience function to get the number of samples saved in the file. If no samples have been written to the file yet, just returns 0. """ with self.sampler.io(filename, 'r') as fp: try: group = fp[fp.samples_group] # we'll just use the first parameter params = list(group.keys()) nsamples = group[params[0]].shape[-1] except (KeyError, IndexError): nsamples = 0 return nsamples def _index2iter(self, filename, index): """Converts the index in some samples at which burn in occurs to the iteration of the sampler that corresponds to. """ with self.sampler.io(filename, 'r') as fp: thin_interval = fp.thinned_by return index * thin_interval def _iter2index(self, filename, iteration): """Converts an iteration to the index it corresponds to. """ with self.sampler.io(filename, 'r') as fp: thin_interval = fp.thinned_by return iteration // thin_interval def _getlogposts(self, filename): """Convenience function for retrieving log posteriors. Parameters ---------- filename : str The file to read. Returns ------- array The log posterior values. They are not flattened, so have dimension nwalkers x niterations. """ with self.sampler.io(filename, 'r') as fp: samples = fp.read_raw_samples( ['loglikelihood', 'logprior'], thin_start=0, thin_interval=1, flatten=False) logposts = samples['loglikelihood'] + samples['logprior'] return logposts def _getacls(self, filename, start_index): """Convenience function for calculating acls for the given filename. Since we calculate the acls, this will also store it to the sampler. """ acls = self.sampler.compute_acl(filename, start_index=start_index) # since we calculated it, save the acls to the sampler... # but only do this if this is the only burn in test if len(self.do_tests) == 1: self.sampler.acls = acls return acls
[docs] def halfchain(self, filename): """Just uses half the chain as the burn-in iteration. """ niters = self._getniters(filename) data = self.burn_in_data['halfchain'] # this test cannot determine when something will burn in # only when it was not burned in in the past data['is_burned_in'] = True data['burn_in_iteration'] = niters/2
[docs] def min_iterations(self, filename): """Just checks that the sampler has been run for the minimum number of iterations. """ niters = self._getniters(filename) data = self.burn_in_data['min_iterations'] data['is_burned_in'] = self._min_iterations < niters if data['is_burned_in']: data['burn_in_iteration'] = self._min_iterations else: data['burn_in_iteration'] = NOT_BURNED_IN_ITER
[docs] def max_posterior(self, filename): """Applies max posterior test to self.""" logposts = self._getlogposts(filename) burn_in_idx, is_burned_in = max_posterior(logposts, self._ndim) data = self.burn_in_data['max_posterior'] # required things to store data['is_burned_in'] = is_burned_in.all() if data['is_burned_in']: data['burn_in_iteration'] = self._index2iter( filename, burn_in_idx.max()) else: data['burn_in_iteration'] = NOT_BURNED_IN_ITER # additional info data['iteration_per_walker'] = self._index2iter(filename, burn_in_idx) data['status_per_walker'] = is_burned_in
[docs] def posterior_step(self, filename): """Applies the posterior-step test.""" logposts = self._getlogposts(filename) burn_in_idx = numpy.array([posterior_step(logps, self._ndim) for logps in logposts]) data = self.burn_in_data['posterior_step'] # this test cannot determine when something will burn in # only when it was not burned in in the past data['is_burned_in'] = True data['burn_in_iteration'] = self._index2iter( filename, burn_in_idx.max()) # additional info data['iteration_per_walker'] = self._index2iter(filename, burn_in_idx)
[docs] def nacl(self, filename): """Burn in based on ACL. This applies the following test to determine burn in: 1. The first half of the chain is ignored. 2. An ACL is calculated from the second half. 3. If ``nacls`` times the ACL is < the length of the chain / 2, the chain is considered to be burned in at the half-way point. """ nsamples = self._getnsamples(filename) kstart = int(nsamples / 2.) acls = self._getacls(filename, start_index=kstart) is_burned_in = {param: (self._nacls * acl) < kstart for (param, acl) in acls.items()} data = self.burn_in_data['nacl'] # required things to store data['is_burned_in'] = all(is_burned_in.values()) if data['is_burned_in']: data['burn_in_iteration'] = self._index2iter(filename, kstart) else: data['burn_in_iteration'] = NOT_BURNED_IN_ITER # additional information data['status_per_parameter'] = is_burned_in
[docs] def ks_test(self, filename): """Applies ks burn-in test.""" nsamples = self._getnsamples(filename) with self.sampler.io(filename, 'r') as fp: # get the samples from the mid point samples1 = fp.read_raw_samples( ['loglikelihood', 'logprior'], iteration=int(nsamples/2.)) # get the last samples samples2 = fp.read_raw_samples( ['loglikelihood', 'logprior'], iteration=-1) # do the test # is_the_same is a dictionary of params --> bool indicating whether or # not the 1D marginal is the same at the half way point is_the_same = ks_test(samples1, samples2, threshold=self._ksthreshold) data = self.burn_in_data['ks_test'] # required things to store data['is_burned_in'] = all(is_the_same.values()) if data['is_burned_in']: data['burn_in_iteration'] = self._index2iter( filename, int(nsamples/2.)) else: data['burn_in_iteration'] = NOT_BURNED_IN_ITER # additional data['status_per_parameter'] = is_the_same
[docs] def evaluate(self, filename): """Runs all of the burn-in tests.""" for tst in self.do_tests: getattr(self, tst)(filename) # The iteration to use for burn-in depends on the logic in the burn-in # test string. For example, if the test was 'max_posterior | nacl' and # max_posterior burned-in at iteration 5000 while nacl burned in at # iteration 6000, we'd want to use 5000 as the burn-in iteration. # However, if the test was 'max_posterior & nacl', we'd want to use # 6000 as the burn-in iteration. The code below handles all cases by # doing the following: first, take the collection of burn in iterations # from all the burn in tests that were applied. Next, cycle over the # iterations in increasing order, checking which tests have burned in # by that point. Then evaluate the burn-in string at that point to see # if it passes, and if so, what the iteration is. The first point that # the test passes is used as the burn-in iteration. data = self.burn_in_data burn_in_iters = numpy.unique([data[t]['burn_in_iteration'] for t in self.do_tests]) burn_in_iters.sort() for ii in burn_in_iters: test_results = {t: (data[t]['is_burned_in'] & 0 <= data[t]['burn_in_iteration'] <= ii) for t in self.do_tests} is_burned_in = eval(self.burn_in_test, {"__builtins__": None}, test_results) if is_burned_in: break self.is_burned_in = is_burned_in if is_burned_in: self.burn_in_iteration = ii self.burn_in_index = self._iter2index(filename, ii) else: self.burn_in_iteration = NOT_BURNED_IN_ITER self.burn_in_index = NOT_BURNED_IN_ITER
[docs] @classmethod def from_config(cls, cp, sampler): """Loads burn in from section [sampler-burn_in].""" section = 'sampler' tag = 'burn_in' burn_in_test = cp.get_opt_tag(section, 'burn-in-test', tag) kwargs = {} if cp.has_option_tag(section, 'nacl', tag): kwargs['nacl'] = int(cp.get_opt_tag(section, 'nacl', tag)) if cp.has_option_tag(section, 'ks-threshold', tag): kwargs['ks_threshold'] = float( cp.get_opt_tag(section, 'ks-threshold', tag)) if cp.has_option_tag(section, 'ndim', tag): kwargs['ndim'] = int( cp.get_opt_tag(section, 'ndim', tag)) if cp.has_option_tag(section, 'min-iterations', tag): kwargs['min_iterations'] = int( cp.get_opt_tag(section, 'min-iterations', tag)) return cls(sampler, burn_in_test, **kwargs)
[docs]class MultiTemperedMCMCBurnInTests(MCMCBurnInTests): """Adds support for multiple temperatures to the MCMCBurnInTests.""" def _getacls(self, filename, start_index): """Convenience function for calculating acls for the given filename. This function is used by the ``n_acl`` burn-in test. That function expects the returned ``acls`` dict to just report a single ACL for each parameter. Since multi-tempered samplers return an array of ACLs for each parameter instead, this takes the max over the array before returning. Since we calculate the acls, this will also store it to the sampler. """ acls = super(MultiTemperedMCMCBurnInTests, self)._getacls( filename, start_index) # return the max for each parameter return {param: vals.max() for (param, vals) in acls.items()} def _getlogposts(self, filename): """Convenience function for retrieving log posteriors. This just gets the coldest temperature chain, and returns arrays with shape nwalkers x niterations, so the parent class can run the same ``posterior_step`` function. """ with self.sampler.io(filename, 'r') as fp: samples = fp.read_raw_samples( ['loglikelihood', 'logprior'], thin_start=0, thin_interval=1, temps=0, flatten=False) # reshape to drop the first dimension for (stat, arr) in samples.items(): _, nwalkers, niterations = arr.shape samples[stat] = arr.reshape((nwalkers, niterations)) logposts = samples['loglikelihood'] + samples['logprior'] return logposts