Source code for prospect.specutils

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
prospect.specutils
==================

Conversions of DESI & SDSS spectra into specutils-compatible objects.

"""
import os
import re

import numpy as np
import astropy.units as u
from astropy.nddata import InverseVariance
from astropy.io import fits
from astropy.table import Table
from astropy.wcs import WCS
from specutils import SpectrumList, Spectrum1D

_desiutil_imported = True
try:
    from desiutil.depend import add_dependencies
    from desiutil.io import encode_table
except ImportError:
    _desiutil_imported = False

_desispec_imported = True
try:
    from desispec.maskbits import specmask
    from desispec.resolution import Resolution
    from desispec.io.util import fitsheader, native_endian, add_columns
    from desispec.io.frame import read_frame
    from desispec.io.fibermap import fibermap_comments
except ImportError:
    _desispec_imported = False

from .utilities import _coadd


[docs]class Spectra(SpectrumList): """Represents a grouping of spectra. This class contains an "extended" fibermap that has information about the night and exposure of each spectrum. For each band, this class has the wavelength grid, flux, ivar, mask, and resolution arrays. Parameters ---------- bands : :class:`list` List of strings used to identify the bands. wave : :class:`dict` Dictionary of arrays specifying the wavelength grid. flux : :class:`dict` Dictionary of arrays specifying the flux for each spectrum. ivar : :class:`dict` Dictionary of arrays specifying the inverse variance. mask : :class:`dict`, optional Dictionary of arrays specifying the bitmask. resolution_data : :class:`dict`, optional Dictionary of arrays specifying the block diagonal resolution matrix. The object for each band must be in one of the formats supported by the Resolution class constructor. fibermap Extended fibermap to use. If not specified, a fake one is created. meta : :class:`dict`, optional Dictionary of arbitrary properties. extra : :class:`dict`, optional Optional dictionary of dictionaries containing extra floating point arrays. The top-level is a dictionary over bands and each value is a dictionary containing string keys and values which are arrays of the same size as the flux array. single : :class:`bool`, optional If ``True``, store data in memory as single precision. scores QA scores table. """ def __init__(self, bands=[], wave={}, flux={}, ivar={}, mask=None, resolution_data=None, fibermap=None, meta=None, extra=None, single=False, scores=None): self._bands = tuple(bands) self._single = single self._ftype = np.float64 if single: self._ftype = np.float32 self._reset_properties() self.scores = scores if meta is None: self.meta = dict() elif isinstance(meta, fits.Header): self.meta = {'header': meta} else: self.meta = meta.copy() nspec = 0 # check consistency of input dimensions for b in self._bands: if wave[b].ndim != 1: raise ValueError("Wavelength array for band {} should have shape (Nwave, ).".format(b)) if flux[b].ndim != 2: raise ValueError("Flux array for band {} should have shape (Nspec, Nwave).".format(b)) if flux[b].shape[1] != wave[b].shape[0]: raise ValueError("Flux array wavelength dimension for band {} does not match wavelength grid.".format(b)) if nspec is None: nspec = flux[b].shape[0] if fibermap is not None: if len(fibermap) != flux[b].shape[0]: raise ValueError("Flux array number of spectra for band {} does not match fibermap.".format(b)) if ivar[b].shape != flux[b].shape: raise ValueError("Inverse variance array dimensions do not match flux for band {}.".format(b)) if mask is not None: if mask[b].shape != flux[b].shape: raise ValueError("Mask array dimensions do not match flux for band {}.".format(b)) if mask[b].dtype not in (int, np.int64, np.int32, np.uint64, np.uint32): raise ValueError("Bad mask type {}.".format(mask.dtype)) if resolution_data is not None: if resolution_data[b].ndim != 3: raise ValueError("Resolution array for band {} should have shape (Nspec, Ndiag, Nwave).".format(b)) if resolution_data[b].shape[0] != flux[b].shape[0]: raise ValueError("Resolution array spectrum dimension for band {} does not match flux.".format(b)) if resolution_data[b].shape[2] != wave[b].shape[0]: raise ValueError("Resolution array wavelength dimension for band {} does not match wavelength grid.".format(b)) if extra is not None: for ex in extra[b].items(): if ex[1].shape != flux[b].shape: raise ValueError("Extra arrays must have the same shape as the flux array.") if fibermap is not None: self.fibermap = fibermap.copy() else: self.fibermap = None # copy band-based data for b in self._bands: band_meta = dict() if mask is None: band_meta['mask'] = None bool_mask = None else: band_meta['mask'] = np.copy(mask[b]) bool_mask = band_meta['mask'] != 0 if resolution_data is None: band_meta['resolution_data'] = None band_meta['R'] = None else: band_meta['resolution_data'] = np.copy(resolution_data[b].astype(self._ftype)) band_meta['R'] = np.array([Resolution(r) for r in resolution_data[b]]) if extra is None: band_meta['extra'] = None else: band_meta['extra'] = dict() for k, v in extra[b].items(): band_meta['extra'][k] = np.copy(v.astype(self._ftype)) self.append(Spectrum1D(spectral_axis=np.copy(wave[b].astype(self._ftype))*u.Angstrom, flux=np.copy(flux[b].astype(self._ftype))*u.Unit('10**-17 erg/(s cm2 Angstrom)'), uncertainty=InverseVariance(np.copy(ivar[b].astype(self._ftype))), mask=bool_mask, meta=band_meta)) @property def bands(self): """ (list): the list of valid bands. """ return self._bands @property def ftype(self): """ (numpy.dtype): the data type used for floating point numbers. """ return self._ftype @property def wave(self): if self._wave is None: foo = self.wavelength_grid(self.bands[0]) return self._wave @property def flux(self): if self._flux is None: self._flux = dict() for band in self.bands: self._flux[band] = self[self.bands.index(band)].flux return self._flux @property def ivar(self): if self._ivar is None: self._ivar = dict() for band in self.bands: self._ivar[band] = self[self.bands.index(band)].uncertainty return self._ivar @property def mask(self): if self._mask is None: self._mask = dict() for band in self.bands: self._mask[band] = self[self.bands.index(band)].meta['mask'] return self._mask @property def resolution_data(self): if self._resolution_data is None: if self[0].meta['resolution_data'] is None: return None self._resolution_data = dict() for band in self.bands: self._resolution_data[band] = self[self.bands.index(band)].meta['resolution_data'] return self._resolution_data @property def R(self): if self._R is None: if self[0].meta['R'] is None: return None self._R = dict() for band in self.bands: self._R[band] = self[self.bands.index(band)].meta['R'] return self._R @property def extra(self): if self._extra is None: if self[0].meta['extra'] is None: return None self._extra = dict() for band in self.bands: self._extra[band] = self[self.bands.index(band)].meta['extra'] return self._extra def _reset_properties(self): self._wave = None self._flux = None self._ivar = None self._mask = None self._resolution_data = None self._R = None self._extra = None
[docs] def wavelength_grid(self, band): """ Return the wavelength grid for a band. Args: band (str): the name of the band. Returns (array): an array containing the wavelength values. """ if band not in self.bands: raise KeyError("{} is not a valid band.".format(band)) if self._wave is None: self._wave = dict() for band in self.bands: self._wave[band] = self[self.bands.index(band)].spectral_axis.value return self._wave[band]
[docs] def target_ids(self): """ Return list of unique target IDs. The target IDs are sorted by the order that they first appear. Returns (array): an array of integer target IDs. """ uniq, indices = np.unique(self.fibermap["TARGETID"], return_index=True) return uniq[indices.argsort()]
[docs] def num_spectra(self): """ Get the number of spectra contained in this group. Returns (int): Number of spectra contained in this group. """ if self.fibermap is not None: return len(self.fibermap) else: return 0
[docs] def num_targets(self): """ Get the number of distinct targets. Returns (int): Number of unique targets with spectra in this object. """ if self.fibermap is not None: return len(np.unique(self.fibermap["TARGETID"])) else: return 0
[docs] def select(self, nights=None, bands=None, targets=None, fibers=None, invert=False): """ Select a subset of the data. This filters the data based on a logical AND of the different criteria, optionally inverting that selection. Args: nights (list): optional list of nights to select. bands (list): optional list of bands to select. targets (list): optional list of target IDs to select. fibers (list): list/array of fiber indices to select. invert (bool): after combining all criteria, invert selection. Returns (Spectra): a new Spectra object containing the selected data. """ keep_bands = None if bands is None: keep_bands = self.bands else: keep_bands = [ x for x in self.bands if x in bands ] if len(keep_bands) == 0: raise ValueError("No valid bands were selected!") keep_nights = None if nights is None: keep_nights = [ True for x in self.fibermap["NIGHT"] ] else: keep_nights = [ (x in nights) for x in self.fibermap["NIGHT"] ] if sum(keep_nights) == 0: raise ValueError("No valid nights were selected!") keep_targets = None if targets is None: keep_targets = [ True for x in self.fibermap["TARGETID"] ] else: keep_targets = [ (x in targets) for x in self.fibermap["TARGETID"] ] if sum(keep_targets) == 0: raise ValueError("No valid targets were selected!") keep_fibers = None if fibers is None: keep_fibers = [ True for x in self.fibermap["FIBER"] ] else: keep_fibers = [ (x in fibers) for x in self.fibermap["FIBER"] ] if sum(keep_fibers) == 0: raise ValueError("No valid fibers were selected!") keep_rows = [ (x and y and z) for x, y, z in zip(keep_nights, keep_targets, keep_fibers) ] if invert: keep_rows = [ not x for x in keep_rows ] keep = [ i for i, x in enumerate(keep_rows) if x ] if len(keep) == 0: raise ValueError("Selection has no spectra!") keep_wave = {} keep_flux = {} keep_ivar = {} keep_mask = None keep_res = None keep_extra = None if self.mask is not None: keep_mask = {} if self.resolution_data is not None: keep_res = {} if self.extra is not None: keep_extra = {} for b in keep_bands: keep_wave[b] = self.wave[b] keep_flux[b] = self.flux[b][keep,:] keep_ivar[b] = self.ivar[b][keep,:] if self.mask is not None: keep_mask[b] = self.mask[b][keep,:] if self.resolution_data is not None: keep_res[b] = self.resolution_data[b][keep,:,:] if self.extra is not None: keep_extra[b] = {} for k, v in self.extra[b].items(): keep_extra[b][k] = v[keep,:] return Spectra(keep_bands, keep_wave, keep_flux, keep_ivar, mask=keep_mask, resolution_data=keep_res, fibermap=self.fibermap[keep], meta=self.meta, extra=keep_extra, single=self._single, scores=self.scores[keep])
[docs] def update(self, other): """ Overwrite or append new data. Given another Spectra object, compare the fibermap information with the existing one. For spectra that already exist, overwrite existing data with the new values. For spectra that do not exist, append that data to the end of the spectral data. Args: other (Spectra): the new data to add. Returns: nothing (object updated in place). """ if not isinstance(other, Spectra): raise ValueError("New data has incorrect type!") # Does the other Spectra object have any data? if other.num_spectra() == 0: return # Do we have new bands to add? newbands = [] for b in other.bands: if b not in self.bands: newbands.append(b) else: if not np.allclose(self.wave[b], other.wave[b]): raise ValueError("Band {} has an incompatible wavelength grid.".format(b)) bands = list(self.bands) bands.extend(newbands) # Are we adding mask data in this update? add_mask = False if other.mask is None: if self.mask is not None: raise ValueError("Existing spectra has a mask, cannot " "update it to a spectra with no mask.") else: if self.mask is None: add_mask = True # Are we adding resolution data in this update? ndiag = {} add_res = False if other.resolution_data is None: if self.resolution_data is not None: raise ValueError("Existing spectra has resolution data, cannot " "update it to a spectra with none.") else: if self.resolution_data is not None: for b in self.bands: ndiag[b] = self.resolution_data[b].shape[1] for b in other.bands: odiag = other.resolution_data[b].shape[1] if b not in self.bands: ndiag[b] = odiag else: if odiag != ndiag[b]: raise ValueError("Resolution matrices for a" " given band must have the same dimensions.") else: add_res = True for b in other.bands: ndiag[b] = other.resolution_data[b].shape[1] # Are we adding extra data in this update? add_extra = False if other.extra is None: if self.extra is not None: raise ValueError("Existing spectra has extra data, cannot " "update it to a spectra with none.") else: if self.extra is None: add_extra = True # Compute which targets / exposures are new nother = len(other.fibermap) exists = np.zeros(nother, dtype=np.int) indx_original = [] if self.fibermap is not None: for r in range(nother): expid = other.fibermap[r]["EXPID"] fiber = other.fibermap[r]["FIBER"] for i, row in enumerate(self.fibermap): if (expid == row["EXPID"]) and (fiber == row["FIBER"]): indx_original.append(i) exists[r] += 1 if len(np.where(exists > 1)[0]) > 0: raise ValueError("Found duplicate spectra (same EXPID and FIBER) in the fibermap.") indx_exists = np.where(exists == 1)[0] indx_new = np.where(exists == 0)[0] # Make new data arrays of the correct size to hold both the old and # new data nupdate = len(indx_exists) nnew = len(indx_new) if self.fibermap is None: nold = 0 newfmap = other.fibermap.copy() else: nold = len(self.fibermap) newfmap = encode_table(np.zeros( (nold + nnew, ), dtype=self.fibermap.dtype)) if self.scores is None: if other.scores is None: newscores = None else: newscores = other.scores.copy() else: newscores = encode_table(np.zeros( (nold + nnew, ), dtype=self.scores.dtype)) newwave = {} newflux = {} newivar = {} newmask = None if add_mask or self.mask is not None: newmask = {} newres = None newR = None if add_res or self.resolution_data is not None: newres = {} newR = {} newextra = None if add_extra or self.extra is not None: newextra = {} for b in bands: nwave = None if b in self.bands: nwave = self.wave[b].shape[0] newwave[b] = self.wave[b] else: nwave = other.wave[b].shape[0] newwave[b] = other.wave[b].astype(self._ftype) newflux[b] = np.zeros( (nold + nnew, nwave), dtype=self._ftype) newivar[b] = np.zeros( (nold + nnew, nwave), dtype=self._ftype) if newmask is not None: newmask[b] = np.zeros( (nold + nnew, nwave), dtype=np.uint32) newmask[b][:,:] = specmask["NODATA"] if newres is not None: newres[b] = np.zeros( (nold + nnew, ndiag[b], nwave), dtype=self._ftype) if newextra is not None: newextra[b] = {} # Copy the old data if nold > 0: # We have some data (i.e. we are not starting with an empty Spectra) newfmap[:nold] = self.fibermap if newscores is not None: newscores[:nold] = self.scores for b in self.bands: newflux[b][:nold,:] = self.flux[b] newivar[b][:nold,:] = self.ivar[b] if self.mask is not None: newmask[b][:nold,:] = self.mask[b] elif add_mask: newmask[b][:nold,:] = 0 if self.resolution_data is not None: newres[b][:nold,:,:] = self.resolution_data[b] if self.extra is not None: for ex in self.extra[b].items(): newextra[b][ex[0]] = np.zeros( newflux[b].shape, dtype=self._ftype) newextra[b][ex[0]][:nold,:] = ex[1] # Update existing spectra for i, s in enumerate(indx_exists): row = indx_original[i] for b in other.bands: newflux[b][row,:] = other.flux[b][s,:].astype(self._ftype) newivar[b][row,:] = other.ivar[b][s,:].astype(self._ftype) if other.mask is not None: newmask[b][row,:] = other.mask[b][s,:] else: newmask[b][row,:] = 0 if other.resolution_data is not None: newres[b][row,:,:] = other.resolution_data[b][s,:,:].astype(self._ftype) if other.extra is not None: for ex in other.extra[b].items(): if ex[0] not in newextra[b]: newextra[b][ex[0]] = np.zeros(newflux[b].shape, dtype=self._ftype) newextra[b][ex[0]][row,:] = ex[1][s,:].astype(self._ftype) # Append new spectra if nnew > 0: newfmap[nold:] = other.fibermap[indx_new] if newscores is not None: newscores[nold:] = other.scores[indx_new] for b in other.bands: newflux[b][nold:,:] = other.flux[b][indx_new].astype(self._ftype) newivar[b][nold:,:] = other.ivar[b][indx_new].astype(self._ftype) if other.mask is not None: newmask[b][nold:,:] = other.mask[b][indx_new] else: newmask[b][nold:,:] = 0 if other.resolution_data is not None: newres[b][nold:,:,:] = other.resolution_data[b][indx_new].astype(self._ftype) if other.extra is not None: for ex in other.extra[b].items(): if ex[0] not in newextra[b]: newextra[b][ex[0]] = np.zeros(newflux[b].shape, dtype=self._ftype) newextra[b][ex[0]][nold:,:] = ex[1][indx_new].astype(self._ftype) # Swap data into place self._bands = bands self.fibermap = newfmap self.scores = newscores self._reset_properties() for i, b in enumerate(self._bands): band_meta = dict() if newmask is None: band_meta['mask'] = None bool_mask = None else: band_meta['mask'] = newmask[b] bool_mask = band_meta['mask'] != 0 if newres is None: band_meta['resolution_data'] = None band_meta['R'] = None else: band_meta['resolution_data'] = newres[b] band_meta['R'] = np.array([Resolution(r) for r in newres[b]]) if newextra is None: band_meta['extra'] = None else: band_meta['extra'] = dict() for k, v in newextra[b].items(): band_meta['extra'][k] = v s = Spectrum1D(spectral_axis=newwave[b]*u.Angstrom, flux=newflux[b]*u.Unit('10**-17 erg/(s cm2 Angstrom)'), uncertainty=InverseVariance(newivar), mask=bool_mask, meta=band_meta) try: self[i] = s except IndexError: self.append(s) return
[docs]def write_spectra(outfile, spec, units=None): """ Write Spectra object to FITS file. This places the metadata into the header of the (empty) primary HDU. The first extension contains the fibermap, and then HDUs are created for the different data arrays for each band. Floating point data is converted to 32 bits before writing. Args: outfile (str): path to write spec (Spectra): the object containing the data units (str): optional string to use for the BUNIT key of the flux HDUs for each band. Returns: The absolute path to the file that was written. """ outfile = os.path.abspath(outfile) # Create the parent directory, if necessary. dir, base = os.path.split(outfile) if not os.path.exists(dir): os.makedirs(dir) # Create HDUs from the data all_hdus = fits.HDUList() # metadata goes in empty primary HDU hdr = fitsheader(spec.meta) if _desiutil_imported: add_dependencies(hdr) all_hdus.append(fits.PrimaryHDU(header=hdr)) # Next is the fibermap fmap = spec.fibermap.copy() fmap.meta["EXTNAME"] = "FIBERMAP" hdu = fits.convenience.table_to_hdu(fmap) # Add comments for fibermap columns. for i, colname in enumerate(fmap.dtype.names): if colname in fibermap_comments: key = "TTYPE{}".format(i+1) name = hdu.header[key] assert name == colname comment = fibermap_comments[name] hdu.header[key] = (name, comment) else: pass #print('Unknown comment for {}'.format(colname)) all_hdus.append(hdu) # Now append the data for all bands for band in spec.bands: hdu = fits.ImageHDU(name="{}_WAVELENGTH".format(band.upper())) hdu.header["BUNIT"] = "Angstrom" hdu.data = spec.wave[band].astype("f8") all_hdus.append(hdu) hdu = fits.ImageHDU(name="{}_FLUX".format(band.upper())) if units is None: hdu.header["BUNIT"] = "10**-17 erg/(s cm2 Angstrom)" else: hdu.header["BUNIT"] = units hdu.data = spec.flux[band].astype("f4") all_hdus.append(hdu) hdu = fits.ImageHDU(name="{}_IVAR".format(band.upper())) if units is None: hdu.header["BUNIT"] = '10**+34 (s2 cm4 Angstrom2) / erg2' else: hdu.header["BUNIT"] = ((u.Unit(units, format='fits'))**-2).to_string('fits') hdu.data = spec.ivar[band].astype("f4") all_hdus.append(hdu) if spec.mask is not None: # hdu = fits.CompImageHDU(name="{}_MASK".format(band.upper())) hdu = fits.ImageHDU(name="{}_MASK".format(band.upper())) hdu.data = spec.mask[band].astype(np.uint32) all_hdus.append(hdu) if spec.resolution_data is not None: hdu = fits.ImageHDU(name="{}_RESOLUTION".format(band.upper())) hdu.data = spec.resolution_data[band].astype("f4") all_hdus.append(hdu) if spec.extra is not None: for ex in spec.extra[band].items(): hdu = fits.ImageHDU(name="{}_{}".format(band.upper(), ex[0])) hdu.data = ex[1].astype("f4") all_hdus.append(hdu) if spec.scores is not None : scores_tbl = encode_table(spec.scores) #- unicode -> bytes scores_tbl.meta['EXTNAME'] = 'SCORES' all_hdus.append( fits.convenience.table_to_hdu(scores_tbl) ) if spec.scores_comments is not None : # add comments in header hdu=all_hdus['SCORES'] for i in range(1,999): key = 'TTYPE'+str(i) if key in hdu.header: value = hdu.header[key] if value in spec.scores_comments.keys() : hdu.header[key] = (value, spec.scores_comments[value]) all_hdus.writeto("{}.tmp".format(outfile), overwrite=True, checksum=True) os.rename("{}.tmp".format(outfile), outfile) return outfile
[docs]def read_spectra(infile, single=False, coadd=None): """Read Spectra object from FITS file. This reads data written by the write_spectra function. A new Spectra object is instantiated and returned. Args: infile (str): path to read single (bool): if True, keep spectra as single precision in memory. coadd (array-like): if set, coadd all spectra from the provided targetids. Returns (Spectra): The object containing the data read from disk. """ ftype = np.float64 if single: ftype = np.float32 infile = os.path.abspath(infile) if not os.path.isfile(infile): raise FileNotFoundError("{} is not a file!".format(infile)) # initialize data objects bands = [] fmap = None wave = None flux = None ivar = None mask = None res = None extra = None scores = None with fits.open(infile, mode="readonly") as hdulist: nhdu = len(hdulist) # load the metadata. meta = hdulist[0].header # For efficiency, go through the HDUs in disk-order. Use the # extension name to determine where to put the data. We don't # explicitly copy the data, since that will be done when constructing # the Spectra object. for h in range(1, nhdu): name = hdulist[h].header["EXTNAME"] if name == "FIBERMAP": fmap = encode_table(Table(hdulist[h].data, copy=True).as_array()) elif name == "SCORES": scores = encode_table(Table(hdulist[h].data, copy=True).as_array()) else: # Find the band based on the name mat = re.match(r"(.*)_(.*)", name) if mat is None: raise RuntimeError("FITS extension name {} does not contain the band".format(name)) band = mat.group(1).lower() type = mat.group(2) if band not in bands: bands.append(band) if type == "WAVELENGTH": if wave is None: wave = {} wave[band] = native_endian(hdulist[h].data.astype(ftype)) elif type == "FLUX": if flux is None: flux = {} flux[band] = native_endian(hdulist[h].data.astype(ftype)) elif type == "IVAR": if ivar is None: ivar = {} ivar[band] = native_endian(hdulist[h].data.astype(ftype)) elif type == "MASK": if mask is None: mask = {} mask[band] = native_endian(hdulist[h].data.astype(np.uint32)) elif type == "RESOLUTION": if res is None: res = {} res[band] = native_endian(hdulist[h].data.astype(ftype)) else: # this must be an "extra" HDU if extra is None: extra = {} if band not in extra: extra[band] = {} extra[band][type] = native_endian(hdulist[h].data.astype(ftype)) if coadd is not None: uniq, indices = np.unique(coadd, return_index=True) targetids = uniq[indices.argsort()] ntargets = len(targetids) cwave = dict() cflux = dict() civar = dict() crdat = dict() cmask = dict() for channel in bands: cwave[channel] = wave[channel].copy() nwave = len(cwave[channel]) cflux[channel] = np.zeros((ntargets, nwave)) civar[channel] = np.zeros((ntargets, nwave)) ndiag = res[channel].shape[1] crdat[channel] = np.zeros((ntargets, ndiag, nwave)) cmask[channel] = np.zeros((ntargets, nwave), dtype=mask[channel].dtype) #- Loop over targets, coadding all spectra for each target fibermap = Table(dtype=fmap.dtype) for i, targetid in enumerate(targetids): ii = np.where(fmap['TARGETID'] == targetid)[0] fibermap.add_row(fmap[ii[0]]) for channel in bands: if len(ii) > 1: outwave, outflux, outivar, outrdat = _coadd( wave[channel], flux[channel][ii], ivar[channel][ii], res[channel][ii] ) outmask = mask[channel][ii[0]] for j in range(1, len(ii)): outmask |= mask[channel][ii[j]] else: outwave, outflux, outivar, outrdat = ( wave[channel], flux[channel][ii[0]], ivar[channel][ii[0]], res[channel][ii[0]] ) outmask = mask[channel][ii[0]] cflux[channel][i] = outflux civar[channel][i] = outivar crdat[channel][i] = outrdat cmask[channel][i] = outmask return Spectra(bands, cwave, cflux, civar, mask=cmask, resolution_data=crdat, fibermap=fibermap, meta=meta, extra=extra, single=single, scores=scores) # Construct the Spectra object from the data. If there are any # inconsistencies in the sizes of the arrays read from the file, # they will be caught by the constructor. return Spectra(bands, wave, flux, ivar, mask=mask, resolution_data=res, fibermap=fmap, meta=meta, extra=extra, single=single, scores=scores)
[docs]def read_spPlate(filename, limit=None): """Read a SDSS spPlate file. Parameters ---------- filename : :class:`str` Name of the spPlate file. limit : :class:`int`, optional If set, only return the first `limit` spectra. Returns ------- Spectrum1D The spectra. """ with fits.open(filename) as hdulist: header = hdulist[0].header meta = {'header': header} try: flux_unit = u.Unit(hdulist[0].header['BUNIT']) except ValueError: flux_unit = u.Unit('1e-17 erg / (Angstrom cm2 s)') if limit is None: limit = header['NAXIS2'] flux = hdulist[0].data[0:limit, :] * flux_unit wcs = WCS(header) dispersion_unit = u.Unit('Angstrom') dispersion = 10**wcs.all_pix2world(np.vstack((np.arange(flux.shape[1]), np.zeros((flux.shape[1],)))).T, 0)[:, 0] uncertainty = InverseVariance(hdulist[1].data[0:limit, :]) mask = hdulist[2].data[0:limit, :] != 0 meta['plugmap'] = Table.read(hdulist[5])[0:limit] return Spectrum1D(flux=flux, spectral_axis=dispersion*dispersion_unit, uncertainty=uncertainty, meta=meta, mask=mask)
[docs]def read_spZbest(filename, limit=None): """Read a SDSS spZbest file. Parameters ---------- filename : :class:`str` Name of the spZbest file. limit : :class:`int`, optional If set, only return the first `limit` spectra. Returns ------- tuple A Table containing the redshift values and a Spectrum1D object containing the best-fit models. """ with fits.open(filename) as hdulist: header = hdulist[0].header meta = {'header': header} redshifts = Table.read(hdulist[1]) if limit is None: limit = hdulist[2].header['NAXIS2'] redshifts = redshifts[0:limit] flux_unit = u.Unit('1e-17 erg / (Angstrom cm2 s)') flux = hdulist[2].data[0:limit, :] * flux_unit dispersion = 10**(header['CRVAL1'] + header['CD1_1'] * np.arange(hdulist[2].header['NAXIS1'], dtype=hdulist[2].data.dtype)) dispersion_unit = u.Unit('Angstrom') models = Spectrum1D(flux=flux, spectral_axis=dispersion*dispersion_unit, meta=meta) return redshifts, models
[docs]def read_frame_as_spectra(filename, night=None, expid=None, band=None, single=False): """ Read a FITS file containing a Frame and return a Spectra. A Frame file is very close to a Spectra object (by design), and only differs by missing the NIGHT and EXPID in the fibermap, as well as containing only one band of data. Args: infile (str): path to read Options: night (int): the night value to use for all rows of the fibermap. expid (int): the expid value to use for all rows of the fibermap. band (str): the name of this band. single (bool): if True, keep spectra as single precision in memory. Returns (Spectra): The object containing the data read from disk. """ fr = read_frame(filename) if fr.fibermap is None: raise RuntimeError("reading Frame files into Spectra only supported if a fibermap exists") nspec = len(fr.fibermap) if band is None: band = fr.meta['camera'][0] if night is None: night = fr.meta['night'] if expid is None: expid = fr.meta['expid'] fmap = np.asarray(fr.fibermap.copy()) fmap = add_columns(fmap, ['NIGHT', 'EXPID', 'TILEID'], [np.int32(night), np.int32(expid), np.int32(fr.meta['TILEID'])], ) fmap = encode_table(fmap) bands = [ band ] mask = None if fr.mask is not None: mask = {band : fr.mask} res = None if fr.resolution_data is not None: res = {band : fr.resolution_data} extra = None if fr.chi2pix is not None: extra = {band : {"CHI2PIX" : fr.chi2pix}} spec = Spectra(bands, {band : fr.wave}, {band : fr.flux}, {band : fr.ivar}, mask=mask, resolution_data=res, fibermap=fmap, meta=fr.meta, extra=extra, single=single, scores=fr.scores) return spec