Source code for gptools.mean

# Copyright 2014 Mark Chilenski
# This program is distributed under the terms of the GNU General Purpose License (GPL).
# Refer to http://www.gnu.org/licenses/gpl.txt
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Provides classes for defining explicit, parametric mean functions.

To provide the necessary hooks to optimize/sample the hyperparameters, your mean
function must be wrapped with :py:class:`MeanFunction` before being passed to
:py:class:`GaussianProcess`. The function must have the calling fingerprint
`fun(X, n, p1, p2, ...)`, where `X` is an array with shape `(M, N)`, `n` is a
vector with length `D` and `p1`, `p2`, ... are the (hyper)parameters of the mean
function, given as individual arguments.
"""

from __future__ import division

from .utils import unique_rows, UniformJointPrior, MaskedBounds

import scipy
import inspect

[docs]class MeanFunction(object): r"""Wrapper to turn a function into a form useable by :py:class:`GaussianProcess`. This lets you define a simple function `fun(X, n, p1, p2, ...)` that operates on an (`M`, `D`) array `X`, taking the derivatives indicated by the vector `n` with length `D` (one derivative order for each dimension). The function should evaluate this derivative at all points in `X`, returning an array of length `M`. :py:class:`MeanFunction` takes care of looping over the different derivatives requested by :py:class:`GaussianProcess`. Parameters ---------- fun : callable Must have fingerprint `fun(X, n, p1, p2, ...)` where `X` is an array with shape (`M`, `D`), `n` is an array of non-negative integers with length `D` representing the order of derivative orders to take for each dimension and `p1`, `p2`, ... are the parameters of the mean function. num_params : Non-negative int, optional Number of parameters in the model. Default is to determine the number of parameters by inspection of `fun` or the other arguments provided. initial_params : Array, (`num_params`,), optional Initial values to set for the hyperparameters. Default is None, in which case 1 is used for the initial values. fixed_params : Array of bool, (`num_params`,), optional Sets which hyperparameters are considered fixed when optimizing the log likelihood. A True entry corresponds to that element being fixed (where the element ordering is as defined in the class). Default value is None (no hyperparameters are fixed). param_bounds : list of 2-tuples (`num_params`,), optional List of bounds for each of the hyperparameters. Each 2-tuple is of the form (lower`, `upper`). If there is no bound in a given direction, it works best to set it to something big like 1e16. Default is (0.0, 1e16) for each hyperparameter. Note that this is overridden by the `hyperprior` keyword, if present. param_names : list of str (`num_params`,), optional List of labels for the hyperparameters. Default is all empty strings. enforce_bounds : bool, optional If True, an attempt to set a hyperparameter outside of its bounds will result in the hyperparameter being set right at its bound. If False, bounds are not enforced inside the kernel. Default is False (do not enforce bounds). hyperprior : :py:class:`JointPrior` instance or list, optional Joint prior distribution for all hyperparameters. Can either be given as a :py:class:`JointPrior` instance or a list of `num_params` callables or :py:class:`rv_frozen` instances from :py:mod:`scipy.stats`, in which case a :py:class:`IndependentJointPrior` is constructed with these as the independent priors on each hyperparameter. Default is a uniform PDF on all hyperparameters. """ def __init__(self, fun, num_params=None, initial_params=None, fixed_params=None, param_bounds=None, param_names=None, enforce_bounds=False, hyperprior=None): # TODO: This duplicates a lot of code from WarpingFunction, which itself duplicates code from Kernel. Is it worth making a wrapper? self.fun = fun if num_params is None: # There are two non-parameters at the start of fun's fingerprint. try: argspec = inspect.getargspec(fun) offset = 2 except TypeError: # Need to remove self from the arg list for bound method: argspec = inspect.getargspec(fun.__call__) offset = 3 # Account for any keyword arguments: if argspec[3] is not None: offset += len(argspec[3]) if argspec[1] is None: self.num_params = len(argspec[0]) - offset else: # If fun uses the *args syntax, we need to get the number of # parameters elsewhere. if hyperprior is not None: self.num_params = len(hyperprior.bounds) elif param_names is not None: self.num_params = len(param_names) elif param_bounds is not None: self.num_params = len(param_bounds) else: raise ValueError( "If warping function w uses a variable number of " "arguments, you must also specify an explicit hyperprior, " "list of param_names and/or list of param_bounds." ) else: if num_params < 0 or not isinstance(num_params, (int, long)): raise ValueError("num_params must be an integer >= 0!") self.num_params = num_params if param_names is None: param_names = [''] * self.num_params elif len(param_names) != self.num_params: raise ValueError("param_names must be a list of length num_params!") self.param_names = scipy.asarray(param_names, dtype=str) self.enforce_bounds = enforce_bounds # Handle default case for initial parameter values -- set them all to 1. if initial_params is None: # Only accept fixed_params if initial_params is given: if fixed_params is not None: raise ValueError( "Must pass explicit parameter values if fixing parameters!" ) initial_params = scipy.ones(self.num_params, dtype=float) fixed_params = scipy.zeros(self.num_params, dtype=float) else: if len(initial_params) != self.num_params: raise ValueError("Length of initial_params must be equal to num_params!") # Handle default case of fixed_params: no fixed parameters. if fixed_params is None: fixed_params = scipy.zeros(self.num_params, dtype=float) else: if len(fixed_params) != self.num_params: raise ValueError("Length of fixed_params must be equal to num_params!") # Handle default case for parameter bounds -- set them all to (0, 1e16): if param_bounds is None: param_bounds = self.num_params * [(0.0, 1e16)] else: if len(param_bounds) != self.num_params: raise ValueError("Length of param_bounds must be equal to num_params!") # Handle default case for hyperpriors -- set them all to be uniform: if hyperprior is None: hyperprior = UniformJointPrior(param_bounds) else: try: iter(hyperprior) if len(hyperprior) != self.num_params: raise ValueError( "If hyperprior is a list its length must be equal to " "num_params!" ) hyperprior = IndependentJointPrior(hyperprior) except TypeError: pass self.params = scipy.asarray(initial_params, dtype=float) self.fixed_params = scipy.asarray(fixed_params, dtype=bool) self.hyperprior = hyperprior
[docs] def __call__(self, X, n, hyper_deriv=None): """Evaluate the mean function at the given points with the current parameters. Parameters ---------- X : array, (`N`,) or (`N`, `D`) Points to evaluate the mean function at. n : array, (`N`,) or (`N`, `D`) Derivative orders for each point. hyper_deriv : int or None, optional Index of parameter to take derivative with respect to. """ n = scipy.atleast_2d(scipy.asarray(n, dtype=int)) X = scipy.atleast_2d(scipy.asarray(X)) n_unique = unique_rows(n) mu = scipy.zeros(X.shape[0]) for nn in n_unique: idxs = (n == nn).all(axis=1) mu[idxs] = self.fun(X[idxs, :], nn, *self.params, hyper_deriv=hyper_deriv) return mu
@property def param_bounds(self): return self.hyperprior.bounds @param_bounds.setter def param_bounds(self, value): self.hyperprior.bounds = value
[docs] def set_hyperparams(self, new_params): """Sets the free hyperparameters to the new parameter values in new_params. Parameters ---------- new_params : :py:class:`Array` or other Array-like, (len(:py:attr:`self.params`),) New parameter values, ordered as dictated by the docstring for the class. """ new_params = scipy.asarray(new_params, dtype=float) if len(new_params) == len(self.free_params): if self.enforce_bounds: for idx, new_param, bound in zip(range(0, len(new_params)), new_params, self.free_param_bounds): if bound[0] is not None and new_param < bound[0]: new_params[idx] = bound[0] elif bound[1] is not None and new_param > bound[1]: new_params[idx] = bound[1] self.params[~self.fixed_params] = new_params else: raise ValueError("Length of new_params must be %s!" % (len(self.free_params),))
@property def num_free_params(self): """Returns the number of free parameters. """ return sum(~self.fixed_params) @property def free_param_idxs(self): """Returns the indices of the free parameters in the main arrays of parameters, etc. """ return scipy.arange(0, self.num_params)[~self.fixed_params] @property def free_params(self): """Returns the values of the free hyperparameters. Returns ------- free_params : :py:class:`Array` Array of the free parameters, in order. """ return MaskedBounds(self.params, self.free_param_idxs) @free_params.setter def free_params(self, value): self.params[self.free_param_idxs] = scipy.asarray(value, dtype=float) @property def free_param_bounds(self): """Returns the bounds of the free hyperparameters. Returns ------- free_param_bounds : :py:class:`Array` Array of the bounds of the free parameters, in order. """ return MaskedBounds(self.hyperprior.bounds, self.free_param_idxs) @free_param_bounds.setter def free_param_bounds(self, value): # Need to use a loop since self.hyperprior.bounds is NOT guaranteed to support fancy indexing. for i, v in zip(self.free_param_idxs, value): self.hyperprior.bounds[i] = v @property def free_param_names(self): """Returns the names of the free hyperparameters. Returns ------- free_param_names : :py:class:`Array` Array of the names of the free parameters, in order. """ return MaskedBounds(self.param_names, self.free_param_idxs) @free_param_names.setter def free_param_names(self, value): # Cast to array in case it hasn't been done already: self.param_names = scipy.asarray(self.param_names, dtype=str) self.param_names[~self.fixed_params] = value
[docs]def constant(X, n, mu, hyper_deriv=None): """Function implementing a constant mean suitable for use with :py:class:`MeanFunction`. """ if (n == 0).all(): if hyper_deriv is not None: return scipy.ones(X.shape[0]) else: return mu * scipy.ones(X.shape[0]) else: return scipy.zeros(X.shape[0])
[docs]class ConstantMeanFunction(MeanFunction): """Class implementing a constant mean function suitable for use with :py:class:`GaussianProcess`. All kwargs are passed to :py:class:`MeanFunction`. If you do not pass `hyperprior` or `param_bounds`, the hyperprior for the mean is taken to be uniform over [-1e3, 1e3]. """ def __init__(self, **kwargs): if 'hyperprior' not in kwargs and 'param_bounds' not in kwargs: kwargs['param_bounds'] = [(-1e3, 1e3)] super(ConstantMeanFunction, self).__init__( constant, param_names=['\\mu'], **kwargs )
# The following use the definitions from chapter 4 of JR Walk's thesis:
[docs]def mtanh(alpha, z): """Modified hyperbolic tangent function mtanh(z; alpha). Parameters ---------- alpha : float The core slope of the mtanh. z : float or array The coordinate of the mtanh. """ z = scipy.asarray(z) ez = scipy.exp(z) enz = 1.0 / ez return ((1 + alpha * z) * ez - enz) / (ez + enz)
[docs]def mtanh_profile(X, n, x0, delta, alpha, h, b, hyper_deriv=None): """Profile used with the mtanh function to fit profiles, suitable for use with :py:class:`MeanFunction`. Only supports univariate data! Parameters ---------- X : array, (`M`, 1) The points to evaluate at. n : array, (1,) The order of derivative to compute. Only up to first derivatives are supported. x0 : float Pedestal center delta : float Pedestal halfwidth alpha : float Core slope h : float Pedestal height b : float Pedestal foot hyper_deriv : int or None, optional The index of the parameter to take a derivative with respect to. """ X = X[:, 0] z = (x0 - X) / delta if n[0] == 0: if hyper_deriv is not None: if hyper_deriv == 0: return (h - b) / (2.0 * delta * (scipy.cosh(z))**2) * ( 1.0 + alpha / 4.0 * (1.0 + 2.0 * z + scipy.exp(2.0 * z)) ) elif hyper_deriv == 1: return -(h - b) * z / (2.0 * delta * (scipy.cosh(z))**2) * ( 1.0 + alpha / 4.0 * (1.0 + 2.0 * z + scipy.exp(2.0 * z)) ) elif hyper_deriv == 2: ez = scipy.exp(z) enz = 1.0 / ez return (h - b) / 2.0 * z * ez / (ez + enz) elif hyper_deriv == 3: ez = scipy.exp(z) enz = 1.0 / ez return 0.5 * (1.0 + ((1.0 + alpha * z) * ez - enz) / (ez + enz)) elif hyper_deriv == 4: ez = scipy.exp(z) enz = 1.0 / ez return 0.5 * (1.0 - ((1.0 + alpha * z) * ez - enz) / (ez + enz)) else: raise ValueError("Invalid value for hyper_deriv, " + str(hyper_deriv)) else: return (h + b) / 2.0 + (h - b) * mtanh(alpha, z) / 2.0 elif n[0] == 1: if hyper_deriv is not None: if hyper_deriv == 0: return -(h - b) / (2.0 * delta**2.0 * (scipy.cosh(z))**2.0) * ( alpha - (alpha * z + 2) * scipy.tanh(z) ) elif hyper_deriv == 1: return (h - b) / (2.0 * delta**2.0 * (scipy.cosh(z))**2.0) * ( 1.0 + alpha / 4.0 * (1.0 + 2.0 * z + scipy.exp(2.0 * z)) + z * (alpha - (alpha * z + 2) * scipy.tanh(z)) ) elif hyper_deriv == 2: return -(h - b) / (8.0 * delta * (scipy.cosh(z))**2.0) * ( 1.0 + 2.0 * z + scipy.exp(2.0 * z) ) elif hyper_deriv == 3: return -1.0 / (2.0 * delta * (scipy.cosh(z))**2.0) * ( 1.0 + alpha / 4.0 * (1.0 + 2.0 * z + scipy.exp(2.0 * z)) ) elif hyper_deriv == 4: return 1.0 / (2.0 * delta * (scipy.cosh(z))**2.0) * ( 1.0 + alpha / 4.0 * (1.0 + 2.0 * z + scipy.exp(2.0 * z)) ) else: raise ValueError("Invalid value for hyper_deriv, " + str(hyper_deriv)) else: return -(h - b) / (2.0 * delta * (scipy.cosh(z))**2) * ( 1 + alpha / 4.0 * (1 + 2 * z + scipy.exp(2 * z)) ) else: raise NotImplementedError("Derivatives of order greater than 1 are not supported!")
[docs]class MtanhMeanFunction1d(MeanFunction): """Profile with mtanh edge, suitable for use with :py:class:`GaussianProcess`. All kwargs are passed to :py:class:`MeanFunction`. If `hyperprior` and `param_bounds` are not passed then the hyperprior is taken to be uniform over the following intervals: ===== ==== === x0 0.98 1.1 delta 0.0 0.1 alpha -0.5 0.5 h 0 5 b 0 0.5 ===== ==== === """ def __init__(self, **kwargs): if 'hyperprior' not in kwargs and 'param_bounds' not in kwargs: kwargs['param_bounds'] = [(0.98, 1.1), (0, 0.1), (-0.5, 0.5), (0, 5), (0, 0.5)] super(MtanhMeanFunction1d, self).__init__( mtanh_profile, param_names=['x_0', '\\delta', '\\alpha', 'h', 'b'], **kwargs )
[docs]def linear(X, n, *args, **kwargs): """Linear mean function of arbitrary dimension, suitable for use with :py:class:`MeanFunction`. The form is :math:`m_0 * X[:, 0] + m_1 * X[:, 1] + \dots + b`. Parameters ---------- X : array, (`M`, `D`) The points to evaluate the model at. n : array of non-negative int, (`D`) The derivative order to take, specified as an integer order for each dimension in `X`. *args : num_dim+1 floats The slopes for each dimension, plus the constant term. Must be of the form `m0, m1, ..., b`. """ hyper_deriv = kwargs.pop('hyper_deriv', None) m = scipy.asarray(args[:-1]) b = args[-1] if sum(n) > 1: return scipy.zeros(X.shape[0]) elif sum(n) == 0: if hyper_deriv is not None: if hyper_deriv < len(m): return X[:, hyper_deriv] elif hyper_deriv == len(m): return scipy.ones(X.shape[0]) else: raise ValueError("Invalid value for hyper_deriv, " + str(hyper_deriv)) else: return (m * X).sum(axis=1) + b else: # sum(n) == 1: if hyper_deriv is not None: if n[hyper_deriv] == 1: return scipy.ones(X.shape[0]) else: return scipy.zeros(X.shape[0]) return m[n == 1] * scipy.ones(X.shape[0])
[docs]class LinearMeanFunction(MeanFunction): """Linear mean function suitable for use with :py:class:`GaussianProcess`. Parameters ---------- num_dim : positive int, optional The number of dimensions of the input data. Default is 1. **kwargs : optional kwargs All extra kwargs are passed to :py:class:`MeanFunction`. If `hyperprior` and `param_bounds` are not specified, all parameters are taken to have a uniform hyperprior over [-1e3, 1e3]. """ def __init__(self, num_dim=1, **kwargs): if 'hyperprior' not in kwargs and 'param_bounds' not in kwargs: kwargs['param_bounds'] = [(-1e3, 1e3)] * (num_dim + 1) super(LinearMeanFunction, self).__init__( linear, param_names=['m%d' % (i,) for i in range(0, num_dim)] + ['b'], **kwargs )