Source code for cosmicpy.fisher

# Copyright (c) 2014-2015, CosmicPy Developers
# Licensed under CeCILL 2.1 - see LICENSE.rst
"""
:mod:`cosmicpy.fisher` -- Fisher forecasts
=========================================

.. module:: cosmicpy.fisher
    :synopsis: Computes Fisher Matrices
.. moduleauthor:: Francois Lanusse <francois.lanusse@cea.fr>
.. moduleauthor:: Anais Rassat <anais.rassat@epfl.ch>
.. Created on Jul 9, 2013 by Francois Lanusse

.. autosummary::

    fisher
    fisherTomo
    fisher3d

"""

from .cosmology import cosmology
from .spectra import spectra
from .utils import *
from copy import deepcopy
import itertools
from numpy import *
from scipy.integrate import *
from scipy.optimize import brentq
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from scipy.interpolate import interp1d
from abc import ABCMeta, abstractmethod


[docs]class fisher(object): """ Base class to perform a Fisher Analysis from specified cosmology, survey and cosmological parameters. """ __metaclass__ = ABCMeta def __init__(self, fid_cosmo, fid_survey, params, margin_params=[]): """ Constructor """ self.step = 0.003 self.fid_cosmo = fid_cosmo self.fid_surv = fid_survey self.params = [] # Check that the parameters provided are present in survey or cosmo for p in params: # First find the fiducial value for the parameter in question if p in dir(self.fid_surv) or p in dir(self.fid_cosmo): self.params.append(p) else: print("Warning, unknown parameter in derivative :" + p) # Checks that the marginalisation parameters are actually considered # in the Fisher analysis self.margin_params = [] for p in margin_params: if self.fid_surv.nuisances: self.margin_params.append(p) else: print("Warning, requested marginalisation parameter " + p + " is not included in the analysis") # Create a spectra object with a copy of the fiducial cosmology self.spectra = spectra(deepcopy(self.fid_cosmo), deepcopy(self.fid_surv)) # Precomputed Fisher matrix self._fullMat = None self._fullInvMat = None self._mat = None self._invmat = None @abstractmethod def _computeObservables(self): pass @abstractmethod def _computeFullMatrix(self): pass
[docs] def Fij(self, param_i, param_j): """ Returns the matrix element of the Fisher matrix for parameters param_i and param_j """ i = self.params.index(param_i) j = self.params.index(param_j) return self.mat[i, j]
[docs] def invFij(self, param_i, param_j): """ Returns the matrix element of the inverse Fisher matrix for parameters param_i and param_j """ i = self.params.index(param_i) j = self.params.index(param_j) return self.invmat[i, j]
def sigma_fix(self, param): return 1.0 / sqrt(self.Fij(param, param)) def sigma_marg(self, param): return sqrt(self.invFij(param, param))
[docs] def sub_matrix(self, subparams): """ Extracts a submatrix from the current fisher matrix using the parameters in params """ params = [] for p in subparams: # Checks that the parameter exists in the orignal matrix if p in self.params: params.append(p) else: print("Warning, parameter not present in original \ Fisher matrix, left ignored :" + p) newFisher = fisher(self.fid_cosmo, self.fid_surv, params) # Fill in the fisher matrix from the precomputed matrix newFisher._mat = zeros((len(params), len(params))) for i in range(len(params)): indi = self.params.index(params[i]) for j in range(len(params)): indj = self.params.index(params[j]) newFisher._mat[i, j] = self.mat[indi, indj] newFisher._invmat = linalg.inv(newFisher._mat) return newFisher
def _marginalise(self, params): r""" Marginalises the Fisher matrix over unwanted parameters. Parameters ---------- params: list List of parameters that should not be marginalised over. Returns ------- (mat, invmat): ndarray Marginalised Fisher matrix and its invers """ # Builds inverse matrix marg_inv = zeros((len(params), len(params))) for i in range(len(params)): indi = self.params.index(params[i]) for j in range(len(params)): indj = self.params.index(params[j]) marg_inv[i, j] = self.invmat[indi, indj] marg_mat = linalg.pinv(marg_inv) return (marg_mat, marg_inv)
[docs] def corner_plot(self, nstd=2, labels=None, **kwargs): r""" Makes a corner plot including all the parameters in the Fisher analysis """ if labels is None: labels = self.params for i in range(len(self.params)): for j in range(i): ax = plt.subplot(len(self.params)-1, len(self.params)-1 , (i - 1)*(len(self.params)-1) + (j+1)) if i == len(self.params) - 1: ax.set_xlabel(labels[j]) else: ax.set_xticklabels([]) if j == 0: ax.set_ylabel(labels[i]) else: ax.set_yticklabels([]) self.plot(self.params[j], self.params[i], nstd=nstd, ax=ax, **kwargs) plt.subplots_adjust(wspace=0) plt.subplots_adjust(hspace=0)
[docs] def plot(self, p1, p2, nstd=2, ax=None, **kwargs): r""" Plots confidence contours corresponding to the parameters provided. Parameters ---------- """ params = [p1, p2] def eigsorted(cov): vals, vecs = linalg.eigh(cov) order = vals.argsort()[::-1] return vals[order], vecs[:, order] mat, cov = self._marginalise(params) # First find the fiducial value for the parameter in question fid_param = None pos = [0, 0] for p in params: if p in dir(self.fid_surv): fid_param = getattr(self.fid_surv, p) else: fid_param = getattr(self.fid_cosmo, p) pos[params.index(p)] = fid_param if ax is None: ax = plt.gca() vals, vecs = eigsorted(cov) theta = degrees(arctan2(*vecs[:, 0][::-1])) # Width and height are "full" widths, not radius width, height = 2 * nstd * sqrt(vals) ellip = Ellipse(xy=pos, width=width, height=height, angle=theta, **kwargs) ax.add_artist(ellip) sz = max(width, height) s1 = 1.5*nstd*self.sigma_marg(p1) s2 = 1.5*nstd*self.sigma_marg(p2) ax.set_xlim(pos[0] - s1, pos[0] + s1) ax.set_ylim(pos[1] - s2, pos[1] + s2) #ax.set_xlim(pos[0] - sz, pos[0] + sz) #ax.set_ylim(pos[1] - sz, pos[1] + sz) plt.draw() return ellip
@property
[docs] def FoM_DETF(self): """ Computes the figure of merit from the Dark Energy Task Force Albrecht et al 2006 FoM = 1/sqrt(det(F^-1_{w0,wa})) """ det = (self.invFij('w0', 'w0') * self.invFij('wa', 'wa') - self.invFij('wa', 'w0') * self.invFij('w0', 'wa')) return 1.0 / sqrt(det)
@property
[docs] def FoM(self): """ Total figure of merit : ln (1/det(F^{-1})) """ return log(1.0 / abs(linalg.det(self.invmat)))
@property
[docs] def invmat(self): """ Returns the inverse fisher matrix """ if self._invmat is None: self._invmat = linalg.inv(self.mat) return self._invmat
@property
[docs] def mat(self): """ Returns the fisher matrix marginalised over nuisance parameters """ # If the matrix is not already computed, compute it if self._mat is None: self._fullMat = self._computeFullMatrix() self._fullInvMat = linalg.pinv(self._fullMat) # Apply marginalisation over nuisance parameters self._invmat = self._fullInvMat[0:len(self.params), 0:len(self.params)] self._mat = linalg.pinv(self._invmat) return self._mat
def _computeDerivatives(self): """ Computes all the derivatives of the specified observable with respect to the parameters and nuisance parameters in the analysis""" # List the derivatives with respect to all the parameters dcldp = [] old_fid_param = None old_param = None # Computes all the derivatives with respect to the main parameters for p in self.params: print("varying :" + p) # First find the fiducial value for the parameter in question fid_param = None if p in dir(self.fid_surv): fid_param = getattr(self.fid_surv, p) else: fid_param = getattr(self.fid_cosmo, p) step = fid_param * self.step if fid_param == 0: step = self.step # Compute derivative using the 2 point formula if old_param is None: kw = {p: fid_param + step, 'makeFlat': True} else: kw = {p: fid_param + step, old_param: old_fid_param, 'makeFlat': True} self.spectra.update(**kw) clp = self._computeObservables() kw = {p: fid_param - step, 'makeFlat': True} self.spectra.update(**kw) clm = self._computeObservables() dcl = [] for (pl, mi) in zip(clp, clm): dcl.append((pl - mi) / (2.0 * step)) dcldp.append(dcl) old_fid_param = fid_param old_param = p # Reset everything to the fiducial value kw = {old_param: old_fid_param, 'makeFlat': True} self.spectra.update(**kw) # Now, computing derivatives with respect to the nuisance parameters for p in self.margin_params: print("Varying nuisance parameter :" + p) nuisance = self.fid_surv.nuisances[p] np = nuisance.Np # Start varying each nuisance parameter for ind_param in range(np): fid_param = nuisance.get_value(ind_param) step = fid_param * self.step if fid_param == 0: step = self.step kw = {p + '_p' + str(ind_param): fid_param + step} self.spectra.update(**kw) clp = self._computeObservables() kw = {p + '_p' + str(ind_param): fid_param - step} self.spectra.update(**kw) clm = self._computeObservables() dcl = [] for (pl, mi) in zip(clp, clm): dcl.append((pl - mi) / (2.0 * step)) dcldp.append(dcl) # Resetting nuisance parameter kw = {p + '_p' + str(ind_param): fid_param} self.spectra.update(**kw) return dcldp
[docs]class fisherTomo(fisher): """ Tomographic Fisher matrix """ def __init__(self, fid_cosmo, fid_survey, params, probes, margin_params=[], cutNonLinearScales=None, lmax=10000, nl=200, lmin=2, diagonal=False): """ Initializes a tomographic Fisher Matrix using the probes given in probes """ self.probes = probes # Calls super class initialization super(fisherTomo, self).__init__(fid_cosmo, fid_survey, params, margin_params) self.diagonal = diagonal # Create a list of redshift bins if diagonal: self.bins = [] for i in range(self.fid_surv.nzbins): self.bins.append((i,i)) else: self.bins = list(itertools.combinations_with_replacement( range(self.fid_surv.nzbins), r=2)) # Create a list of spectra from the probes self.crossprobes = list( itertools.combinations_with_replacement(probes, r=2)) # Create list of power spectra from probes and redshift bins self.cls = list(itertools.product(self.crossprobes, self.bins)) # Compute l range for each redshift bin self.lmax = zeros(len(self.cls)) for i in range(len(self.cls)): zmin = min(self.fid_surv.zbins[self.cls[i][1][0]].zmed, self.fid_surv.zbins[self.cls[i][1][1]].zmed) if cutNonLinearScales is None: self.lmax[i] = lmax else: if cutNonLinearScales is 'optimistic': kmax = 0.25 else: kmax = min(self.fid_surv.zbins[self.cls[i][1][0]].kmax_lin, self.fid_surv.zbins[self.cls[i][1][1]].kmax_lin) self.lmax[i] = kmax * self.fid_cosmo.a2chi(z2a(zmin)) lmax = self.lmax.max() # Generating an hybrid array self._l = logspace(0, log10(lmax), nl) for i in range(nl): if self._l[i] <= (i + lmin): self._l[i] = i + lmin self._nl = len(self._l) def _computeObservables(self, shotNoise=False): obs = [] for cl in self.cls: obs.append(getattr(self.spectra, 'cl_' + cl[0][0] + cl[0][1]) (cl[1][0], cl[1][1], self._l, shotNoise=shotNoise)) return obs def _computeFullMatrix(self): """ Returns the full fisher matrix. """ def find_index(a, b, i, j): if ((a, b), (i, j)) in self.cls: return self.cls.index(((a, b), (i, j))) else: return self.cls.index(((b, a), (j, i))) # Prefactor geom = 1.0 / (2.0 * self._l + 1.0) / self.fid_surv.fsky print("Computing derivatives") self._dcldp = self._computeDerivatives() print("Computing covariance matrix") cl = self._computeObservables(shotNoise=True) # Precompute all the covariance matrices self._cov = zeros((len(self.cls), len(self.cls), self._nl)) for ind1 in range(len(self.cls)): for ind2 in range(ind1 + 1): if self.diagonal and ind1 != ind2: continue cl1 = self.cls[ind1] cl2 = self.cls[ind2] C02 = find_index(cl1[0][0], cl2[0][0], cl1[1][0], cl2[1][0]) C13 = find_index(cl1[0][1], cl2[0][1], cl1[1][1], cl2[1][1]) C03 = find_index(cl1[0][0], cl2[0][1], cl1[1][0], cl2[1][1]) C12 = find_index(cl1[0][1], cl2[0][0], cl1[1][1], cl2[1][0]) self._cov[ind1, ind2, :] = geom * (cl[C02] * cl[C13] + cl[C03] * cl[C12]) self._cov[ind2, ind1, :] = self._cov[ind1, ind2, :] # Precomputes the inverse of the covariance matrix for each l self._invcov = zeros_like(self._cov) for indl in range(len(self._l)): self._invcov[:, :, indl] = linalg.pinv(self._cov[:, :, indl]) # Computes the fisher Matrix # The size of the Fisher matrix is given by the number of derivatives print("Computing full Fisher matrix") nparams = len(self._dcldp) mat = zeros((nparams, nparams)) for i in range(nparams): for j in range(i + 1): res = zeros_like(self._l, dtype=double) for ind1 in range(len(self.cls)): for ind2 in range(len(self.cls)): temp = (self._dcldp[i][ind1] * self._dcldp[j][ind2] * self._invcov[ind1, ind2, :]) lmax = min(self.lmax[ind1], self.lmax[ind2]) temp[self._l > lmax] = 0 res += temp mat[i, j] = simps(res, x=self._l.astype('float')) mat[j, i] = mat[i, j] return mat
[docs]class fisher3d(fisher): """ Full 3D fisher analysis using the Spherical Fourier-Bessel expansion """ def __init__(self, fid_cosmo, fid_survey, params, margin_params=[], cutNonLinearScales=True): super(fisher3d, self).__init__(fid_cosmo, fid_survey, params, margin_params) self.cutNonLinearScales = cutNonLinearScales # TODO : Adapt the l range self._l = arange(1, 100) self._l[29:] = around(logspace(log(30) / log(10), log(900) / log(10), 70)) self._l = self._l.astype('int') # Compute the mask of admissible values i.e. corresponding to the linear part of the power spectrum self._linearcut = ones_like(self._l) * 0.25 if cutNonLinearScales: for i in range(len(self._l)): cut = lambda k: k * self.fid_cosmo.a2chi(z2a(k/0.132)) - self._l[i] self._linearcut[i] = brentq(cut, 0., 0.25) def _computeObservables(self, shotNoise=False): cl = self.spectra.cl_sfb(self._l, kmax=self._linearcut, shotNoise=shotNoise, fid_cosmo=self.fid_cosmo) obs = [] for i in range(len(self._l)): obs.append(cl[i][1]) return obs def _computeFullMatrix(self): # Prefactor geom = 0.5 * self.fid_surv.fsky * (2.0 * self._l + 1.0) print("Computing covariance matrix") # Computing eigen decomposition of the covariance matrix self._cl = self._computeObservables(shotNoise=True) # Invert the covariance matrix for every l self._invcov = [] for i in range(len(self._l)): self._invcov.append(linalg.eigh(self._cl[i])) print("Computing derivatives") self._dcldp = self._computeDerivatives() nparams = len(self._dcldp) # Apply inverse covariance matrix for i in range(len(self._l)): v, q = self._invcov[i] for p in range(nparams): self._dcldp[p][i] = dot(dot(q.T, dot(self._dcldp[p][i], q)), diag(1.0/v)) print("Computing Full Fisher matrix") # Computes the fisher Matrix self._integrand = zeros((nparams, nparams, len(self._l))) for i in range(nparams): for j in range(i+1): for n in range(len(self._l)): self._integrand[i, j, n] = trace(dot(self._dcldp[i][n], self._dcldp[j][n])) self._integrand[j, i, n] = self._integrand[i, j, n] self._integrand *= geom mat = simps(self._integrand, x=self._l.astype('float')) # integrand = interp1d(self._l, self._integrand, kind='cubic') # l = arange(self._l.min(), self._l.max()+1) # self._mat = 0.5 * self.fid_surv.fsky * sum(integrand(l) * # (2.0*l + 1.0), # axis=2) return mat