Source code for trident.absorption_spectrum.absorption_spectrum

"""
AbsorptionSpectrum class and member functions.



"""

#-----------------------------------------------------------------------------
# Copyright (c) 2013-2017, yt Development Team.
# Copyright (c) 2017, Trident Development Team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
#-----------------------------------------------------------------------------

from yt.utilities.on_demand_imports import _h5py as h5py
import numpy as np

from yt.data_objects.data_containers import \
    YTDataContainer
from yt.data_objects.static_output import \
    Dataset
from yt.loaders import load
from yt.funcs import get_pbar, mylog
from yt.units.yt_array import YTArray, YTQuantity
from yt.utilities.on_demand_imports import _astropy
from yt.utilities.parallel_tools.parallel_analysis_interface import \
    _get_comm, \
    parallel_objects, \
    parallel_root_only
from yt.utilities.physical_constants import \
    boltzmann_constant_cgs, \
    speed_of_light_cgs

from trident.absorption_spectrum.absorption_line import \
    tau_profile

pyfits = _astropy.pyfits

_bin_space_units = {'wavelength': 'angstrom',
                    'velocity': 'km/s'}
c_kms = speed_of_light_cgs.to('km/s')

[docs]class AbsorptionSpectrum(object): r"""Base class for generating absorption spectra. This code was originally based in yt and more restrictive in terms of what development was allowed, so the :class:`~trident.SpectrumGenerator` subclass has more advanced functionality built on top of this. The base algorithm and functionality for spectral generation occurs here though. .. note:: The preferred method for generating spectra is using :class:`~trident.SpectrumGenerator`. **Parameters** :lambda_min: float, YTQuantity, or 'auto' lower wavelength bound in angstroms or velocity bound in km/s (if bin_space set to 'velocity'). If set to 'auto', the lower bound will be automatically adjusted to encompass all absorption lines. The window will not be expanded for continuum features, only absorption lines. :lambda_max: float, YTQuantity, or 'auto' upper wavelength bound in angstroms or velocity bound in km/s (if bin_space set to 'velocity'). If set to 'auto', the upper bound will be automatically adjusted to encompass all absorption lines. The window will not be expanded for continuum features, only absorption lines. :n_lambda: optional, int number of bins. This cannot be set when setting either lambda_min or lambda_max to auto. :dlambda: optional, float or YTQuantity size of the wavelength bins in angstroms or velocity bins in km/s. :bin_space: 'wavelength' or 'velocity' Sets the dimension in which spectra are created. If set to wavelength, the resulting spectra are flux (or tau) vs. observed wavelength. If set to velocity, the spectra are flux vs. velocity offset from the rest wavelength of the absorption line. Default: wavelength """
[docs] def __init__(self, lambda_min, lambda_max, n_lambda=None, dlambda=None, bin_space='wavelength'): if bin_space not in _bin_space_units: raise RuntimeError( 'Invalid bin_space value: "%s". Valid values are: "%s".' % (bin_space, '", "'.join(list(_bin_space_units)))) self.bin_space = bin_space lunits = _bin_space_units[self.bin_space] if dlambda is not None: if not isinstance(dlambda, YTQuantity): dlambda = YTQuantity(dlambda, lunits) self.bin_width = dlambda if n_lambda is None and dlambda is None: raise RuntimeError( 'Either n_lambda or dlambda must be given.') if n_lambda is not None and dlambda is not None: n_lambda = None if str(lambda_min) != 'auto': if not isinstance(lambda_min, YTQuantity): lambda_min = YTQuantity( lambda_min, lunits) # round limits to bin size if dlambda is not None: lambda_min = np.round(lambda_min / dlambda) * dlambda self.lambda_min = lambda_min if str(lambda_max) != 'auto': if not isinstance(lambda_max, YTQuantity): lambda_max = YTQuantity( lambda_max, lunits) # round limits to bin size if dlambda is not None: lambda_max = np.round(lambda_max / dlambda) * dlambda self.lambda_max = lambda_max self._auto_lambda = 'auto' in [str(self.lambda_min), str(self.lambda_max)] if self._auto_lambda and \ (n_lambda is not None and n_lambda != 'auto'): raise RuntimeError( 'Cannot set n_lambda when setting lambda_min or lambda_max to auto.') if dlambda is not None: self.bin_width = YTQuantity(dlambda, lunits) if not self._auto_lambda: n_lambda = \ self._get_field_size(self.lambda_min, self.lambda_max, self.bin_width) if self._auto_lambda: self.lambda_field = None else: if lambda_min >= lambda_max: raise RuntimeError( 'lambda_min (%f) cannot be greater than or equal to lambda_max (%f).' % (lambda_min, lambda_max)) if dlambda is None: n_lambda = int(n_lambda) self.bin_width = YTQuantity( float(lambda_max - lambda_min) / (n_lambda - 1), lunits) else: n_lambda = \ self._get_field_size(self.lambda_min, self.lambda_max, self.bin_width) self.lambda_field = \ self._create_lambda_field(lambda_min, lambda_max, n_lambda) self.flux_field = None self.absorbers_list = None # a dictionary that will store spectral quantities for each index in the light ray self.line_observables_dict = None self.line_list = [] self.continuum_list = [] self.snr = 100 # default signal to noise ratio for error estimation
def _get_field_size(self, lambda_min, lambda_max, dlambda): """ Calculate number of bins. """ if isinstance(lambda_min, YTQuantity): my_min = lambda_min.d else: my_min = lambda_min if isinstance(lambda_max, YTQuantity): my_max = lambda_max.d else: my_max = lambda_max if isinstance(dlambda, YTQuantity): my_dlambda = dlambda.d else: my_dlambda = dlambda n_lambda = (my_max - my_min) / my_dlambda + 1 n_lambda = int(np.round(n_lambda)) return n_lambda def _create_lambda_field(self, lambda_min, lambda_max, n_lambda, units=None): """ Create a lambda array with units. """ if units is None: units = _bin_space_units[self.bin_space] if isinstance(lambda_min, YTQuantity): my_min = lambda_min.d else: my_min = lambda_min if isinstance(lambda_max, YTQuantity): my_max = lambda_max.d else: my_max = lambda_max return YTArray(np.linspace(my_min, my_max, n_lambda), units) _lambda_field = None @property def lambda_field(self): """ The lambda field. """ return self._lambda_field @lambda_field.setter def lambda_field(self, val): # This is useful for testing. If something goes wrong with # the lambda field, put something here to see every time # it gets set. self._lambda_field = val _tau_field = None @property def tau_field(self): """ This is the total optical depth of all lines and continua. """ if self.lambda_field is None: return None if self._tau_field is None: self._tau_field = np.zeros(self.lambda_field.size) return self._tau_field @tau_field.setter def tau_field(self, val): self._tau_field = val _current_tau_field = None @property def current_tau_field(self): """ This is the optical depth array for the current absorption line being deposited. We will do the deposition of lines into this array, and then add it to self.tau_field at the end. """ if self.lambda_field is None: return None if self._current_tau_field is None: self._current_tau_field = np.zeros(self.lambda_field.size) return self._current_tau_field @current_tau_field.setter def current_tau_field(self, val): self._current_tau_field = val
[docs] def add_line(self, label, field_name, wavelength, f_value, gamma, atomic_mass, label_threshold=None): r"""Add an absorption line to the list of lines included in the spectrum. **Parameters** :label: string label for the line. :field_name: string field name from ray data for column densities. :wavelength: float line rest wavelength in angstroms. :f_value: float line f-value. :gamma: float line gamme value. :atomic_mass: float mass of atom in amu. """ self.line_list.append({'label': label, 'field_name': field_name, 'wavelength': YTQuantity(wavelength, "angstrom"), 'f_value': f_value, 'gamma': gamma, 'atomic_mass': YTQuantity(atomic_mass, "amu"), 'label_threshold': label_threshold})
[docs] def add_continuum(self, label, field_name, wavelength, normalization, index): """ Add a continuum feature that follows a power-law. **Parameters** :label: string label for the feature. :field_name: string field name from ray data for column densities. :wavelength: float line rest wavelength in angstroms. :normalization: float the column density normalization. :index: float the power-law index for the wavelength dependence. """ self.continuum_list.append({'label': label, 'field_name': field_name, 'wavelength': wavelength, 'normalization': normalization, 'index': index})
[docs] def make_spectrum(self, input_object, output_file=None, line_list_file=None, output_absorbers_file=None, use_peculiar_velocity=True, store_observables=False, subgrid_resolution=10, observing_redshift=0., min_tau=1e-3, njobs="auto"): """ Make spectrum from ray data using the line list. **Parameters** :input_object: string, dataset, or data container If a string, the path to the ray dataset. As a dataset, this is the ray dataset loaded by yt. As a data container, this is a data object created from a ray dataset, such as a cut region. :output_file: optional, string Option to save a file containing the wavelength, flux, and optical depth fields. File formats are chosen based on the filename extension. ``.h5`` for hdf5, ``.fits`` for fits, and everything else is ASCII. Default: None :output_absorbers_file: optional, string Option to save a text file containing all of the absorbers and corresponding wavelength and redshift information. For parallel jobs, combining the lines lists can be slow so it is recommended to set to None in such circumstances. Default: None :use_peculiar_velocity: optional, bool if True, include peculiar velocity for calculating doppler redshift to shift lines. Requires similar flag to be set in LightRay generation. Default: True :store_observables: optional, bool if True, stores observable properties of each cell along the line of sight for each line, such as tau, column density, and thermal b. These quantities will be saved to the AbsorptionSpectrum attribute: 'line_observables_dict'. Default: False :subgrid_resolution: optional, int When a line is being added that is unresolved (ie its thermal width is less than the spectral bin width), the voigt profile of the line is deposited into an array of virtual wavelength bins at higher resolution. The optical depth from these virtual bins is integrated and then added to the coarser spectral wavelength bin. The subgrid_resolution value determines the ratio between the thermal width and the bin width of the virtual bins. Increasing this value yields smaller virtual bins, which increases accuracy, but is more expensive. A value of 10 yields accuracy to the 4th significant digit in tau. Default: 10 :observing_redshift: optional, float This is the redshift at which the observer is observing the absorption spectrum. Default: 0 :min_tau: optional, float This value determines size of the wavelength window used to deposit lines or continua. The wavelength window is expanded until the optical depth at the edge is below this value. If too high, this will result in features appearing cut off at the edges. Decreasing this will make features smoother but will also increase run time. An increase by a factor of ten will result in roughly a 2x slow down. Default: 1e-3. :njobs: optional, int or "auto" the number of process groups into which the loop over absorption lines will be divided. If set to -1, each absorption line will be deposited by exactly one processor. If njobs is set to a value less than the total number of available processors (N), then the deposition of an individual line will be parallelized over (N / njobs) processors. If set to "auto", it will first try to parallelize over the list of lines and only parallelize the line deposition if there are more processors than lines. This is the optimal strategy for parallelizing spectrum generation. Default: "auto" """ self.snr = 100 if line_list_file is not None: mylog.info("'line_list_file' keyword is deprecated. Please use " \ "'output_absorbers_file'.") output_absorbers_file = line_list_file input_fields = [('gas', 'dl'), ('gas', 'redshift'), ('gas', 'temperature')] field_units = {('gas', 'dl'): "cm", ('gas', 'redshift'): "", ('gas', 'temperature'): "K"} if use_peculiar_velocity: input_fields.append(('gas', 'velocity_los')) input_fields.append(('gas', 'redshift_eff')) field_units[('gas', 'velocity_los')] = "cm/s" field_units[('redshift_eff')] = "" if observing_redshift != 0.: input_fields.append(('gas', 'redshift_dopp')) field_units[('gas', 'redshift_dopp')] = "" for feature in self.line_list + self.continuum_list: if not feature['field_name'] in input_fields: input_fields.append(feature['field_name']) field_units[feature["field_name"]] = "cm**-3" if isinstance(input_object, str): input_ds = load(input_object) field_data = input_ds.all_data() elif isinstance(input_object, Dataset): input_ds = input_object field_data = input_ds.all_data() elif isinstance(input_object, YTDataContainer): input_ds = input_object.ds field_data = input_object # temporary fix for yt-4.0 ytdata selection issue input_ds.domain_left_edge = input_ds.domain_left_edge.to('code_length') input_ds.domain_right_edge = input_ds.domain_right_edge.to('code_length') if self.bin_space == 'velocity': self.zero_redshift = getattr(input_ds, 'current_redshift', 0) # temperature field required to calculate voigt profile widths if ('gas', 'temperature') not in input_ds.derived_field_list: raise RuntimeError( "('gas', 'temperature') field required to be present in %s " "for AbsorptionSpectrum to function." % str(input_object)) self.absorbers_list = [] self.line_observables_dict = {} if njobs == "auto": comm = _get_comm(()) njobs = min(comm.size, len(self.line_list)) mylog.info("Creating spectrum") self._add_lines_to_spectrum(field_data, use_peculiar_velocity, output_absorbers_file,store_observables, subgrid_resolution=subgrid_resolution, observing_redshift=observing_redshift, min_tau=min_tau, njobs=njobs) self._add_continua_to_spectrum(field_data, use_peculiar_velocity, observing_redshift=observing_redshift, min_tau=min_tau) if self.tau_field is None: mylog.warning('Spectrum is totally empty!') else: self.flux_field = np.exp(-self.tau_field) if output_file is None: pass elif output_file.endswith('.h5') or output_file.endswith('.hdf5'): self._write_spectrum_hdf5(output_file) elif output_file.endswith('.fits'): self._write_spectrum_fits(output_file) else: self._write_spectrum_ascii(output_file) if output_absorbers_file is not None: self._write_absorbers_file(output_absorbers_file) del field_data return (self.lambda_field, self.flux_field)
[docs] def error_func(self, flux): """ Approximate the flux error for a spectrum. Many observational analysis programs require a flux error channel in addition to a flux channel. So we create a zeroth order approximation of the flux error, simply by taking the square root of the flux. Unfortunately, with flux normalized to be < 1, this would result in errors larger than the flux values themselves, so we normalize by an arbitrary signal-to-noise ratio, which by default is set to 100. This yields a typical error for a normalized spectrum of sqrt(1.0*100)/100 = 0.1. This assures our flux errors are smaller than our fluxes for most flux reasonable flux values. Note that when a signal to noise ratio is specified for adding gaussian noise, it uses this updated value for estimating the errors. SNR is set as an attribute of AbsorptionSpectrum directly (e.g., as.snr = N). **Parameters** :flux: array of floats The array of flux values """ return np.sqrt(flux*self.snr)/self.snr
def _apply_observing_redshift(self, field_data, use_peculiar_velocity, observing_redshift): """ Change the redshifts of individual absorbers to account for the redshift at which the observer sits. The intermediate redshift that is seen by an observer at a redshift other than z=0 is z12, where z1 is the observing redshift and z2 is the emitted photon's redshift Hogg (2000) eq. 13: 1 + z12 = (1 + z2) / (1 + z1) """ if observing_redshift == 0.: # This is already assumed in the generation of the LightRay redshift = field_data[('gas', 'redshift')] if use_peculiar_velocity: redshift_eff = field_data[('gas', 'redshift_eff')] else: # The intermediate redshift that is seen by an observer # at a redshift other than z=0 is z12, where z1 is the # observing redshift and z2 is the emitted photon's redshift # Hogg (2000) eq. 13: # 1 + z12 = (1 + z2) / (1 + z1) redshift = ((1 + field_data[('gas', 'redshift')]) / \ (1 + observing_redshift)) - 1. # Combining cosmological redshift and doppler redshift # into an effective redshift is found in Peacock's # Cosmological Physics eqn 3.75: # 1 + z_eff = (1 + z_cosmo) * (1 + z_doppler) if use_peculiar_velocity: redshift_eff = ((1 + redshift) * \ (1 + field_data[('gas', 'redshift_dopp')])) - 1. if not use_peculiar_velocity: redshift_eff = redshift return redshift, redshift_eff def _add_continua_to_spectrum(self, field_data, use_peculiar_velocity, observing_redshift=0., min_tau=1e-3): """ Add continuum features to the spectrum. Continuua are recorded as a name, associated field, wavelength, normalization value, and index. Continuua are applied at and below the denoted wavelength, where the optical depth decreases as a power law of desired index. For positive index values, this means optical depth is highest at the denoted wavelength, and it drops with shorter and shorter wavelengths. Consequently, transmitted flux undergoes a discontinuous cutoff at the denoted wavelength, and then slowly increases with decreasing wavelength according to the power law. """ if self._auto_lambda and self.lambda_field is None: mylog.warning( 'Cannot add continuum with empty spectrum and lambda_min/max ' + 'set to auto.') return # Change the redshifts of continuum sources to account for the # redshift at which the observer sits redshift, redshift_eff = self._apply_observing_redshift(field_data, use_peculiar_velocity, observing_redshift) # min_tau is the minimum optical depth value that warrants # accounting for an absorber. for a single absorber, noticeable # continuum effects begin for tau = 1e-3 (leading to transmitted # flux of e^-tau ~ 0.999). but we apply a cutoff to remove # absorbers with insufficient column_density to contribute # significantly to a continuum (see below). because lots of # low column density absorbers can add up to a significant # continuum effect, we normalize min_tau by the n_absorbers. n_absorbers = field_data[('gas', 'dl')].size if n_absorbers == 0: mylog.info("No absorbers in path of LightRay.") return min_tau /= n_absorbers for continuum in self.continuum_list: # Normalization is in cm**-2, so column density must be as well column_density = (field_data[continuum['field_name']] * field_data[('gas', 'dl')]).in_units('cm**-2') if (column_density == 0).all(): mylog.info("Not adding continuum %s: insufficient column density" % continuum['label']) continue # redshift_eff field combines cosmological and velocity redshifts if use_peculiar_velocity: delta_lambda = continuum['wavelength'] * redshift_eff else: delta_lambda = continuum['wavelength'] * redshift # right index of continuum affected area is wavelength itself this_wavelength = delta_lambda + continuum['wavelength'] right_index = np.digitize(this_wavelength, self.lambda_field).clip(0, self.lambda_field.size) # left index of continuum affected area wavelength at which # optical depth reaches tau_min left_index = np.digitize((this_wavelength * np.power((min_tau * continuum['normalization'] / column_density), (1. / continuum['index']))), self.lambda_field).clip(0, self.lambda_field.size) # Only calculate the effects of continuua where normalized # column_density is greater than min_tau # because lower column will not have significant contribution valid_continuua = np.where(((column_density / continuum['normalization']) > min_tau) & (right_index - left_index > 1))[0] if valid_continuua.size == 0: mylog.info("Not adding continuum %s: insufficient column density or out of range" % continuum['label']) continue pbar = get_pbar("Adding continuum - %s [%f A]: " % \ (continuum['label'], continuum['wavelength']), valid_continuua.size) # Tau value is (wavelength / continuum_wavelength)**index / # (column_dens / norm) # i.e. a power law decreasing as wavelength decreases # Step through the absorber list and add continuum tau for each to # the total optical depth for all wavelengths for i, lixel in enumerate(valid_continuua): cont_tau = \ np.power((self.lambda_field[left_index[lixel] : right_index[lixel]] / this_wavelength[lixel]), \ continuum['index']) * \ (column_density[lixel] / continuum['normalization']) self.tau_field[left_index[lixel]:right_index[lixel]] += cont_tau.d pbar.update() pbar.finish() def _add_lines_to_spectrum(self, field_data, use_peculiar_velocity, output_absorbers_file, store_observables, subgrid_resolution=10, observing_redshift=0., njobs=-1, min_tau=1e-3): """ Add the absorption lines to the spectrum. """ if len(self.line_list) == 0: return if self.bin_space == 'velocity': wavelength_zero_point = (1 + self.zero_redshift) * \ self.line_list[0]['wavelength'] mylog.info( ('Setting wavelength of velocity center to %s ' + 'line at z = %.3f: %s.') % (self.line_list[0]['label'], self.zero_redshift, wavelength_zero_point)) # Change the redshifts of individual absorbers to account for the # redshift at which the observer sits redshift, redshift_eff = self._apply_observing_redshift(field_data, use_peculiar_velocity, observing_redshift) # step through each ionic transition (e.g. HI, HII, MgII) specified # and deposit the lines into the spectrum for store, line in parallel_objects(self.line_list, njobs=njobs, storage=self.line_observables_dict): column_density = field_data[line['field_name']] * field_data[('gas', 'dl')] if (column_density < 0).any(): mylog.warning( "Setting negative densities for field %s to 0! Bad!" % line['field_name']) np.clip(column_density, 0, np.inf, out=column_density) if (column_density == 0).all(): mylog.info("Not adding line %s: insufficient column density" % line['label']) continue # redshift_eff field combines cosmological and velocity redshifts # so delta_lambda gives the offset in angstroms from the rest frame # wavelength to the observed wavelength of the transition if use_peculiar_velocity: delta_lambda = line['wavelength'] * redshift_eff else: delta_lambda = line['wavelength'] * redshift # lambda_obs is central wavelength of line after redshift lambda_obs = (line['wavelength'] + delta_lambda).to('angstrom') # either the observed wavelength or velocity offset if self.bin_space == 'wavelength': my_obs = lambda_obs[:] elif self.bin_space == 'velocity': my_obs = c_kms * \ (lambda_obs - wavelength_zero_point) / \ wavelength_zero_point my_obs.convert_to_units(_bin_space_units[self.bin_space]) else: raise RuntimeError('What bin_space is this?') # the total number of absorbers per transition n_absorbers = len(lambda_obs) # thermal broadening b parameter thermal_b = np.sqrt((2 * boltzmann_constant_cgs * field_data[('gas', 'temperature')]) / line['atomic_mass']) # the actual thermal width of the lines thermal_width = (lambda_obs * thermal_b / c_kms).to('angstrom') # Sanitize units for faster runtime of the tau_profile machinery. lambda_0 = line['wavelength'].d # line's rest frame; angstroms cdens = column_density.in_units("cm**-2").d # cm**-2 thermb = thermal_b.to('cm/s').d # thermal b coefficient; cm / s dlambda = delta_lambda.d # lambda offset; angstroms # Array to store sum of the tau values for each index in the # light ray that is deposited to the final spectrum if store_observables: tau_ray = np.zeros(cdens.size) if use_peculiar_velocity: vlos = field_data[('gas', 'velocity_los')].in_units("km/s").d # km/s else: vlos = np.zeros(field_data[('gas', 'temperature')].size) # When we actually deposit the voigt profile, sometimes we will # have underresolved lines (ie lines with smaller widths than # the spectral bin size). Here, we create virtual wavelength bins # small enough in width to well resolve each line, deposit the # voigt profile into them, then numerically integrate their tau # values and sum them to redeposit them into the actual spectral # bins. # virtual bins (vbins) will be: # 1) <= the bin_width; assures at least as good as spectral bins # 2) <= 1/10th the thermal width; assures resolving voigt profiles # (actually 1/subgrid_resolution value, default is 1/10) # 3) a bin width will be divisible by vbin_width times a power of # 10; this will assure we don't get spikes in the deposited # spectra from uneven numbers of vbins per bin if self.bin_space == 'wavelength': my_width = thermal_width elif self.bin_space == 'velocity': my_width = thermal_b else: raise RuntimeError('What bin space is this?') resolution = my_width / self.bin_width n_vbins_per_bin = (10 ** (np.ceil( np.log10( subgrid_resolution / resolution) ).clip(0, np.inf) ) ).astype('int') vbin_width = self.bin_width.d / n_vbins_per_bin # a note to the user about which lines components are unresolved if (my_width < self.bin_width).any(): mylog.info("%d out of %d line components are unresolved.", (my_width < self.bin_width).sum(), n_absorbers) # Keep track of the lambda field before depositing a new line # so we can add the current_tau_field and the tau_field together. last_lambda_field = self.lambda_field # provide a progress bar with information about lines processsed pbar = get_pbar("Adding line - %s [%f A]: " % \ (line['label'], line['wavelength']), n_absorbers) # for a given transition, step through each location in the # observed spectrum where it occurs and deposit a voigt profile for i in parallel_objects(np.arange(n_absorbers), njobs=-1): # if there is a ray element with temperature = 0 or column # density = 0, skip it if (thermal_b[i] == 0.) or (cdens[i] == 0.): pbar.update() continue # the virtual window into which the line is deposited initially # spans a region of 2 coarse spectral bins # (one on each side of the center_index) but the window # can expand as necessary. # it will continue to expand until the tau value in the far # edge of the wings is less than the min_tau value or it # reaches the edge of the spectrum window_width_in_bins = 2 # Widen wavelength window until optical depth falls below min_tau # value at the ends to assure that the wings of a line have been # fully resolved. while True: # calculate wavelength window if self._auto_lambda and self.lambda_field is None: my_lambda_min = my_obs[i] - \ window_width_in_bins * self.bin_width / 2 # round off to multiple of bin_width my_lambda_min = self.bin_width * \ np.ceil(my_lambda_min / self.bin_width) my_lambda = my_lambda_min + \ self.bin_width * np.arange(window_width_in_bins) else: my_lambda = self.lambda_field # we want to know the bin index in the lambda_field array # where each line has its central wavelength after being # redshifted. however, because we don't know a priori how wide # a line will be (ie DLAs), we have to include bin indices # *outside* the spectral range of the AbsorptionSpectrum # object. Thus, we find the "equivalent" bin index, which # may be <0 or >the size of the array. In the end, we deposit # the bins that actually overlap with the AbsorptionSpectrum's # range in lambda. left_index, center_index, right_index = \ self._get_bin_indices( my_lambda, self.bin_width, my_obs[i], window_width_in_bins) window_size = right_index - left_index n_vbins = window_size * n_vbins_per_bin[i] # the array of virtual bins in lambda space vbins = \ np.linspace(my_lambda.d[0] + self.bin_width.d * left_index, my_lambda.d[0] + self.bin_width.d * right_index, n_vbins, endpoint=False) if self.bin_space == 'wavelength': my_vbins = vbins elif self.bin_space == 'velocity': my_vbins = vbins * \ wavelength_zero_point.d / c_kms.d + \ wavelength_zero_point.d else: raise RuntimeError('What bin_space is this?') if my_vbins.size > 1e6: mylog.warning( ('Depositing line with %d bins may be slow. ' + 'Increase bin size?') % my_vbins.size) # the virtual bins and their corresponding opacities my_vbins, vtau = \ tau_profile( lambda_0, line['f_value'], line['gamma'], thermb[i], cdens[i], delta_lambda=dlambda[i], lambda_bins=my_vbins) # If tau has not dropped below min tau threshold by the # edges (ie the wings), then widen the wavelength # window and repeat process. if (vtau[0] < min_tau and vtau[-1] < min_tau): if self._auto_lambda: self._create_auto_field_arrays( left_index, right_index, my_lambda) left_index, center_index, right_index = \ self._get_bin_indices( self.lambda_field, self.bin_width, my_obs[i], window_width_in_bins) break # If we have fixed bins, call it done when the # spectrum takes up the entire range. if not self._auto_lambda: left_ok = vtau[0] < min_tau or left_index <= 0 right_ok = vtau[-1] < min_tau or \ right_index >= self.lambda_field.size if left_ok and right_ok: break window_width_in_bins *= 2 if center_index is None: pbar.update() continue # Numerically integrate the virtual bins to calculate a # virtual "equivalent width" of optical depth; then sum these # virtual equivalent widths in tau and deposit back into each # original spectral tau bin # Please note: this is not a true equivalent width in the # normal use of the word by observers. It is an equivalent # with in tau, not in flux, and is only used internally in # this subgrid deposition as EW_tau. vEW_tau = vtau * vbin_width[i] EW_tau = np.zeros(right_index - left_index) EW_tau_indices = np.arange(left_index, right_index) for k, val in enumerate(EW_tau_indices): EW_tau[k] = vEW_tau[n_vbins_per_bin[i] * k: n_vbins_per_bin[i] * (k + 1)].sum() EW_tau = EW_tau/self.bin_width.d # only deposit EW_tau bins that actually intersect the original # spectral wavelength range (i.e. lambda_field) # if EW_tau bins don't intersect the original spectral range at # all then skip the deposition if ((left_index >= self.lambda_field.size) or \ (right_index < 0)): pbar.update() continue # otherwise, determine how much of the original spectrum # is intersected by the expanded line window to be deposited, # and deposit the Equivalent Width in tau into that intersecting # window in the original spectrum's tau array else: intersect_left_index = max(left_index, 0) intersect_right_index = min(right_index, self.lambda_field.size) EW_tau_deposit = EW_tau[(intersect_left_index - left_index): \ (intersect_right_index - left_index)] self.current_tau_field[intersect_left_index:intersect_right_index] \ += EW_tau_deposit if store_observables: tau_ray[i] = np.sum(EW_tau_deposit) # write out absorbers to file if the column density of # an absorber is greater than the specified "label_threshold" # of that absorption line if output_absorbers_file and \ line['label_threshold'] is not None and \ cdens[i] >= line['label_threshold']: if use_peculiar_velocity: peculiar_velocity = vlos[i] else: peculiar_velocity = 0.0 self.absorbers_list.append({'label': line['label'], 'wavelength': (lambda_0 + dlambda[i]), 'column_density': column_density[i], 'b_thermal': thermal_b[i], 'redshift': redshift[i], 'redshift_eff': redshift_eff[i], 'v_pec': peculiar_velocity}) pbar.update() pbar.finish() # Expand the tau_field array to match the updated wavelength # array from the last line deposition. self._adjust_field_array(last_lambda_field, self.lambda_field, "tau_field") if self.current_tau_field is not None: # Now add the current_tau_field. self.tau_field += self.current_tau_field ## Check keyword before storing any observables if store_observables: # If running in parallel, make sure that the observable # quantities for the dictionary are combined correctly. comm = _get_comm(()) if self._auto_lambda: global_lambda_field = self._get_global_lambda_field(comm=comm) self._adjust_field_array(self.lambda_field, global_lambda_field, "current_tau_field") if comm.size > 1: obs_dict_fields = \ [column_density, tau_ray, self.current_tau_field, delta_lambda, lambda_obs, thermal_b, thermal_width] obs_dict_fields = [comm.mpi_allreduce(field,op="sum") for field in obs_dict_fields] # Calculate the flux decrement equivalent width (the true # equivalent width!) for use in post-processing if self.current_tau_field is None: EW = 0. else: EW = np.sum(1-np.exp(-self.current_tau_field))*self.bin_width # Update the line_observables_dict with values for this line obs_dict = {"column_density":column_density, "tau_ray":tau_ray, "EW":EW, "delta_lambda":delta_lambda, "lambda_obs":lambda_obs, "thermal_b":thermal_b, "thermal_width":thermal_width} if self.bin_space == 'velocity': obs_dict['velocity_offset'] = my_obs store.result_id = line['label'] store.result = obs_dict ## Can only delete these if in this statement: del obs_dict, tau_ray self.current_tau_field = None # These always need to be deleted del column_density, delta_lambda, lambda_obs, my_obs, \ thermal_b, thermal_width, cdens, thermb, dlambda, \ vlos, resolution, vbin_width, n_vbins, n_vbins_per_bin comm = _get_comm(()) if self._auto_lambda: new_lambda = self._get_global_lambda_field(comm=comm) self._adjust_field_array(self.lambda_field, new_lambda, "tau_field") self.lambda_field = new_lambda self.tau_field = comm.mpi_allreduce(self.tau_field, op="sum") if output_absorbers_file: self.absorbers_list = comm.par_combine_object( self.absorbers_list, "cat", datatype="list") def _get_bin_indices(self, lambda_field, dlambda, lambda_obs, window_width_in_bins): """ Return the indices of the lambda field corresponding to the lower limit, line center, and upper limit. """ if lambda_field is None or lambda_field.size == 0: return None, None, None # this equation gives us the "equivalent" bin index for each line # if it were placed into the self.lambda_field array center_index = ((lambda_obs - lambda_field[0]) / dlambda).d center_index = int(np.ceil(center_index)) left_index = (center_index - window_width_in_bins//2) right_index = (center_index + window_width_in_bins//2) # if fixed size, make some sensible bounds on size of virtual spectrum if not self._auto_lambda: if left_index <= 0 and right_index >= lambda_field.size: left_index = 0 right_index = lambda_field.size elif center_index < 0: left_index = max(left_index, center_index) right_index = min(right_index, lambda_field.size) elif center_index > lambda_field.size: left_index = max(left_index, 0) right_index = min(right_index, center_index) return left_index, center_index, right_index def _get_global_lambda_field(self, comm=None): """ Get the maximum lambda field bounds and create a new array. """ if comm is None: comm = _get_comm(()) if self.lambda_field is None: my_min = np.inf my_max = -np.inf else: my_min = self.lambda_field[0] my_max = self.lambda_field[-1] lf_min = comm.mpi_allreduce(my_min, op="min") lf_max = comm.mpi_allreduce(my_max, op="max") if lf_min != np.inf: lf_min = np.round(lf_min / self.bin_width) * self.bin_width lf_max = np.round(lf_max / self.bin_width) * self.bin_width n_lambda = self._get_field_size(lf_min, lf_max, self.bin_width) new_lambda = self._create_lambda_field(lf_min, lf_max, n_lambda) else: new_lambda = None return new_lambda def _create_auto_field_arrays(self, left_index, right_index, my_lambda): """ Adjust the current wavelength window to encompass the full window of the next feature we are depositing. my_lambda is the current wavelength window and the left and right index are the bounds of the desired wavelength window. If the bounds are outside the current window, we enlarge it to fit and adjust the tau field accordingly. """ if left_index >= 0 and right_index < my_lambda.size: return dlambda = my_lambda[1] - my_lambda[0] new_lambda_min = my_lambda[0] + \ dlambda * min(0, left_index) new_lambda_max = my_lambda[0] + \ dlambda * max(my_lambda.size-1, right_index) if str(self.lambda_min) != 'auto': new_lambda_min = max(new_lambda_min, self.lambda_min) if str(self.lambda_max) != 'auto': new_lambda_max = min(new_lambda_max, self.lambda_max) if new_lambda_min >= new_lambda_max: return n_lambda = self._get_field_size( new_lambda_min, new_lambda_max, dlambda) new_lambda = \ self._create_lambda_field(new_lambda_min, new_lambda_max, n_lambda) if self._current_tau_field is not None: self._adjust_field_array(self.lambda_field, new_lambda, "current_tau_field") self.lambda_field = new_lambda def _adjust_field_array(self, old_lambda, new_lambda, array_name): """ Adjust the field array associated with the old wavelength array so that it lines up correctly with the new wavelength array. """ if old_lambda is None or new_lambda is None: return start_index = np.digitize(old_lambda[0], new_lambda) - 1 new_array = np.zeros(new_lambda.size) old_array = getattr(self, array_name) new_array[start_index:start_index+old_array.size] = old_array setattr(self, array_name, new_array) @parallel_root_only def _write_absorbers_file(self, filename): """ Write out ASCII list of all substantial absorbers found in spectrum """ if filename is None: return if self.tau_field is None: return mylog.info("Writing absorber list: %s.", filename) self.absorbers_list.sort(key=lambda obj: obj['wavelength']) f = open(filename, 'w') f.write('#%-14s %-14s %-12s %-14s %-15s %-9s %-10s\n' % ('Wavelength', 'Line', 'N [cm^-2]', 'b [km/s]', 'z_cosmo', \ 'z_eff', 'v_pec [km/s]')) for line in self.absorbers_list: f.write('%-14.6f %-14ls %e %e % e % e % e\n' % (line['wavelength'], \ line['label'], line['column_density'], line['b_thermal'], \ line['redshift'], line['redshift_eff'], line['v_pec'])) f.close() @parallel_root_only def _write_spectrum_ascii(self, filename): """ Write spectrum to an ascii file. """ if self.tau_field is None: return mylog.info("Writing spectrum to ascii file: %s.", filename) f = open(filename, 'w') f.write("# wavelength[A] tau flux flux_error\n") for i in range(self.lambda_field.size): f.write("%e %e %e %e\n" % (self.lambda_field[i], self.tau_field[i], self.flux_field[i], self.error_func(self.flux_field[i]))) f.close() @parallel_root_only def _write_spectrum_fits(self, filename): """ Write spectrum to a fits file. """ if self.tau_field is None: return mylog.info("Writing spectrum to fits file: %s.", filename) col1 = pyfits.Column(name='wavelength', format='E', array=self.lambda_field) col2 = pyfits.Column(name='tau', format='E', array=self.tau_field) col3 = pyfits.Column(name='flux', format='E', array=self.flux_field) col4 = pyfits.Column(name='flux_error', format='E', array=self.error_func(self.flux_field)) cols = pyfits.ColDefs([col1, col2, col3, col4]) tbhdu = pyfits.BinTableHDU.from_columns(cols) tbhdu.writeto(filename, overwrite=True) @parallel_root_only def _write_spectrum_hdf5(self, filename): """ Write spectrum to an hdf5 file. """ if self.tau_field is None: return mylog.info("Writing spectrum to hdf5 file: %s.", filename) output = h5py.File(filename, 'w') output.create_dataset('wavelength', data=self.lambda_field) output.create_dataset('tau', data=self.tau_field) output.create_dataset('flux', data=self.flux_field) output.create_dataset('flux_error', data=self.error_func(self.flux_field)) output.close()