Source code for cosmicpy.survey

# Copyright (c) 2014-2015, CosmicPy Developers
# Licensed under CeCILL 2.1 - see LICENSE.rst
"""
:mod:`cosmicpy.survey` -- Galaxy survey representation
=====================================================

.. module:: cosmicpy.survey
    :synopsis: Contains surveys related objects and functions
.. moduleauthor:: Francois Lanusse <francois.lanusse@cea.fr>
.. moduleauthor:: Anais Rassat <anais.rassat@epfl.ch>
.. Created on Jun 13, 2013 by Francois Lanusse

.. autosummary::

    survey
    redshift_bin
    nuisance
    grid_nuisance

"""
from numpy import *
from scipy.integrate import romberg
from scipy.optimize import brentq
from scipy.special import erf
from scipy.interpolate import interp2d
from abc import ABCMeta, abstractmethod
from .utils import *


[docs]class nuisance(object): r"""Generic nuisance representation of a 2D function""" __metaclass__ = ABCMeta def __init__(self, name, function, Nx, Ny, xlim, ylim): self._name = name self._xlim = xlim self._ylim = ylim self._Ny = Ny self._Nx = Nx self._params_values = zeros((Nx, Ny)) self._params_amplitude = 1.0 self._model(function)
[docs] def update(self, **kwargs): r"""Update the nuisance parameters""" if len(kwargs) is 0: return # Looks for keywords and update the survey parameters for kw in kwargs: if self._name in kw: # Extract the number of the parameter n = int(kw.split('_p')[1]) if n == 0: self._params_amplitude = kwargs[kw] else: self._params_values.flat[n - 1] = kwargs[kw]
@property
[docs] def Np(self): r""" Number of nuisance parameters""" return self._Nx * self._Ny + 1
def get_value(self, ind): if ind == 0: return self._params_amplitude else: return self._params_values.flat[ind - 1] @abstractmethod
[docs] def __call__(self, x, y): r""" Evaluates the function for the given set of nuisance parameters""" pass
@abstractmethod def _model(self, func): r""" Models the given function using the nuisance parameters""" pass
[docs]class grid_nuisance(nuisance): r""" Generic nuisance parametrisation on a grid """ def _model(self, func): # Compute the position of the grid nodes self._x = logspace(log10(self._xlim[0]), log10(self._xlim[1]), self._Nx + 2) self._y = logspace(log10(self._ylim[0]), log10(self._ylim[1]), self._Ny + 2) self._params_values *= 0 self._border = zeros((self._Nx + 2, self._Ny + 2)) # for i in range(self._Nx + 2): # for j in range(self._Ny + 2): # self._border[i, j] = func(self._x[i], self._y[j]) self._params_amplitude = 1.0 self._func = func def __call__(self, x, y): x = atleast_1d(x) y = atleast_1d(y) # Compute the bilinear interpolation of the logarithmique grid at the # requested position self._border[1:-1, 1:-1] = self._params_values[:, :] lnQ = interp2d(log10(self._y), log10(self._x), self._border, copy=False, bounds_error=False, fill_value=0) return self._params_amplitude * exp(lnQ(log10(y), log10(x)))*self._func(x, y)
[docs]class survey(object): r""" Contains all the parameters corresponding to a survey. Parameters ---------- nzparams : dict, optional Dictionary containing the parameters of the desired redshift distribution. nzbins : int, optional Number of redshift bins in the survey (def: 1). bintype : str, optional Type of redshift binning. Either 'eq_dens' or 'eq_size' (def: 'eq_dens'). ngal : float, optional Number of galaxies per square arcmins (def: 40). zmin : float, optional Minimum redshift considered in the survey (def: 0). zmax : float, optional Maximum redshift considered in the survey (def: 5). chicut : float, optional Level of the cut to apply to the selection function, relative to the maximum of the selection fuction. If 0, :math:`\chi_\max` is set to :math:`\chi(z_\max)` (def : 1e-5). zphot_sig : float, optional Standard variation of photometric redshift errors (def: 0.05). zphot_bias : float, optional Bias of photometric redshift errors (def: 0). fsky : float, optional Sky fraction covered by the survey (def: 0.4848). biastype : str, optional Type of galaxy bias. Either 'constant' or 'sqrt' (def: 'sqrt') Notes ----- Several parametrisations for the redshifts can be specified: * 'smail' : n(z) = z^a exp( - (z/z0)^b) * 'gaussian' : n(z) = exp(-(r/r0)^2) * r ^2 * dr/dz """ def __init__(self, nuisances=[], **kwargs): self._nzparams = {'type': 'smail', 'a': 2.0, 'b': 1.5, 'z0': 0.9 / sqrt(2)} self._zmin = 0 self._zmax = 5 self._chicut = 1e-5 self._nzbins = 1 self._zphot_sig = 0.05 self._zphot_bias = 0 self._ngal = 40 self._fsky = 0.4848 self._bintype = 'eq_dens' self._biastype = 'sqrt' self.nuisances = {} self._norm = None self._zmean = None self._zmed = None self.update(**kwargs)
[docs] def addNuisance(self, params): """ Adds nuisance parameters to the survey params is of the form {'name':'bias', 'type':'grid', 'Nx':3,'Ny':4,'xlim':[0.001,1],'ylim':[0.001,0.1]} """ if params['name'] == 'bias': if params['type'] == 'grid': if self._biastype == 'sqrt': def func(zp, k): zp = atleast_1d(zp) k = atleast_1d(k) return einsum('i,j->ij', sqrt(zp), ones(len(k))) else: def func(zp, k): zp = atleast_1d(zp) k = atleast_1d(k) return ones((len(zp), len(k))) nuisance = grid_nuisance('bias', func, params['Nx'], params['Ny'], array(params['xlim'])+1, params['ylim']) self.nuisances['bias'] = nuisance
[docs] def update(self, **kwargs): r""" Updates the survey after parameters have been modified. Parameters ---------- nzparams : dict, optional Dictionary containing the parameters of the desired redshift distribution. nzbins : int, optional Number of redshift bins in the survey. bintype : str, optional Type of redshift binning. Either 'eq_dens' or 'eq_size'. ngal : float, optional Number of galaxies per square arcmins. zmin : float, optional Minimum redshift considered in the survey. zmax : float, optional Maximum redshift considered in the survey. chicut : float, optional Level of the cut to apply to the selection function, relative to the maximum of the selection fuction. If 0, :math:`\chi_\max` is set to :math:`\chi(z_\max)` (def : 1e-5). zphot_sig : float, optional Standard variation of photometric redshift errors. zphot_bias : float, optional Bias of photometric redshift errors. fsky : float, optional Sky fraction covered by the survey. biastype : str, optional Type of galaxy bias. Either 'constant' or 'sqrt'. """ if len(kwargs) is 0: return # Looks for keywords and update the survey parameters for kw in kwargs: if kw == 'nzbins': self._nzbins = kwargs[kw] elif kw == 'nzparams': self._nzparams = kwargs[kw] elif kw == 'zmin': self._zmin = kwargs[kw] elif kw == 'zmax': self._zmax = kwargs[kw] elif kw == 'chicut': self._chicut = kwargs[kw] elif kw == 'zphot_sig': self._zphot_sig = kwargs[kw] elif kw == 'zphot_bias': self._zphot_bias = kwargs[kw] elif kw == 'ngal': self._ngal = kwargs[kw] elif kw == 'bintype': self._bintype = kwargs[kw] elif kw == 'fsky': self._fsky = kwargs[kw] elif kw == 'biastype': self._biastype = kwargs[kw] for n in self.nuisances.values(): n.update(**kwargs) self._norm = None self._zmean = None self._zmed = None # Updates the redshift bins nztot = redshift_bin(self, zphot_min=self._zmin, zphot_max=self._zmax) self.zbins = nztot.subdivide(self._nzbins, bintype=self._bintype)
[docs] def nz_unorm(self, z): r""" Computes the non normalized n(z) Parameters ---------- z : array_like Redshift Returns ------- n : ndarray, or float if input scalar Non normalised redshift distribution evaluated at the specified redshift """ if self._nzparams['type'] == 'smail': a = self._nzparams['a'] b = self._nzparams['b'] z0 = self._nzparams['z0'] p = z**a * exp(-(z / z0)**b) elif self._nzparams['type'] == 'gaussian': r0 = self._nzparams['r0'] cosmo = self._nzparams['cosmology'] a = z2a(z) r = cosmo.a2chi(a) p = exp(-(r / r0)**2) * r**2 * (cosmo.dchioverda(a) / cosmo.dzoverda(a)) else: print ('ERROR : unknown redshift bin type ' + self.params['type']) p = 0 return p
[docs] def nz(self, z): r""" Computes the normalized n(z) Parameters ---------- z : array_like Redshift Returns ------- n : ndarray, or float if input scalar Normalised redshift distribution evaluated at the specified redshift See Also -------- nz_unorm : Non normalised n(z) """ return self.nz_unorm(z) / self.norm
[docs] def phi(self, cosmo, chi): r""" Computes the normalised survey selection function at a given comoving distance. Parameters ---------- cosmo : cosmology Cosmology object required to convert comoving distances. chi : array_like Comoving distance. Returns ------- phi : ndarray, or float if input is scalar Normalised selection function of the survey. Notes ----- The survey selection function :math:`\phi` verifies: .. math:: \int d^3 r \phi (r) = V where :math:`V` is a characteristic volume of the survey chosen such that :math:`\phi \rightarrow 1` as :math:`V \rightarrow \infty`. .. warning:: This function returns the selection function scaled by the survey volume i.e. :math:`\frac{\phi}{V}` """ chi = atleast_1d(chi) a = cosmo.chi2a(chi) phi = self.nz(a2z(a)) / (4.*pi*chi**2) * (cosmo.dzoverda(a) / cosmo.dchioverda(a)) # Apply zmax cut phi[a2z(a) > self.zmax] = 0 return phi
[docs] def chimax(self, cosmo): r""" Computes the maximum comoving distance for the survey by corresponding to the point where the selection function reaches the cut level. Parameters ---------- cosmo : cosmology Cosmology object required to convert comoving distances. cut : float, optional Level of the cut to apply to the selection function, relative to the maximum of the selection fuction (def : 1e-5) Returns ------- chi : float Maximum survey comoving distance """ # If chicut is set to 0, chimax is set to zmax if self._chicut <= 0: return cosmo.a2chi(z2a(self.zmax)).tolist() # First, find approximate maximum of the selection function chi = cosmo.a2chi(z2a(self.zmax)) chi = linspace(1.0, chi) phitab = self.phi(cosmo, chi) m = log10(phitab.max()*self._chicut) phi = lambda x: log10(self.phi(cosmo, x)) - m chimax = brentq(phi, chi[argmax(phitab)], cosmo.a2chi(cosmo._amin)) if chimax >= cosmo.a2chi(cosmo._amin): print("Warning : reaching maximum tabulated comoving distance") return chimax
@property
[docs] def norm(self): r""" Normalisation factor of the redshift distribution""" if self._norm is None: if self._nzparams['type'] == 'smail': z0 = self._nzparams['z0'] elif self._nzparams['type'] == 'gaussian': r0 = self._nzparams['r0'] cosmo = self._nzparams['cosmology'] z0 = a2z(cosmo.chi2a(r0)) else: print('ERROR : unknown redshift bin type ' + self.params['type']) # The normalisation factor is computed in 2 steps to avoid failure # of the integration scheme self._norm = romberg(self.nz_unorm, self.zmin, z0) + \ romberg(self.nz_unorm, z0, self.zmax) return self._norm
[docs] def bias(self, z, k): r""" Galaxy bias b(z,k) Parameters ---------- z : array_like Redshift Returns ------- b : ndarray, or float if input is scalar Galaxy bias evaluated at the specified redshift Notes ----- The galaxy bias follows the """ z = atleast_1d(z) k = atleast_1d(k) if 'bias' in self.nuisances: return self.nuisances['bias'](1+z, k) if self._biastype == 'sqrt': return einsum('i,j->ij', sqrt(1 + z), ones(len(k))) else: return ones((len(z), len(k)))
@property
[docs] def zmed(self): r""" Median of the redshift distribution. """ if self._zmed is None: f = lambda x: romberg(self.nz, self.zmin, x) - 0.5 self._zmed = brentq(f, self.zmin, self.zmax) return self._zmed
@property
[docs] def zmean(self): r""" Mean of the redshift distribution. """ if self._zmean is None: self._zmean = romberg(lambda z: z * self.nz(z), self.zmin, self.zmax) return self._zmean
@property def zmin(self): r""" Minimun Redshift """ return self._zmin @zmin.setter
[docs] def zmin(self, val): r""" Sets the minimum redshift """ self.update(wa=val)
@property def zmax(self): return self._zmax @property def zphot_sig(self): return self._zphot_sig @property def zphot_bias(self): return self._zphot_bias @property def ngal(self): return self._ngal @property def N(self): res = self._ngal res *= 60.0*60.0 # gal/deg^2 res *= (180.0/pi)**2 # gal/stradian res *= 4.0 * pi * self.fsky return res @property def nzbins(self): return self._nzbins @property def fsky(self): return self._fsky
[docs]class redshift_bin(object): """ Represents a redshift bin """ def __init__(self, surv, **kwargs): self.surv = surv self.zphot_min = surv.zmin self.zphot_max = surv.zmax for kw in kwargs: if kw == 'zphot_min': self.zphot_min = kwargs[kw] elif kw == 'zphot_max': self.zphot_max = kwargs[kw] else: print ('WARNING : Unknown keyword ' + kw) self.zmin = max([self.zphot_min - 10*self.surv.zphot_sig, 0]) self.zmax = self.zphot_max + 10*self.surv.zphot_sig self._norm = None self._zmean = None self._zmed = None
[docs] def nz_unorm(self, z): """ Computes the un-normalized n(z) """ p = self.surv.nz(z) # Apply photo-z errors x = 1.0/(sqrt(2.0)*self.surv.zphot_sig*(1.0+z)) res = 0.5* p *( erf((self.zphot_max - z + self.surv.zphot_bias)*x) - erf((self.zphot_min - z + self.surv.zphot_bias)*x)) return res
def nz(self, z): return self.nz_unorm(z)/self.norm
[docs] def subdivide(self, nbins, bintype='eq_dens'): """ Divide this redshift bins into sub-bins nbins : Number of bins to generate bintype : 'eq_dens' or 'eq_size' """ # Compute the redshift boundaries for each bin generated zbounds = [self.zphot_min] bins = [] n_per_bin = self.norm / nbins for i in range(nbins-1): if bintype == 'eq_dens': zbound = brentq(lambda z: romberg(self.nz,self.zphot_min,z) - (i+1.0)*n_per_bin, zbounds[i], self.zmax) else: if bintype != 'eq_size': print ('WARNING : unknown binning scheme ' + bintype + '. Assuming equal size bins') zbound = (i+1.) * (self.zphot_max - self.zphot_min)/nbins zbounds.append(zbound) new_bin = redshift_bin(self.surv, zphot_min=zbounds[i],zphot_max=zbounds[i+1]) bins.append(new_bin) zbounds.append(self.zmax) new_bin = redshift_bin(self.surv, zphot_min=zbounds[nbins-1],zphot_max=zbounds[nbins]) bins.append(new_bin) return bins
[docs] def phi(self, cosmo, chi): r""" Computes the normalised selection function for the bin at a given comoving distance. Parameters ---------- cosmo : cosmology Cosmology object required to convert comoving distances. chi : array_like Comoving distance. Returns ------- phi : ndarray, or float if input is scalar Normalised selection function of the bin. Notes ----- The normalised survey selection function :math:`\phi` verifies: .. math:: \int d^3 r \phi (r) = 1 """ chi = atleast_1d(chi) a = cosmo.chi2a(chi) phi = self.nz(a2z(a)) / (4.*pi*chi**2) * (cosmo.dzoverda(a) / cosmo.dchioverda(a)) # Apply zmax cut phi[a2z(a) > self.zmax] = 0 return phi
@property
[docs] def kmax_lin(self): """ Maximum linear scale at the median redshift in h^-1 Mpc""" return min(0.132*self.zmed, 0.25)
@property
[docs] def zmed(self): """ Median of the redshift distribution """ if self._zmed is None: f = lambda x: romberg(self.nz, self.zmin, x) - 0.5 self._zmed = brentq(f, self.zmin, self.zmax) return self._zmed
@property
[docs] def zmean(self): """ Mean of the distribution """ if self._zmean is None: self._zmean = romberg(lambda z: z*self.nz(z), self.zmin, self.zmax) return self._zmean
@property
[docs] def norm(self): """ Normalisation of the distribution """ if self._norm is None: self._norm = romberg(self.nz_unorm, self.zmin, self.zmax) return self._norm
@property
[docs] def ngal(self): """ Returns the number of galaxies in this photo-z bin in steradian""" res = self.surv.ngal*self.norm res *= 60.0*60.0 #gal/deg^2 res *= (180.0/pi)**2 #gal/stradian return res