Source code for cana.spec

r"""Tools to handle asteroid spectra."""

from os.path import splitext, basename
from dataclasses import dataclass, field
from scipy.interpolate import UnivariateSpline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error
import numpy as np


import matplotlib.pyplot as plt
import matplotlib
from .util import find_nearest, kwargupdate
from .specdata import SpectralData

# Omiting the warning for bad polynomial fitting
# import warnings
# warnings.simplefilter('ignore', np.RankWarning)


[docs]def loadspec(filename, unit='micron', r_error_col=None, masknull=True, label=None, **kwargs): r""" Load a spectrum file in units of *unit*. Returns a new Spectrum object. Parameters ---------- filename: str Path for the spectrum file unit: str The wavelength unit. Possible values are: 'micron', 'angstron', 'nanometer'. r_error_col: None or integer (optional) The column for the errors in the reflectance. masknull: boolean (optional) If True removes points where the wavelength is zero. label: None or string (optional) The spectrum label. If None it will take the file basename. **kwargs: Other arguments. See numpy.loadtxt Returns ------- spec: Spectrum object """ # setting default values for loading the spectrum file default_kwargs = {'unpack': True} for k in default_kwargs: if k not in kwargs: kwargs[k] = default_kwargs[k] # Loading the file using numpy.loadtxt spec_data = np.loadtxt(filename, **kwargs) # masking zero values in the wavelength array if masknull: mask = np.argwhere(spec_data[0] == 0) spec_data = np.delete(spec_data, mask, axis=1) # inserting it in as a Spectrum object if r_error_col is not None: r_error_col = spec_data[r_error_col] # setting the label if label is None: label = basename(splitext(filename)[0]) # creating Spectrum object with given data spec = Spectrum(spec_data[0], spec_data[1], r_unc=r_error_col, unit=unit, label=label) return spec
[docs]def stack_spec(tup): r""" Stack Spectra arrays. Parameters ---------- tup : array of Spectrum objects A sequence of Spectrum that must be the same shape along all but the second axis. 1-D arrays can be any length. Returns ------- stacked : Spectrum The Spectrum array formed by stacking the given spectra, sorted by the wavelength. """ wave = np.hstack([i.w for i in tup]) ref = np.hstack([i.r for i in tup]) r_unc_aux = [type(i.r_unc) for i in tup] if type(None) not in r_unc_aux: r_unc = np.hstack([i.r_unc for i in tup]) else: r_unc = None stacked = Spectrum(wave, ref, r_unc=r_unc, unit=tup[0].unit, label='_'.join(t.label for t in tup)) stacked = stacked.sort(order='w') return stacked
[docs]@dataclass(repr=False) class Spectrum(SpectralData): r"""Create a spectrum object. A spectrum array is a subclass of a record or structured numpy array, with one axis representing the wavelength vector (w) and other the reflectance (r). The optional Attributes ---------- w: numpy array array corresponding to the wavelength vector r: numpy array array corresponding to the relative reflectance of the asteroid unit: str The wavelength units. Default is 'microns'. r_unc: numpy array (optional) array corresponding to the relative reflectance uncertainty of the asteroid path: None or str (optional) The path of the spectrum file label: None or str The spectrum label res: float The spectrum resolution (number of points). Methods ------- trim fit autofit estimate_rms clean_spec mad rebin normalize mask_region save plot """ w: np.ndarray r: np.ndarray r_unc: np.ndarray = field(default=None) label: str = field(default='asteroid') unit: str = field(default='micron') def __post_init__(self): r"""Inicialize class.""" self.w = np.array(self.w, ndmin=1) self.r = np.array(self.r, ndmin=1) assert self.w.size == self.r.size if self.r_unc is None: dtype = ('w', 'r') else: self.r_unc = np.array(self.r_unc, ndmin=1) assert self.r_unc.size == self.r.size dtype = ('w', 'r', 'r_unc') SpectralData.__init__(self, self.w, dtype, self.label, unit=self.unit)
[docs] def angstrom2micron(self): r"""Convert wavenlength axis from angstrom to micron.""" self.w = self.w / 10000.0 self.unit = 'micron' return self
[docs] def micron2angstrom(self): r"""Convert wavenlength axis from micron to angstrom.""" self.w = self.w * 10000. self.unit = 'angstrom' return self
[docs] def fit(self, order=4, ftype='spline'): r""" Fit the spectrum using a polynomial or a smooth spline. Parameters ---------- order: int Order of the fitting. ftype: str Type of algorithm to use for the fitting. Options are: 'spline' or 'polynomial'. Returns ------- fspec: Spectrum Object The fitted spectrum array fcoefs: array-like the fitting coefficients """ # Performing the fit if ftype == 'spline': fcoefs = UnivariateSpline(self.w, self.r, k=order) fspec_y = fcoefs(self.w) elif ftype == 'polynomial': fcoefs = np.polyfit(self.w, self.r, order) fspec_y = np.polyval(fcoefs, self.w) y_err = np.abs(fspec_y - self.r) # building new array fspec = self.__class__(w=self.w, r=fspec_y, r_unc=y_err, unit=self.unit, label=self.label + '_fit') return fspec, fcoefs
[docs] def autofit(self, degree_min=1, degree_max=12): r""" Find the best order of the polynomial fitting of the spectrum. Parameters ---------- degree_min: int The minimal order for a fit degree_max: int The maximal order for a fit Returns ------- fspec: Spectrum Object The fitted spectrum array fcoefs: array-like the fitting coefficients """ # degrees which we will test the fitting degrees = np.arange(degree_min, degree_max+1) # calculation cross-validation score and error for each degree cross, _ = np.array([self._autofit_aux(deg) for deg in degrees]).T # getting minimun cross validation score aux = np.argmin(cross) bestfit = degrees[aux] # fitting spectrum fspec, fcoefs = self.fit(order=bestfit, ftype='polynomial') return fspec, fcoefs
def _autofit_aux(self, degree): r"""Auxiliary funtion for the autofit method.""" # creating polynomial and reshaping array for model validation polynom = PolynomialFeatures(degree=degree, include_bias=False) x_train = self.w.reshape((-1, 1)) x_train_trans = polynom.fit_transform(x_train) # Create the linear regression model and train model = LinearRegression() model.fit(x_train_trans, self.r) # Calculate the cross validation score cross_valid = cross_val_score(model, x_train_trans, self.r, scoring='neg_mean_squared_error', cv=10) # Training predictions and error y_train_predictions = model.predict(x_train_trans) training_error = mean_squared_error(self.r, y_train_predictions) return -np.mean(cross_valid), training_error
[docs] def estimate_rms(self, ftype='auto', order=5): r""" Estimate the signal-to-noise ratio in a spectrum. Parameters ---------- ftype: str Type of fitting for the snr estimation. Options are: 'spline', 'polynomial', 'auto'. Default is 'auto'. order: int Order of the adjust. Ignored if ftype is 'auto'. Returns ------- rms: float The estimated snr value """ # fitting the spectrum if ftype == 'auto': fspec, _ = self.autofit() else: fspec, _ = self.fit(order, ftype) # Estimating the SNR std_arr = np.abs(self.r - fspec.r) rms = np.std(std_arr) return rms
[docs] def clean_spec(self, method='sigmaclip', sigma=3, fit='auto'): r""" Remove outliers from the spectrum. Parameters ---------- method: str Method for detecting outliers. Currently only 'sigmaclip' available Default is 'sigmaclip'. sigma: int Remove points higher than sigma. fit: 'auto' or integer The order of the polynomial fit. If auto it will try to find automaticaly. Default is 'auto'. Returns ------- spec: Spectrum Spectrum object with outliers removed. """ if fit == 'auto': fspec, _ = self.autofit() else: fspec, _ = self.fit(order=fit, ftype='polynomial') if method == 'sigmaclip': cspec = np.divide(self.r, fspec.r) cspec_index = [self._sigma_clip(val, sigma=sigma, cspec=cspec) for val in cspec] aux = self[cspec_index] spec = self.__class__(aux.w, aux.r, r_unc=aux.r_unc, unit=self.unit, label=self.label + '_cleaned') return spec
def _sigma_clip(self, val, sigma, cspec): r"""Auxiliary method to perform sigma-clipping on array elements.""" if (np.median(cspec) - self._mad(cspec) * sigma < val) and \ (val < np.median(cspec) + self._mad(cspec)*sigma): return True return False
[docs] def mad(self, axis=None): r""" Calculate the median absolute deviation. Parameters ---------- axis: str 'wave', 'ref' or None. It will return the mad in the defined axis. If None, than returns the mad in both axis Returns ------- The median absolute deviation """ if axis is not None: return self._mad(axis) return self._mad('w'), self._mad('r')
@staticmethod def _mad(arr): r"""Auxiliary function for calculating the MAD.""" return np.median(np.abs(arr - np.median(arr)))
[docs] def rebin(self, binsize=11, method='median', std=True, rem_trend=False): r""" Rebin the spectrum. Parameters ---------- binsize: int The number of points in the bin. method: str The method for the rebinning. Options are:'mean and 'median'. std: boolean If True, also returns the deviation. In the case of the median, returns the MAD (median absolute deviation). rem_trend: boolean Returns ------- The rebined spectrum """ spec_size = len(self.w) y_stop = (spec_size//binsize)*binsize wave_arr = self.w[:y_stop] ref_arr = self.r[:y_stop] if method == 'median': func = np.median std_func = np.std # ->>>>>>change later if method == 'mean': func = np.mean std_func = np.std wave_reb = func(wave_arr.reshape(spec_size // binsize, binsize), axis=-1).T ref_reb = func(ref_arr.reshape(spec_size // binsize, binsize), axis=-1).T if rem_trend: fspec = self.autofit()[0] ref_arr = np.divide(ref_arr, fspec.r[:y_stop]) if std: std = std_func(ref_arr.reshape(spec_size // binsize, binsize), axis=-1).T std = np.array(std) else: std = None return self.__class__(w=wave_reb, r=ref_reb, r_unc=std, unit=self.unit, label=self.label + '_binned')
[docs] def ref_from_wavelength(self, w, interpolate=True): r""" Get the spectrum reflectance in a particular wavelength. Parameters ---------- w: float Wavelength value If interpolate=False, The code will search the closest value. interpolate: boolean (optional) If interpolate=False, The code will search the closest value. If True it will interpolate the value of w. Returns ------- The reflectance value """ if not interpolate: aux = find_nearest(self.w, w)[0] ref = self.r[aux] else: ref = np.interp(w, self.w, self.r) return ref
[docs] def normalize(self, wnorm=0.55, window=None, interpolate=True): r""" Normalize the spectrum in a particular wavelength. Parameters ---------- wnorm: float Wavelength value to normalize the spectrum. If interpolate=False, The code will search the closest value. window: None or float (optional) The wavelenght window size for normalizing. If None it will normalize in the wnorm point only. interpolate: boolean (optional) If interpolate=False, The code will search the closest value. If True it will interpolate the value of wnorm. Returns ------- The normalized Spectrum """ if window is None: norm_factor = self.ref_from_wavelength(wnorm, interpolate=interpolate) else: aux = np.argwhere((self.w > wnorm-window) & (self.w < wnorm+window)) norm_factor = np.mean(self.r[aux]) self.r = self.r / norm_factor if self.r_unc is not None: self.r_unc = self.r_unc / norm_factor return self
[docs] def mask_region(self, region=[(1.3, 1.45), (1.8, 1.95)]): r""" Exclude a region of the spectrum. Parameters ---------- w_min: float Wavelength lower limit of the masked region w_max: float Wavelength upper limit of the masked region Returns ------- masked_spec: Spectrum The Spectrum array without the masked region """ if isinstance(region[0], (float, int)): masked_spec = self.mask_region_aux(self, wmin=region[0], wmax=region[1]) else: masked_spec = self for rr in region: masked_spec = self.mask_region_aux(masked_spec, wmin=rr[0], wmax=rr[1]) return masked_spec
[docs] def mask_region_aux(self, spec, wmin, wmax): r""" Exclude a region of the spectrum. Parameters ---------- w_min: float Wavelength lower limit of the masked region w_max: float Wavelength upper limit of the masked region Returns ------- The Spectrum array without the masked region """ aux = np.argwhere((spec.w > wmin) & (spec.w < wmax)) mask = np.ones(len(spec.w), dtype=bool) mask[aux] = 0 w = spec.w[mask] r = spec.r[mask] r_unc = spec.r_unc if r_unc is not None: r_unc = spec.r_unc[mask] return self.__class__(w=w, r=r, r_unc=r_unc, unit=spec.unit, label=spec.label)
[docs] def plot(self, fax=None, show=False, savefig=None, axistitles=True, speckwargs=None, legendkwargs=None): r""" Quick plot of the Spectrum. Parameters ---------- fax (Optional): matplotlib.axes If desired to subplot image in a figure. Default is 'None', which will open a new plt.figure() show (Optional): boolean True if want to plt.show(). Default is True. savefig (Optional): str The path to save the figure. If set to None, wont save the figure. Default is None axistitles: boolean If True will label the axis. Default is True. speckwargs: dict Arguments for matplotlib plot function. default values: {'c':'0.9', 'lw':'1'}. legendkwargs: dict Arguments for matplotlib legend function. default values: {'loc':'best'}. Returns ------- the matplotlib.axes of the figure """ # checking if plot in another frame if fax is None: fig = plt.figure() fax = plt.gca() # setting default values for image plot with matplotlib specsty_defaults = {'c': '0.1', 'lw': 1} legendsty_defaults = {'loc': 'best'} # updating plot styles speckwargs = kwargupdate(specsty_defaults, speckwargs) legendkwargs = kwargupdate(legendsty_defaults, legendkwargs) # Ploting the spec fax.errorbar(self.w, self.r, yerr=self.r_unc, **speckwargs) # Checking if desired to plot the axis labels if axistitles: if self.unit == 'micron': unit_label = '$\mu$m' fax.set_xlabel('Wavelength (%s)' % unit_label) fax.set_ylabel('Reflectance') # plot legend? if 'label' in speckwargs: fax.legend(**legendkwargs) # check if save the image if savefig is not None: plt.savefig(savefig) if not show: plt.clf() matplotlib.use('TkAgg') # show in the matplotlib window? if show: plt.show()
# return fax