Source code for trident.utilities

"""
Miscellaneous Utilities for Trident

"""

#-----------------------------------------------------------------------------
# Copyright (c) 2016, Trident Development Team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
#-----------------------------------------------------------------------------

import gzip
import os
import requests
import tempfile
import shutil
from yt.funcs import \
    get_pbar
import numpy as np
from yt.units import \
    cm, \
    pc, \
    Zsun
from yt import \
    load_uniform_grid, \
    YTArray, \
    load

def ensure_directory(directory):
    """
    Ensures a directory exists by creating it if it does not.  Works for
    both absolute and relative paths.

    **Parameters**

    :directory: string

        Directory you wish to ensure exists.

    **Example**

    >>> ensure_directory("~/test")
    """
    if not os.path.exists(directory):
        os.makedirs(directory)

def download_file(url, progress_bar=True, local_directory=None,
                 local_filename=None):
    """
    Downloads a file from the provided URL.
    
    **Parameters**

    :url: string

        The web address of the file to download.
        
    :progress_bar: boolean, optional

        Will generate a progress bar for the user as the file downloads.
        iPython/Jupyter friendly.
        Default: True

    :local_directory: string, optional

        Absolute or relative path of a local directory where the file
        will be downloaded.  If set to None, will default to current
        working directory.
        Default: None

    :local_filename: string, optional

        Local filename where the file will be downloaded.  If set to None,
        will default to filename of downloaded file.
        Default: None
    
    **Example**
    
    >>> download_file("http://trident-project.org/data/ion_table/hm2012_lr.h5.gz")
    """

    # Following the base description on stack overflow:
    # http://stackoverflow.com/questions/22676/how-do-i-download-a-file-over-http-using-python

    # Set defaults
    if local_filename is None:
        local_filename = os.path.basename(url)
    if local_directory is None:
        local_directory = '.'
    ensure_directory(local_directory)

    # open local file handle
    filepath = os.path.join(local_directory, local_filename)
    filehandle = open(filepath, 'wb')

    # Get information about remote filesize
    r = requests.get(url, stream=True)
    filesize = int(r.headers["content-length"])/2**10  # in kB
    if progress_bar:
        pbar = get_pbar("Downloading file: %s" % local_filename, filesize)
    filesize_dl = 0
    chunk_size = 8192

    # Download file in chunks and update statusbar until done with transfer
    for content in r.iter_content(chunk_size):
        filesize_dl += len(content)/2**10
        filehandle.write(content)
        if progress_bar:
            pbar.update(filesize_dl)
    if progress_bar:
        pbar.finish()
    filehandle.close()

def gunzip_file(in_filename, out_filename=None, cleanup=True):
    """
    Uncompress a file using gunzip.
    
    **Parameters**

    :in_filename: string

        The filename of the gzipped file.

    :out_filename: string, optional

        The filename where the unzipped file will be saved.  If set to None,
        this will be the ``in_filename`` without the ``.gz`` suffix.
        Default: None

    :cleanup: boolean, optional
        
        Remove the zipped version of the file after unzipping it?
        Default: True.
    
    **Example**
    
    >>> gunzip_file("hm2012_lr.h5.gz")
    """
    in_file = gzip.open(in_filename, 'rb')
    # if out_filename not set, defaults to stripping off the .gz of in_filename
    if out_filename is None:
        out_filename = ".".join(in_filename.split('.')[:-1])
    out_file = open(out_filename, 'wb')
    out_file.write( in_file.read() )
    in_file.close()
    out_file.close()
    if cleanup:
        os.remove(in_filename)

def gzip_file(in_filename, out_filename=None, cleanup=True):
    """
    Compress a file using gzip.
    
    **Parameters**

    :in_filename: string

        The filename of the file to be gzipped.

    :out_filename: string, optional

        The filename where the zipped file will be saved.  If set to None,
        this will be the ``in_filename`` with a ``.gz`` suffix.
        Default: None

    :cleanup: boolean, optional
        
        Remove the unzipped version of the file after zipping it?
        Default: True.
    
    **Example**
    
    >>> gzip_file("hm2012_lr.h5")
    """
    in_file = open(in_filename, 'rb')
    # if out_filename not set, defaults to appending .gz to the in_filename
    if out_filename is None:
        out_filename = in_filename + ".gz"
    out_file = gzip.open(out_filename, 'wb')
    out_file.write( in_file.read() )
    in_file.close()
    out_file.close()
    if cleanup:
        os.remove(in_filename)

def get_datafiles(datadir=None, url=None):
    """
    Downloads an ion table datafile through interactive prompts.

    **Parameters**

    :datadir: string, optional

        The directory to which to download the ion table datafile.
        If None, uses ``$HOME/.trident``.
        Default: None

    :url: string, optional

        The url to contact to get the data files.  If None, uses
        ``http://trident-project.org/data/ion_table/``
        Default: None

    **Example**

    >>> get_datafiles()
    """
    if datadir is None:
        datadir = os.path.expanduser(os.path.join('~','.trident'))
    ensure_directory(datadir)

    # ion table datafiles are stored here
    if url is None:
        url = 'http://trident-project.org/data/ion_table/'
    
    # Try to figure out which datafiles are available on the remote server
    try:
        page = str(requests.get(url).text)
    except BaseException:
        print("Cannot seem to access %s; Do you have internet access?" % url)
        raise

    line_list = page.split('\n')
    i = 1 # counter
    filenames = []
    print("The following data files are available:")
    for line in line_list:
        if not line.startswith('<!--X'): # identifies files by comments
            continue
        line = line[6:-3]  # strips off the HTML comment chars
        filenames.append(line.split()[0])
        print("%d) %s" % (i, line))
        i += 1

    # User chooses which file
    print("")
    print("Which number would you like to download and use? [1]")
    value = input().rstrip()
    if value == '':
        value = '1'
    while 1:
        try:
            filename = filenames[int(value)-1]
            break
        except IndexError:
            print("%d is not a valid option.  Please pick one of the listed numbers.")

    # Actually downloads the file to a temporary directory before gunzipping.
    fileurl = os.path.join(url, filename)
    tempdir = tempfile.mkdtemp()
    print("")
    download_file(fileurl, local_directory=tempdir)
    print("  Unzipping file: %s" % filename)
    gunzip_file(os.path.join(tempdir, filename),
                out_filename=os.path.join(datadir, filename[:-3]))
    shutil.rmtree(tempdir)
    return filename[:-3]

[docs]def make_onezone_dataset(density=1e-26, temperature=1000, metallicity=0.3, domain_width=10.): """ Create a one-zone hydro dataset for use as test data. The dataset consists of a single cubicle cell of gas with hydro quantities specified in the function kwargs. It makes an excellent test dataset through which to send a sightline and test Trident's capabilities for making absorption spectra. Using the defaults and passing a ray through the full domain should result in a spectrum with a good number of absorption features. **Parameters** :density: float, optional The gas density value of the dataset in g/cm**3 Default: 1e-26 :temperature: float, optional The gas temperature value of the dataset in K Default: 10**3 :metallicity: float, optional The gas metallicity value of the dataset in Zsun Default: 0.3 :domain_width: float, optional The width of the dataset in kpc Default: 10. **Returns** **Example** Create a simple one-zone dataset, pass a ray through it, and generate a COS spectrum for that ray. >>> import trident >>> ds = trident.make_onezone_dataset() >>> ray = trident.make_simple_ray(ds, ... start_position=ds.domain_left_edge, ... end_position=ds.domain_right_edge, ... fields=['density', 'temperature', 'metallicity']) >>> sg = trident.SpectrumGenerator('COS') >>> sg.make_spectrum(ray) >>> sg.plot_spectrum('spec_raw.png') """ one = np.ones([1,1,1]) zero = np.zeros([1,1,1]) dens = np.array([[[density]]])*one temp = np.array([[[temperature]]])*one metal = np.array([[[metallicity]]])*one domain_width *= 1e3*pc/cm bbox = np.array([[0., domain_width], [0., domain_width], [0., domain_width]]) data = {'density':dens, 'temperature':temp, 'metallicity':metal*Zsun, 'velocity_x':zero, 'velocity_y':zero, 'velocity_z':zero} return load_uniform_grid(data, one.shape, length_unit='cm', mass_unit='g', bbox=bbox)
[docs]def make_onezone_ray(density=1e-26, temperature=1000, metallicity=0.3, length=10, redshift=0, filename='ray.h5', column_densities=None): """ Create a one-zone ray object for use as test data. The ray consists of a single absorber of hydrodynamic characteristics specified in the function kwargs. It makes an excellent test dataset to test Trident's capabilities for making absorption spectra. You can specify the column densities of different ions explicitly using the column_densities keyword, or you can let Trident calculate the different ion columns internally from the density, temperature, and metallicity fields. Using the defaults will produce a ray that should result in a spectrum with a good number of absorption features. **Parameters** :density: float, optional The gas density value of the ray in g/cm**3 Default: 1e-26 :temperature: float, optional The gas temperature value of the ray in K Default: 10**3 :metallicity: float, optional The gas metallicity value of the ray in Zsun Default: 0.3 :length: float, optional The length of the ray in kpc Default: 10. :redshift: float, optional The redshift of the ray Default: 0 :filename: string, optional The filename to which to save the ray to disk. Due to the mechanism for passing rays, the ray data must be saved to disk. Default: 'ray.h5' :column_densities: dict, optional The user can create a dictionary which adds more number density ion fields to the ray. Each key in the dictionary should be the desired ion field name according to the field name format: i.e. "<ELEMENT>_p<IONSTATE>_number_density" e.g. neutral hydrogen = "H_p0_number_density". The corresponding value for each key should be the desired column density of that ion in cm**-2. See example below. Default: None **Returns** A YT LightRay object **Example** Create a one-zone ray, and generate a COS spectrum from that ray. >>> import trident >>> ray = trident.make_onezone_ray() >>> sg = trident.SpectrumGenerator('COS') >>> sg.make_spectrum(ray) >>> sg.plot_spectrum('spec_raw.png') Create a one-zone ray with an HI column density of 1e21 (DLA) and generate a COS spectrum from that ray for just the Lyman alpha line. >>> import trident >>> ds = trident.make_onezone_ray(column_densities={'H_p0_number_density': 1e21}) >>> sg = trident.SpectrumGenerator('COS') >>> sg.make_spectrum(ray, lines=['Ly a']) >>> sg.plot_spectrum('spec_raw.png') """ from yt import save_as_dataset length = YTArray([length], "kpc") data = {"density" : YTArray([density], "g/cm**3"), "metallicity" : YTArray([metallicity], "Zsun"), "dl" : length, "temperature" : YTArray([temperature], "K"), "redshift" : np.array([redshift]), "redshift_eff" : np.array([redshift]), "velocity_los" : YTArray([0.], "cm/s"), "x": length/2, "dx": length, "y": length/2, "dy": length, "z": length/2, "dz": length } extra_attrs = {"data_type": "yt_light_ray", "dimensionality": 3} field_types = dict([(field, "grid") for field in data.keys()]) # Add additional number_density fields to dataset if column_densities: for k,v in column_densities.items(): v = YTArray([v], 'cm**-2') data[k] = v/length field_types[k] = 'grid' ds = {"current_time": 0., "current_redshift": 0., "cosmological_simulation": 0., "domain_left_edge": np.zeros(3)*length, "domain_right_edge": np.ones(3)*length, "periodicity": [True]*3} save_as_dataset(ds, filename, data, field_types=field_types, extra_attrs=extra_attrs) # load dataset and make spectrum ray = load(filename) # temporary fix for yt-4.0 ytdata selection issue ray.domain_left_edge = ray.domain_left_edge.to('code_length') ray.domain_right_edge = ray.domain_right_edge.to('code_length') return ray
def import_check(): """ """ # Avoid astropy error when importing from trident package directory. plist = os.path.dirname(os.path.abspath(__file__)).split(os.sep) package_path = os.sep.join(plist[:-1]) if os.getcwd() == package_path: raise RuntimeError( """ The Trident package does not work correctly when imported from its installation directory. Please try moving to another directory.""")