# Copyright (C) 2016 Christopher M. Biwer, Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""Provides I/O that is specific to MCMC samplers.
"""
from __future__ import (absolute_import, division)
from six import string_types
import numpy
import argparse


class MCMCMetadataIO(object):
    """Provides functions for reading/writing MCMC metadata to file.
    """

    def write_resume_point(self):
        """Keeps a list of the number of iterations that were in a file when
        a run was resumed from a checkpoint."""
        try:
            resume_pts = self.attrs["resume_points"].tolist()
        except KeyError:
            resume_pts = []
        try:
            niterations = self.niterations
        except KeyError:
            niterations = 0
        resume_pts.append(niterations)
        self.attrs["resume_points"] = resume_pts

    def write_niterations(self, niterations):
        """Writes the given number of iterations to the sampler group."""
        self[self.sampler_group].attrs['niterations'] = niterations

    @property
    def niterations(self):
        """Returns the number of iterations the sampler was run for."""
        return self[self.sampler_group].attrs['niterations']

    @property
    def nwalkers(self):
        """Returns the number of walkers used by the sampler."""
        return self[self.sampler_group].attrs['nwalkers']

    def thin(self, thin_interval):
        """Thins the samples on disk using the given thinning interval.

        Parameters
        ----------
        thin_interval : int
            The interval to thin by.
        """
        # read thinned samples into memory
        params = self[self.samples_group].keys()
        samples = self.read_raw_samples(params, thin_start=0,
                                        thin_interval=thin_interval,
                                        thin_end=None,
                                        flatten=False)
        # now resize and write the data back to disk
        group = self[self.samples_group]
        for param in params:
            data = samples[param]
            # resize the arrays on disk
            group[param].resize(data.shape)
            # and write
            group[param][:] = data
        # store the interval that samples were thinned by
        self.thinned_by *= thin_interval
        # If a default thin interval and thin start exist, reduce them by the
        # thinned interval. If the thin interval is not an integer multiple
        # of the original, we'll round up, to avoid getting samples from
        # before the burn in / at an interval less than the ACL.
        self.thin_start = int(numpy.ceil(self.thin_start/thin_interval))
        self.thin_interval = int(numpy.ceil(self.thin_interval/thin_interval))
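
    # A minimal usage sketch (not part of the original module), assuming
    # ``fp`` is an open, writable file handler from one of the concrete HDF
    # classes that mix in MCMCMetadataIO, with samples currently stored at
    # every iteration:
    #
    #     >>> fp.thinned_by          # 1: nothing thinned yet
    #     >>> fp.thin(10)            # keep only every 10th sample on disk
    #     >>> fp.thinned_by          # now 10
    #
    # The default ``thin_start`` and ``thin_interval`` (assumed to be
    # provided by the base HDF file class) are rescaled with a ceiling so
    # that later reads still skip the burn in and sample at no less than
    # one ACL.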

    @property
    def thinned_by(self):
        """Returns the interval that samples have been thinned by on disk.

        This looks for ``thinned_by`` in the file's ``attrs``. If none is
        found, will just return 1.
        """
        try:
            thinned_by = self.attrs['thinned_by']
        except KeyError:
            thinned_by = 1
        return thinned_by

    @thinned_by.setter
    def thinned_by(self, thinned_by):
        """Sets the thinned_by attribute.

        This is the interval that samples have been thinned by on disk. The
        given value is written to ``self.attrs['thinned_by']``.
        """
        self.attrs['thinned_by'] = int(thinned_by)

    def last_iteration(self, parameter=None):
        """Returns the iteration of the last sample of the given parameter.

        If parameter is ``None``, will just use the first parameter in the
        samples group.
        """
        if parameter is None:
            try:
                parameter = list(self[self.samples_group].keys())[0]
            except (IndexError, KeyError):
                # nothing has been written yet, just return 0
                return 0
        try:
            lastiter = self[self.samples_group][parameter].shape[-1]
        except KeyError:
            # no samples have been written, just return 0
            lastiter = 0
        # account for thinning
        return lastiter * self.thinned_by

    def iterations(self, parameter):
        """Returns the iteration each sample occurred at."""
        return numpy.arange(0, self.last_iteration(parameter),
                            self.thinned_by)
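
    # Illustrative arithmetic (an added sketch, not from the original
    # source): if 250 samples of 'x' are stored on disk and the file has
    # been thinned by 4, then
    #
    #     >>> fp.last_iteration('x')   # 250 * 4 -> 1000
    #     >>> fp.iterations('x')       # numpy.arange(0, 1000, 4): 250 entries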

    def write_sampler_metadata(self, sampler):
        """Writes the sampler's metadata."""
        self.attrs['sampler'] = sampler.name
        self[self.sampler_group].attrs['nwalkers'] = sampler.nwalkers
        # write the model's metadata
        sampler.model.write_metadata(self)

    def write_acls(self, acls):
        """Writes the given autocorrelation lengths.

        The ACL of each parameter is saved to
        ``[sampler_group]/acls/{param}``. The maximum over all the
        parameters is saved to the file's 'acl' attribute.

        Parameters
        ----------
        acls : dict
            A dictionary of ACLs keyed by the parameter.

        Returns
        -------
        float
            The maximum of the ACLs that was written to the file.
        """
        group = self.sampler_group + '/acls/{}'
        # write the individual acls
        for param in acls:
            try:
                # we need to use the write_direct function because it's
                # apparently the only way to update scalars in h5py
                self[group.format(param)].write_direct(
                    numpy.array(acls[param]))
            except KeyError:
                # dataset doesn't exist yet
                self[group.format(param)] = acls[param]
        # write the maximum over all params
        acl = numpy.array(list(acls.values())).max()
        self[self.sampler_group].attrs['acl'] = acl
        # set the default thin interval to be the acl (if it is finite)
        if numpy.isfinite(acl):
            self.thin_interval = int(numpy.ceil(acl))
        # return the maximum, as promised by the docstring
        return acl
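
    # Hedged usage sketch (not part of the original module), assuming
    # ``fp`` is writable and its model parameters are 'x' and 'y':
    #
    #     >>> fp.write_acls({'x': 12.3, 'y': 20.1})
    #     >>> fp[fp.sampler_group].attrs['acl']   # the maximum, 20.1
    #     >>> fp.thin_interval                    # default set to ceil(20.1) = 21
    #     >>> fp.read_acls()                      # {'x': 12.3, 'y': 20.1}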

    def read_acls(self):
        """Reads the ACLs of all the parameters.

        Returns
        -------
        dict
            A dictionary of the ACLs, keyed by the parameter name.
        """
        group = self[self.sampler_group]['acls']
        return {param: group[param].value for param in group.keys()}

    def write_burn_in(self, burn_in):
        """Writes the given burn-in data to the file."""
        group = self[self.sampler_group]
        group.attrs['burn_in_test'] = burn_in.burn_in_test
        group.attrs['is_burned_in'] = burn_in.is_burned_in
        group.attrs['burn_in_iteration'] = burn_in.burn_in_iteration
        # set the default thin_start to be the burn_in_index
        self.thin_start = burn_in.burn_in_index
        # write individual test data
        for tst in burn_in.burn_in_data:
            key = 'burn_in_tests/{}'.format(tst)
            try:
                attrs = group[key].attrs
            except KeyError:
                group.create_group(key)
                attrs = group[key].attrs
            self.write_kwargs_to_attrs(attrs, **burn_in.burn_in_data[tst])

    @staticmethod
    def extra_args_parser(parser=None, skip_args=None, **kwargs):
        """Create a parser to parse sampler-specific arguments for loading
        samples.

        Parameters
        ----------
        parser : argparse.ArgumentParser, optional
            Instead of creating a parser, add arguments to the given one. If
            none provided, will create one.
        skip_args : list, optional
            Don't parse the given options. Options should be given as the
            option string, minus the '--'. For example,
            ``skip_args=['iteration']`` would cause the ``--iteration``
            argument not to be included.
        \**kwargs :
            All other keyword arguments are passed to the parser that is
            created.

        Returns
        -------
        parser : argparse.ArgumentParser
            An argument parser with the extra arguments added.
        actions : list of argparse.Action
            A list of the actions that were added.
        """
        if parser is None:
            parser = argparse.ArgumentParser(**kwargs)
        elif kwargs:
            raise ValueError("No other keyword arguments should be provided "
                             "if a parser is provided.")
        if skip_args is None:
            skip_args = []
        actions = []
        if 'thin-start' not in skip_args:
            act = parser.add_argument(
                "--thin-start", type=int, default=None,
                help="Sample number to start collecting samples to plot. If "
                     "none provided, will use the input file's `thin_start` "
                     "attribute.")
            actions.append(act)
        if 'thin-interval' not in skip_args:
            act = parser.add_argument(
                "--thin-interval", type=int, default=None,
                help="Interval to use for thinning samples. If none "
                     "provided, will use the input file's `thin_interval` "
                     "attribute.")
            actions.append(act)
        if 'thin-end' not in skip_args:
            act = parser.add_argument(
                "--thin-end", type=int, default=None,
                help="Sample number to stop collecting samples to plot. If "
                     "none provided, will use the input file's `thin_end` "
                     "attribute.")
            actions.append(act)
        if 'iteration' not in skip_args:
            act = parser.add_argument(
                "--iteration", type=int, default=None,
                help="Only retrieve the given iteration. To load "
                     "the last n-th sample use -n, e.g., -1 will "
                     "load the last iteration. This overrides "
                     "the thin-start/interval/end options.")
            actions.append(act)
        if 'walkers' not in skip_args:
            act = parser.add_argument(
                "--walkers", type=int, nargs="+", default=None,
                help="Only retrieve samples from the listed "
                     "walkers. Default is to retrieve from all "
                     "walkers.")
            actions.append(act)
        return parser, actions
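
    # Usage sketch (illustrative only), e.g. from a post-processing script:
    #
    #     >>> parser, actions = MCMCMetadataIO.extra_args_parser(
    #     ...     skip_args=['walkers'])
    #     >>> opts = parser.parse_args(['--thin-start', '100',
    #     ...                           '--thin-interval', '10'])
    #     >>> opts.thin_start, opts.thin_interval, opts.iteration
    #     (100, 10, None)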


class SingleTempMCMCIO(object):
    """Provides functions for reading/writing samples from an MCMC sampler.

    These functions will work for samplers that have 1 or more walkers, with
    only a single temperature.
    """

    def write_samples(self, samples, parameters=None, last_iteration=None):
        """Writes samples to the given file.

        Results are written to ``samples_group/{vararg}``, where ``{vararg}``
        is the name of a model param. The samples are written as an
        ``nwalkers x niterations`` array. If samples already exist, the new
        samples are appended to the current ones.

        If the current samples on disk have been thinned (determined by the
        ``thinned_by`` attribute in the samples group), then the samples will
        be thinned by the same amount before being written. The thinning is
        started at the sample in ``samples`` that occurred at the iteration
        equal to the last iteration on disk plus the ``thinned_by`` interval.
        If this iteration is larger than the iteration of the last given
        sample, then none of the samples will be written.

        Parameters
        ----------
        samples : dict
            The samples to write. Each array in the dictionary should have
            shape nwalkers x niterations.
        parameters : list, optional
            Only write the specified parameters to the file. If None, will
            write all of the keys in the ``samples`` dict.
        last_iteration : int, optional
            The iteration of the last sample. If the file's ``thinned_by``
            attribute is > 1, this is needed to determine where to start
            thinning the samples such that the interval between the last
            sample currently on disk and the first new sample is the same as
            all of the other samples.
        """
        nwalkers, nsamples = list(samples.values())[0].shape
        assert all(p.shape == (nwalkers, nsamples)
                   for p in samples.values()), (
            "all samples must have the same shape")
        group = self.samples_group + '/{name}'
        if parameters is None:
            parameters = samples.keys()
        # thin the samples
        samples = thin_samples_for_writing(self, samples, parameters,
                                           last_iteration)
        # loop over the parameters
        for param in parameters:
            dataset_name = group.format(name=param)
            data = samples[param]
            # check that there's something to write after thinning
            if data.shape[1] == 0:
                # nothing to write, move along
                continue
            try:
                fp_nsamples = self[dataset_name].shape[-1]
                istart = fp_nsamples
                istop = istart + data.shape[1]
                if istop > fp_nsamples:
                    # resize the dataset
                    self[dataset_name].resize(istop, axis=1)
            except KeyError:
                # dataset doesn't exist yet
                istart = 0
                istop = istart + data.shape[1]
                self.create_dataset(dataset_name, (nwalkers, istop),
                                    maxshape=(nwalkers, None),
                                    dtype=data.dtype,
                                    fletcher32=True)
            self[dataset_name][:, istart:istop] = data
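
    # Illustrative checkpointing sketch (assumptions: ``fp`` is a writable
    # handler whose samples group exists, and the sampler has 10 walkers).
    # Appending two batches of samples for a single parameter 'x':
    #
    #     >>> batch1 = {'x': numpy.random.normal(size=(10, 100))}
    #     >>> fp.write_samples(batch1, last_iteration=100)
    #     >>> batch2 = {'x': numpy.random.normal(size=(10, 100))}
    #     >>> fp.write_samples(batch2, last_iteration=200)
    #     >>> fp[fp.samples_group]['x'].shape   # (10, 200)
    #
    # If the file had previously been thinned (thinned_by > 1), only every
    # thinned_by-th column of each batch would be appended.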

    def read_raw_samples(self, fields,
                         thin_start=None, thin_interval=None, thin_end=None,
                         iteration=None, walkers=None, flatten=True):
        """Base function for reading samples.

        Parameters
        ----------
        fields : list
            The list of field names to retrieve. Must be names of datasets in
            the ``samples_group``.
        thin_start : int, optional
            Start reading from the given iteration. Default is to start from
            the first iteration.
        thin_interval : int, optional
            Only read every ``thin_interval``-th sample. Default is 1.
        thin_end : int, optional
            Stop reading at the given iteration. Default is to end at the
            last iteration.
        iteration : int, optional
            Only read the given iteration. If this is provided, it overrides
            the ``thin_(start|interval|end)`` options.
        walkers : int, optional
            Only read from the given walkers. Default is to read all.
        flatten : bool, optional
            Flatten the samples to 1D arrays before returning. Otherwise, the
            returned arrays will have shape (requested walkers x
            requested iteration(s)). Default is True.

        Returns
        -------
        dict
            A dictionary of field name -> numpy array pairs.
        """
        if isinstance(fields, string_types):
            fields = [fields]
        # walkers to load
        if walkers is not None:
            widx = numpy.zeros(self.nwalkers, dtype=bool)
            widx[walkers] = True
            nwalkers = widx.sum()
        else:
            widx = slice(0, None)
            nwalkers = self.nwalkers
        # get the slice to use
        if iteration is not None:
            get_index = int(iteration)
            niterations = 1
        else:
            get_index = self.get_slice(thin_start=thin_start,
                                       thin_end=thin_end,
                                       thin_interval=thin_interval)
            # we'll just get the number of iterations from the returned shape
            niterations = None
        # load
        group = self.samples_group + '/{name}'
        arrays = {}
        for name in fields:
            arr = self[group.format(name=name)][widx, get_index]
            if niterations is None:
                niterations = arr.shape[-1]
            if flatten:
                arr = arr.flatten()
            else:
                # ensure that the returned array is 2D
                arr = arr.reshape((nwalkers, niterations))
            arrays[name] = arr
        return arrays
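
    # Reading sketch (illustrative only, same hypothetical ``fp``); note
    # that ``get_slice`` is assumed to come from the base HDF file class
    # these mix-ins are combined with:
    #
    #     >>> d = fp.read_raw_samples(['x'], thin_start=100,
    #     ...                         thin_interval=10, flatten=False)
    #     >>> d['x'].shape   # (nwalkers, number of iterations kept)
    #     >>> d = fp.read_raw_samples('x', walkers=[0, 3], iteration=-1)
    #     >>> d['x'].shape   # flattened: (2,)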


def thin_samples_for_writing(fp, samples, parameters, last_iteration):
    """Thins samples for writing to disk.

    The thinning interval to use is determined by the given file handler's
    ``thinned_by`` attribute. If that attribute is 1, just returns the
    samples.

    Parameters
    ----------
    fp : MCMCMetadataIO instance
        The file the samples will be written to. Needed to determine the
        thin interval used on disk.
    samples : dict
        Dictionary mapping parameter names to arrays of (unthinned) samples.
        The arrays are thinned along their last dimension.
    parameters : list of str
        The parameters to thin in ``samples`` before writing. All listed
        parameters must be in ``samples``.
    last_iteration : int
        The iteration that the last sample in ``samples`` occurred at. This
        is needed to figure out where to start the thinning in ``samples``,
        such that the interval between the last sample on disk and the first
        new sample is the same as all of the other samples.

    Returns
    -------
    dict
        Dictionary of the thinned samples to write.
    """
    if fp.thinned_by > 1:
        if last_iteration is None:
            raise ValueError("File's thinned_by attribute is > 1 ({}), "
                             "but last_iteration not provided."
                             .format(fp.thinned_by))
        thinned_samples = {}
        for param in parameters:
            data = samples[param]
            nsamples = data.shape[-1]
            # To figure out where to start:
            # the last iteration in the file + the file's thinning interval
            # gives the iteration of the next sample that should be written;
            # last_iteration - nsamples gives the iteration of the first
            # sample in samples. Subtracting the latter from the former - 1
            # (-1 to convert from iteration to index) therefore gives the
            # index in the samples data to start using samples.
            thin_start = fp.last_iteration(param) + fp.thinned_by \
                - (last_iteration - nsamples) - 1
            thinned_samples[param] = data[..., thin_start::fp.thinned_by]
    else:
        thinned_samples = samples
    return thinned_samples
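
# Worked example (illustrative only) of the thin_start arithmetic above:
# suppose the file has thinned_by == 4 and fp.last_iteration(param) == 100,
# i.e. the last stored sample came from iteration 100. If ``samples`` holds
# 12 new samples ending at last_iteration=112, they cover iterations
# 101..112. The next sample that should be stored is from iteration 104, so
#
#     thin_start = 100 + 4 - (112 - 12) - 1 = 3
#
# and data[..., 3::4] picks out the samples from iterations 104, 108, 112.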