Source code for specter.util.util

"""
specter.util.util
=================

Utility functions and classes for specter

Stephen Bailey
Fall 2012
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import os
import math
import numpy as np
import scipy.signal
from scipy.special import legendre
from scipy.sparse import spdiags
from scipy.signal import convolve, convolve2d
from specter.util import pixspline
from time import time

_t0 = 0.0
def _timeit():
    global _t0
    tx = time()
    dt = tx - _t0
    _t0 = tx
    return dt

#- 2D Linear interpolator
[docs]class LinearInterp2D(object): """ Linear interpolation on a 2D grid. Allows values to be interpolated to be multi-dimensional. """ def __init__(self, x, y, data): """ x : array of x coordinates y : array of y coordinates data[ix, iy, ...] : 3 or more dimensional array of data to interpolate first two coordinates are x and y """ self.x = np.array(x) self.y = np.array(y) self.data = np.array(data) def __call__(self, x, y): """ Evaluate data at (x,y) """ #- TODO: compare speed to solution at #- http://stackoverflow.com/questions/12729228/simple-efficient-bilinear-interpolation-of-images-in-numpy-and-python #- Find where we are in grid #- clip to 1 because we will use i and i-1 #- clip to len(x)-1 to allow extrapolation beyond grid boundary ix = np.searchsorted(self.x, x).clip(1, len(self.x)-1) iy = np.searchsorted(self.y, y).clip(1, len(self.y)-1) #- Interpolation distances from points dx = (x - self.x[ix-1]) / (self.x[ix] - self.x[ix-1]) dy = (y - self.y[iy-1]) / (self.y[iy] - self.y[iy-1]) #- Interpolate, allowing x and/or y to be multi-dimensional #- NOTE: these are the slow steps, about equal time each #- Original code with what appears to be vestigial transposes # data1 = (self.data[ix-1,iy-1].T*(1-dx) + self.data[ix,iy-1].T*dx).T # data2 = (self.data[ix-1,iy].T*(1-dx) + self.data[ix,iy].T*dx).T # dataxy = (data1.T*(1-dy) + data2.T*dy).T #- Updated without transposes data1 = (self.data[ix-1,iy-1]*(1-dx) + self.data[ix,iy-1]*dx) data2 = (self.data[ix-1,iy]*(1-dx) + self.data[ix,iy]*dx) dataxy = (data1*(1-dy) + data2*dy) return dataxy
[docs]def rebin_image(image, n): """ rebin 2D array pix into bins of size n x n New binsize must be evenly divisible into original pix image """ assert image.shape[0] % n == 0 assert image.shape[1] % n == 0 s = image.shape[0]//n, n, image.shape[1]//n, n return image.reshape(s).sum(-1).sum(1)
#- Utility functions for sinc shifting pixelated PSFs
[docs]def _sincfunc(x, dx, dampfac=3.25): """sinc helper function for sincshift()""" if dx != 0.0: xx = (x+dx)*np.pi #- cache shifted array for 30% faster evals return np.exp( -(xx/(dampfac*np.pi))**2 ) * np.sin(xx) / xx else: xx = np.zeros(len(x)) xx[len(x)//2] = 1.0 return xx
#- Implementation note: the typical PSF image is 15x15. #- fftconvolve is not faster than convolve for images this small
[docs]def sincshift(image, dx, dy, sincrad=10, dampfac=3.25): """ Return image shifted by dx, dy using sinc interpolation. For speed, do each dimension independently which can introduce edge effects. Also see sincshift2d(). """ s = np.arange(-sincrad, sincrad+1.0) imgshape = image.shape if abs(dx) > 1e-6: sincx = _sincfunc(s, -dx, dampfac=dampfac) image = convolve(image.ravel(), sincx, mode='same') image = image.reshape(imgshape) if abs(dy) > 1e-6: sincy = _sincfunc(s, -dy, dampfac=dampfac) image = convolve(image.T.ravel(), sincy, mode='same') image = image.reshape(imgshape[-1::-1]).T return image
[docs]def sincshift2d(image, dx, dy, sincrad=10, dampfac=3.25): """ Return image shifted by dx, dy using full 2D sinc interpolation """ s = np.arange(-sincrad, sincrad+1.0) sincx = _sincfunc(s, -dx, dampfac=dampfac) sincy = _sincfunc(s, -dy, dampfac=dampfac) kernel = np.outer(sincy, sincx) newimage = convolve2d(image, kernel, mode='same') return newimage
from scipy.special import erf
[docs]def gaussint(x, mean=0.0, sigma=1.0): """ Return integral from -inf to x of normalized Gaussian with mean and sigma """ z = (x - mean) / (math.sqrt(2) * sigma) return (erf(z) + 1.0) / 2.0
[docs]def gausspix(x, mean=0.0, sigma=1.0): """ Return Gaussian(mean,sigma) integrated over unit width pixels centered at x[]. """ edges = np.concatenate((x-0.5, x[-1:]+0.5)) integrals = gaussint(edges, mean=mean, sigma=sigma) return integrals[1:] - integrals[0:-1]
[docs]def weighted_solve(A, b, w): """ Solve `A x = b` with weights `w` on `b` Returns x, inverseCovarance(x) """ assert len(b) == len(w) n = len(b) W = spdiags(w, [0,], n, n) y = A.T.dot(W.dot(b)) iCov = A.T.dot(W.dot(A)) x = np.linalg.lstsq(iCov, y)[0] return x, iCov
[docs]def trapz(edges, xp, yp): """ Perform trapezoidal integration between edges using sampled function yp vs. xp. Returns array of length len(edges)-1. Input xp array must be sorted in ascending order. See also numpy.trapz, which integrates a single array """ if np.any(np.diff(xp) < 0.0): raise ValueError("Input x must be sorted in increasing order") if len(xp) != len(yp): raise ValueError("xp and yp must have same length") yedge = np.interp(edges, xp, yp) result = np.zeros(len(edges)-1) iedge = np.searchsorted(xp, edges) for i in range(len(edges)-1): ilo, ihi = iedge[i], iedge[i+1] xx = np.concatenate( (edges[i:i+1], xp[ilo:ihi], edges[i+1:i+2]) ) yy = np.concatenate( (yedge[i:i+1], yp[ilo:ihi], yedge[i+1:i+2]) ) result[i] = np.trapz(yy, xx) return result
[docs]def resample(x, xp, yp, xedges=False, xpedges=False): """ IN PROGRESS. Resample a spectrum to a new binning using PixelSpline 1 <= x.ndim <= xp.ndim <= yp.ndim <= 2 """ assert 1 <= x.ndim assert x.ndim <= xp.ndim assert xp.ndim <= yp.ndim assert yp.ndim <= 2 input_edges = xp if xpedges else pixspline.cen2bound(xp) ys = pixspline.PixelSpline(input_edges, yp) edges = x if xedges else pixspline.cen2bound(x) return ys.resample(edges)
#- Faster versions than np.outer, which has to do type checking and raveling try: #- ~3x faster if numba is installed if 'NUMBA_DISABLE_JIT' in os.environ: raise ImportError import numba @numba.jit def outer(x, y, out): for i in range(len(x)): for j in range(len(y)): out[i,j] = x[i] * y[j] return out except ImportError: #- 1.5x faster otherwise def outer(x, y, out): return np.multiply(x[:, None], y[None, :], out) # Much faster than numpy.polynomial.legendre.legval, but doesn't work with scalars import numba @numba.jit(nopython=True,cache=False) def legval_numba(x, c): nd=len(c) ndd=nd xlen = x.size c0=c[-2]*np.ones(xlen) c1=c[-1]*np.ones(xlen) for i in range(3, ndd + 1): tmp = c0 nd = nd - 1 nd_inv = 1/nd c0 = c[-i] - (c1*(nd - 1))*nd_inv c1 = tmp + (c1*x*(2*nd - 1))*nd_inv return c0 + c1*x
[docs]@numba.jit(nopython=True, cache=False) def custom_hermitenorm(n, u): """ Custom implementation of scipy.special.hermitenorm to enable jit-compiling with Numba (which as of 10/2018 does not support scipy). This functionality is equivalent to: fn = scipy.special.hermitenorm(n) return fn(u) with the exception that scalar values of u are not supported. Inputs: n: the degree of the hermite polynomial u: (requires array) points at which the polynomial will be evaulated. Outputs: res: the value of the hermite polynomial at array points(u) """ #below is (mostly) cut and paste from scipy orthogonal_eval.pxd #some modifications have been made to operate on an array #rather than a single value (as in the original version) res=np.zeros(len(u)) if n < 0: return (0.0)*np.ones(len(u)) elif n == 0: return (1.0)*np.ones(len(u)) elif n == 1: return u else: y3 = 0.0 y2 = 1.0 for i,x in enumerate(u): for k in range(n, 1, -1): y1 = x*y2-k*y3 y3 = y2 y2 = y1 res[i]=x*y2-y3 #have to reset before the next iteration y3 = 0.0 y2 = 1.0 return res
[docs]@numba.jit(nopython=True, cache=False) def custom_erf(y): """Custom implementation of :func:`scipy.special.erf` to enable jit-compiling with Numba (which as of 10/2018 does not support scipy). This functionality is equilvalent to:: scipy.special.erf(y) with the exception that scalar values of y are not supported. Parameters ---------- y : array-like Points at which the error function will be evaluated. Returns ------- array-like The value of the error function at points in array `y`. Notes ----- This function has been translated from the original fortran function to Python. The original scipy erf function can be found at: https://github.com/scipy/scipy/blob/8dba340293fe20e62e173bdf2c10ae208286692f/scipy/special/cdflib/erf.f Note that this new function introduces a small amount of machine-precision numerical error as compared to the original scipy function. """ #have to define a ton of constants c=0.564189583547756E0 ### a1=0.771058495001320E-04 a2=-0.133733772997339E-02 a3=0.323076579225834E-01 a4=0.479137145607681E-01 a5=0.128379167095513E+00 ### b1=0.301048631703895E-02 b2=0.538971687740286E-01 b3=0.375795757275549E+00 ### p1=-1.36864857382717E-07 p2=5.64195517478974E-01 p3=7.21175825088309E+00 p4=4.31622272220567E+01 p5=1.52989285046940E+02 p6=3.39320816734344E+02 p7=4.51918953711873E+02 p8=3.00459261020162E+02 ### q1=1.00000000000000E+00 q2=1.27827273196294E+01 q3=7.70001529352295E+01 q4=2.77585444743988E+02 q5=6.38980264465631E+02 q6=9.31354094850610E+02 q7=7.90950925327898E+02 q8=3.00459260956983E+02 ### r1=2.10144126479064E+00 r2=2.62370141675169E+01 r3=2.13688200555087E+01 r4=4.65807828718470E+00 r5=2.82094791773523E-01 ### s1=9.41537750555460E+01 s2=1.87114811799590E+02 s3=9.90191814623914E+01 s4=1.80124575948747E+01 ### #end of constants #the orig version is meant for a single point #need to modify to work on an array erf = np.zeros(len(y)) for i,x in enumerate(y): ax=abs(x) #change gotos into something sensible if ax <= 0.5E0: t=x*x top = ((((a1*t+a2)*t+a3)*t+a4)*t+a5) + 1.0E0 bot = ((b1*t+b2)*t+b3)*t + 1.0E0 erf[i] = x * (top/bot) elif 0.5E0 < ax <= 4.0E0: top = ((((((p1*ax+p2)*ax+p3)*ax+p4)*ax+p5)*ax+p6)*ax + p7)*ax + p8 bot = ((((((q1*ax+q2)*ax+q3)*ax+q4)*ax+q5)*ax+q6)*ax + q7)*ax + q8 val = 0.5E0 + (0.5E0 - np.exp(-x*x)*top/bot) if x < 0.0E0: erf[i] = -val else: erf[i] = val elif 4.0E0 < ax < 5.8E0: x2 = x*x t = 1.0E0/x2 top = (((r1*t+r2)*t+r3)*t+r4)*t + r5 bot = (((s1*t+s2)*t+s3)*t+s4)*t + 1.0E0 val = (c-top/ (x2*bot)) / ax val = 0.5E0 + (0.5E0 - np.exp(-x2)*val) if x < 0.0E0: erf[i] = -val else: erf[i] = val elif ax >= 5.8E0: #choose the sign if x < 0.0E0: erf[i] = -1.0E0 else: erf[i] = 1.0E0 return erf