Source code for specter.util.pixspline

"""
specter.util.pixspline
======================

Pixel-integrated spline utilities.

Written by A. Bolton, U. of Utah, 2010-2013.
"""
from __future__ import absolute_import, division, print_function, unicode_literals

import numbers
import numpy as np
from scipy import linalg as la
from scipy import sparse as sp
from scipy import special as sf

[docs]def cen2bound(pixelcen): """ Convenience function to do the obvious thing to transform pixel centers to pixel boundaries. """ pixbound = 0.5 * (pixelcen[1:] + pixelcen[:-1]) lo_val = 2. * pixbound[0] - pixbound[1] hi_val = 2. * pixbound[-1] - pixbound[-2] pixbound = np.append(np.append(lo_val, pixbound), hi_val) return pixbound
# def gauss_blur_matrix(pixbound, sig_conv): # """ # Function to generate a Gaussian blurring matrix for a pixelized # spectrum, from specified pixel boundaries and 'sigma' vector. # The matrix will be flux-conserving if the spectrum to which it is # applied has units of 'counts per unit x', and pixbound and sig_conv # both have units of x. # # pixbound should have one more element than sig_conv. # # Output is a scipy sparse matrix that can implement the blurring as: # blurflux = gauss_blur_matrix * flux # where 'flux' has the same dimensions as 'sig_conv'. # """ # # Derived values and error checks: # npix = len(pixbound) - 1 # if (len(sig_conv) != npix): # raise PixSplineError('Need one more element in pixbound than in sig_conv!') # if (sig_conv.min() <= 0.): # raise PixSplineError('sig_conv must be > 0 everywhere!') # xcen = 0.5 * (pixbound[1:] + pixbound[:-1]) # dxpix = pixbound[1:] - pixbound[:-1] # if (dxpix.min() <= 0.): # raise PixSplineError('Pixel boundaries not monotonically increasing!') # # Which "new" pixels does each "old" pixel touch? # # Let's go +/- 6 sigma for all: # sig_width = 6.0 # # A minor correction factor to preserve flux conservation: # cfact = 1./sf.erf(sig_width / np.sqrt(2.)) # xblur_lo = xcen - sig_width * sig_conv # xblur_hi = xcen + sig_width * sig_conv # bin_lo = np.digitize(xblur_lo, pixbound) - 1 # bin_hi = np.digitize(xblur_hi, pixbound) - 1 # # Restrict the ranges: # #xblur_lo = np.where((xblur_lo > pixbound[0]), xblur_lo, pixbound[0]) # #xblur_lo = np.where((xblur_lo < pixbound[-1]), xblur_lo, pixbound[-1]) # #xblur_hi = np.where((xblur_hi > pixbound[0]), xblur_hi, pixbound[0]) # #xblur_hi = np.where((xblur_hi < pixbound[-1]), xblur_hi, pixbound[-1]) # bin_lo = np.where((bin_lo >= 0), bin_lo, 0) # #bin_lo = np.where((bin_lo < npix), bin_lo, npix-1) # #bin_hi = np.where((bin_hi >= 0), bin_hi, 0) # bin_hi = np.where((bin_hi < npix), bin_hi, npix-1) # # Compute total number of non-zero elements in the broadening matrix: # n_each = bin_hi - bin_lo + 1 # n_entries = n_each.sum() # ij = np.zeros((2, n_entries), dtype=long) # v_vec = np.zeros(n_entries, dtype=float) # # Loop over pixels in the "old" spectrum: # pcount = 0L # roottwo = np.sqrt(2.) # bin_vec = np.arange(npix, dtype=long) # for k in range(npix): # xbound = pixbound[bin_lo[k]:bin_hi[k]+2] # # Gaussian integral in terms of error function: # erf_terms = cfact * 0.5 * sf.erf((xbound - xcen[k]) / (roottwo * sig_conv[k])) # erf_int = (erf_terms[1:] - erf_terms[:-1]) * dxpix[k] / dxpix[bin_lo[k]:bin_hi[k]+1] # ij[0,pcount:pcount+n_each[k]] = bin_vec[bin_lo[k]:bin_hi[k]+1] # ij[1,pcount:pcount+n_each[k]] = k # v_vec[pcount:pcount+n_each[k]] = erf_int # pcount += n_each[k] # conv_matrix = sp.coo_matrix((v_vec, ij), shape=(npix,npix)) # return conv_matrix.tocsr()
[docs]class PixSplineError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value)
[docs]class PixelSpline: """ Pixel Spline object class. Initialize as follows: PS = PixelSpline(pixbound, flux) where pixbound = array of pixel boundaries in baseline units (if same len as flux, treat as centers instead of edges) and flux = array of specific flux values in baseline units. Assumptions: 'pixbound' should have one more element than 'flux', and units of 'flux' are -per-unit-baseline, for the baseline units in which pixbound is expressed, averaged over the extent of each pixel. """ def __init__(self, pixbound, flux): #- If pixbound and flux have same number of elements, assume they #- are centers rather than edges if len(pixbound) == len(flux): pixbound = cen2bound(pixbound) npix = len(flux) # Test for correct argument dimensions: if (len(pixbound) - npix) != 1: raise PixSplineError('Need one more element in pixbound than in flux!') # The array of "delta-x" values: dxpix = pixbound[1:] - pixbound[:-1] # Test for monotonic increase: if dxpix.min() <= 0.: raise PixSplineError('Pixel boundaries not monotonically increasing!') self.npix = npix self.pixbound = pixbound.copy() self.dxpix = dxpix.copy() self.xcen = 0.5 * (pixbound[1:] + pixbound[:-1]).copy() self.flux = flux.copy() maindiag = (dxpix[:-1] + dxpix[1:]) / 3. offdiag = dxpix[1:-1] / 6. upperdiag = np.append(0., offdiag) lowerdiag = np.append(offdiag, 0.) band_matrix = np.vstack((upperdiag, maindiag, lowerdiag)) # The right-hand side: rhs = flux[1:] - flux[:-1] # Solve the banded matrix for the slopes at the ducks: acoeff = la.solve_banded((1,1), band_matrix, rhs) self.duckslopes = np.append(np.append(0., acoeff), 0.) #- convenience wrapper def __call__(self, xnew): """ Evaluate underlying pixel spline at array of points Also see: PixelSpline.resample() """ return self.point_evaluate(xnew, missing=0.0)
[docs] def point_evaluate(self, xnew, missing=0.): """ Evaluate underlying pixel spline at array of points Also see: PixelSpline.resample() """ input_scalar = False if isinstance(xnew, numbers.Real): input_scalar = True xnew = np.array( (xnew,) ) # Initialize output array: outflux = 0. * self.flux[0] * xnew + missing # Digitize into bins: bin_idx = np.digitize(xnew, self.pixbound) # Find the indices of those that are actually in-bounds: wh_in = np.where((bin_idx > 0) * (bin_idx < len(self.pixbound))) if len(wh_in[0]) == 0: return outflux xnew_in = xnew[wh_in] idx_in = bin_idx[wh_in] - 1 # The pixel centers as per the algorithm in use: adiff = self.duckslopes[idx_in+1] - self.duckslopes[idx_in] asum = self.duckslopes[idx_in+1] + self.duckslopes[idx_in] xdiff = xnew_in - self.xcen[idx_in] fluxvals = adiff * xdiff**2 / (2. * self.dxpix[idx_in]) \ + asum * xdiff / 2. \ + self.flux[idx_in] - adiff * self.dxpix[idx_in] / 24. outflux[wh_in] = fluxvals if input_scalar: return outflux[0] else: return outflux
[docs] def find_extrema(self, minima=False): """ Return array of extrema of underlying pixelspline if minima is False, return only maxima """ # Find the formal extrema positions: x_ext = self.xcen - 0.5 * self.dxpix * (self.duckslopes[1:] + self.duckslopes[:-1]) \ / (self.duckslopes[1:] - self.duckslopes[:-1]) # Digitize these into bins: bin_ext = np.digitize(x_ext, self.pixbound) - 1 # The second derivatives, flipped in sign if minima is set: curvat = (-1)**(minima == True) * (self.duckslopes[1:] - self.duckslopes[:-1]) / self.dxpix # Find in-bin maxima: wh_ext = np.where((bin_ext == np.arange(self.npix)) * (curvat < 0)) if len(wh_ext[0]) < 1: return np.array([]) x_ext = x_ext[wh_ext] return x_ext
def _subpixel_average(self, ipix, xlo, xhi): adiff = self.duckslopes[ipix+1] - self.duckslopes[ipix] asum = self.duckslopes[ipix+1] + self.duckslopes[ipix] xlo_c = xlo - self.xcen[ipix] xhi_c = xhi - self.xcen[ipix] outval = adiff * ((xhi-xlo)**2 / 6. + xhi_c * xlo_c / 2.) / self.dxpix[ipix] \ + asum * (xhi_c + xlo_c) / 4. - adiff * self.dxpix[ipix] / 24. + self.flux[ipix] return outval
[docs] def resample(self, pb_new): """ Resample a pixelspline analytically onto a new set of pixel boundaries """ npix_new = len(pb_new) - 1 ### xnew_lo = pb_new[:-1].copy() ### xnew_hi = pb_new[1:].copy() xnew_lo = pb_new[:-1] xnew_hi = pb_new[1:] # Test for monotonic: new_fulldx = xnew_hi - xnew_lo if new_fulldx.min() <= 0.: raise PixSplineError('New pixel boundaries not monotonically increasing!') # Digitize the new boundaries into the original bins: # searchsorted is faster than digitize for sorted data ### bin_idx = np.digitize(pb_new, self.pixbound) - 1 bin_idx = np.searchsorted(self.pixbound, pb_new) - 1 ### bin_lo = bin_idx[:-1].copy() ### bin_hi = bin_idx[1:].copy() bin_lo = bin_idx[:-1] bin_hi = bin_idx[1:] # Array for accumulating new counts: new_counts = np.zeros(npix_new, dtype=self.flux.dtype) # Array for accumulating new pixel widths by pieces. # Only used for debugging so far, but may be useful in future. #new_dxpix = np.zeros(npix_new, dtype=self.flux.dtype) # For convenience, we define the following. # Careful not to modify them... they are views, not copies! xold_lo = self.pixbound[:-1] xold_hi = self.pixbound[1:] # 4 cases to cover: # Case 1: both bin_hi and bin_lo in the same bin: wh_this = np.where((bin_hi == bin_lo) * (bin_lo >= 0) * (bin_hi < self.npix)) if (len(wh_this[0]) > 0): dx_this = xnew_hi[wh_this] - xnew_lo[wh_this] avgval_this = self._subpixel_average(bin_lo[wh_this], xnew_lo[wh_this], xnew_hi[wh_this]) #new_dxpix[wh_this] += dx_this new_counts[wh_this] += avgval_this * dx_this # Case 2: more than one bin, lower segment: wh_this = np.where((bin_hi > bin_lo) * (bin_lo >= 0)) if (len(wh_this[0]) > 0): dx_this = xold_hi[bin_lo[wh_this]] - xnew_lo[wh_this] avgval_this = self._subpixel_average(bin_lo[wh_this], xnew_lo[wh_this], xold_hi[bin_lo[wh_this]]) #new_dxpix[wh_this] += dx_this new_counts[wh_this] += avgval_this * dx_this # Case 3: more than one bin, upper segment: wh_this = np.where((bin_hi > bin_lo) * (bin_hi < self.npix)) if (len(wh_this[0]) > 0): dx_this = xnew_hi[wh_this] - xold_lo[bin_hi[wh_this]] avgval_this = self._subpixel_average(bin_hi[wh_this], xold_lo[bin_hi[wh_this]], xnew_hi[wh_this]) #new_dxpix[wh_this] += dx_this new_counts[wh_this] += avgval_this * dx_this # Case 4: entire bins covered, whole pixels: wh_this = np.where(bin_hi > (bin_lo+1)) nwhole = len(wh_this[0]) if (nwhole > 0): pcounts = self.flux * self.dxpix bin_lo1 = bin_lo+1 #- for speed #dx_this = np.array([self.dxpix[bin_lo[wh_this[0][ii]]+1:bin_hi[wh_this[0][ii]]].sum() for ii in range(nwhole)]) #- Progressively faster versions of same thing: ### icounts_this = np.array([pcounts[bin_lo[wh_this[0][ii]]+1:bin_hi[wh_this[0][ii]]].sum() for ii in range(nwhole)]) # Slowest bit ### icounts_this = np.array([pcounts[bin_lo1[wh_this[0][ii]]:bin_hi[wh_this[0][ii]]].sum() for ii in range(nwhole)]) # Slowest bit icounts_this = np.array([pcounts[a:b].sum() for a, b in zip(bin_lo1[wh_this], bin_hi[wh_this]) ]) #new_dxpix[wh_this] += dx_this new_counts[wh_this] += icounts_this # Divide out for average and return: return new_counts / new_fulldx
#- Currently unused (absorbed into PixelSpline), but kept for reference #- and maybe reviving if needed. # def _compute_duck_slopes(pixbound, flux): # """ # Compute the slope of the illuminating quadratic spline at # the locations of the 'ducks', i.e., the pixel boundaries, # given the integrated flux per unit baseline within the pixels. # # ARGUMENTS: # pixbound: (npix + 1) ndarray of pixel boundaries, in units of # wavelength or log-wavelength or frequency or whatever you like. # flux: (npix) ndarray of spectral flux (energy or counts) per # abscissa unit, averaged over the extent of the pixel # # RETURNS: # an (npix+1) ndarray of the slope of the underlying/illuminating # flux per unit abscissa spectrum at the position of the pixel # boundaries, a.k.a. 'ducks'. The end conditions are taken to # be zero slope, so the exterior points of the output are zeros. # """ # npix = len(flux) # # Test for correct argument dimensions: # if (len(pixbound) - npix) != 1: # print 'Need one more element in pixbound than in flux!' # return 0 # # The array of "delta-x" values: # dxpix = pixbound[1:] - pixbound[:-1] # # Test for monotonif increase: # if dxpix.min() <= 0.: # print 'Pixel boundaries not monotonically increasing!' # return 0 # # Encode the tridiagonal matrix that needs to be solved: # maindiag = (dxpix[:-1] + dxpix[1:]) / 3. # offdiag = dxpix[1:-1] / 6. # upperdiag = np.append(0., offdiag) # lowerdiag = np.append(offdiag, 0.) # band_matrix = np.vstack((upperdiag, maindiag, lowerdiag)) # # The right-hand side: # rhs = flux[1:] - flux[:-1] # # Solve the banded matrix and return: # acoeff = la.solve_banded((1,1), band_matrix, rhs) # acoeff = np.append(np.append(0., acoeff), 0.) # return acoeff