Source code for emulchem.emulchem

import pickle
import json
import numpy as np
from emulchem.network import NeuralNet
from emulchem.scaler import MinMaxScaler
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import glob
import os

path_emulchem = os.path.dirname(__file__)
print(path_emulchem)

[docs]def molecule_list(): """Get an exhaustive list of all molecules included in the emulator""" query = os.path.join(path_emulchem,"data","chem","scaler*.p") files = glob.glob(query) molecules = [f.split(".")[0][54:] for f in files] return molecules
[docs]class Emulator(): def __init__(self,specie): """Base emulator class Parameters ---------- specie: str Molecule to predict using the emulator. Name must be one of those returned by molecule_list. """ self.specie = specie
[docs] def get_prediction_list(self,x): x = np.array(x) x_val = self.input_scaler.transform(x.reshape(1,-1)) y_val = self.neural_network(torch.FloatTensor(x_val)).detach().numpy()[0] y_val = self.output_scaler.inverse_transform(y_val.reshape(1,-1)) return y_val[0][0]
[docs] def deserialize_scaler(self,scaler_path): print(scaler_path) with open(scaler_path,"r") as f: scaler_dict = json.loads(f.read()) print(type(scaler_dict)) print(scaler_dict) return MinMaxScaler(x_scale=scaler_dict["x_scale"],x_min = scaler_dict["x_min"])
[docs]class ChemistryEmulator(Emulator): def __init__(self,specie): """Base UCLCHEM emulator class Parameters ---------- specie: str Molecule to predict using the emulator. Name must be one of those returned by molecule_list. """ Emulator.__init__(self,specie) self.neural_network = NeuralNet(input_size=6,hidden_size=200,hidden_size2=100,hidden_size3=50,num_outputs=1) path_chem = os.path.join(path_emulchem,"data","chem") #self.neural_network.load_state_dict(torch.load(os.path.join(path_emulchem,"data/chem/network{}".format(self.specie)))) self.neural_network.load_state_dict(torch.load(os.path.join(path_chem,"network{}".format(self.specie)))) input_scaler_path = os.path.join(path_chem,"minMaxScaler.json") output_scaler_path = os.path.join(path_chem,"scaler{}.json".format(self.specie)) self.input_scaler = self.deserialize_scaler(input_scaler_path) self.output_scaler = self.deserialize_scaler(output_scaler_path)
[docs] def get_prediction(self,radfield,zeta,density,av,temperature,metallicity): """Obtain predicted abundances for given physical conditions. Parameters ---------- radfield: float Ultraviolet-photoionization rate (Draine). zeta: float cosmic-ray ionization rate (1.3 * 10^-17 s^-1). density: float overall molecular density (cm^-3) av: float visual extinction (mag) temperature: float temperature (Kelvin) metallicity: float multiplicative factor for elemental abundances with 1 corresponding to solar elemental abundances of Asplund et al (2009) (Z) Outputs ------- y: float predicted molecular abundance """ self._check_bounds(radfield,zeta,density,av,temperature,metallicity) x = [radfield,zeta,density,av,temperature,metallicity] y = np.exp(self.get_prediction_list(x)) return y
def _check_bounds(self,radfield,zeta,density,av,temperature,metallicity): """raises an error if the supplied parameter values are outside emulator range""" av_bounds = [1,100] density_bounds = [10**4,10**6] temperature_bounds = [10,200] metallicity_bounds = [0,2] radfield_bounds = [1,10**3] zeta_bounds = [1,10**3] out_of_bounds = [] bounds_msg = "{} was {} which is outside of the emulator bounds of {} to {}\n" if av<av_bounds[0] or av>av_bounds[1]: out_of_bounds.append(bounds_msg.format("av",av,av_bounds[0],av_bounds[1])) if density<density_bounds[0] or density>10**6: out_of_bounds.append(bounds_msg.format("density",density,density_bounds[0],density_bounds[1])) if temperature<temperature_bounds[0] or temperature>temperature_bounds[1]: out_of_bounds.append(bounds_msg.format("temperature",temperature,temperature_bounds[0],temperature_bounds[1])) if metallicity<metallicity_bounds[0] or metallicity>metallicity_bounds[1]: out_of_bounds.append(bounds_msg.format("metallicity",metallicity,metallicity_bounds[0],metallicity_bounds[1])) if radfield<radfield_bounds[0] or radfield>radfield_bounds[1]: out_of_bounds.append(bounds_msg.format("radfield",radfield,radfield_bounds[0],radfield_bounds[1])) if zeta<zeta_bounds[0] or zeta>zeta_bounds[1]: out_of_bounds.append(bounds_msg.format("zeta",zeta,zeta_bounds[0],zeta_bounds[1])) if len(out_of_bounds) !=0: raise Exception("".join(out_of_bounds))
#raise Exception(", ".join(out_of_bounds)+ " out of emulator usable bounds")
[docs]class RadexEmulator(Emulator): def __init__(self,specie,transition): Emulator.__init__(self,specie) self.transition = transition self.neural_network = NeuralNet(input_size=3,hidden_size=200,hidden_size2=100,hidden_size3=50,num_outputs=1) path_rad = os.path.join(path_emulchem,"data","rad") self.neural_network.load_state_dict(torch.load(os.path.join(path_rad,"network{}{}".format(self.specie,self.transition)))) input_scaler_path = os.path.join(path_rad,"minMaxScaler{}{}.json".format(self.specie,self.transition)) output_scaler_path = os.path.join(path_rad,"scaler{}{}.json".format(self.specie,self.transition)) self.input_scaler = self.deserialize_scaler(input_scaler_path) self.output_scaler = self.deserialize_scaler(output_scaler_path)
[docs] def get_prediction(self,temperature,density,column_density,line_width=1): """Obtain predicted molecular line intensities for given physical conditions. Parameters ---------- temperature: float temperature (Kelvin) density: float overall molecular density (cm^-3) column_density: float Outputs ------- y: float predicted line intensity """ """Temperature (Kelvin) Density (cm^-3) column_density () line-width () """ self._check_bounds(temperature,density,column_density,line_width) x = [temperature,density,np.log10(column_density/line_width)] y = self.get_prediction_list(x)*line_width return y
def _check_bounds(self,temperature,density,column_density,line_width): density_bounds = [10**4,10**6] temperature_bounds = [10,200] CO_bounds = [10**13,10**19] CS_bounds = [10**10,10**18] HCOplus_bounds = [10**8,10**15] HCN_bounds = [10**9,10**17] out_of_bounds = [] scaled_column_density = column_density/line_width bounds_msg = "{} was {} which is outside of the emulator bounds of {} to {}\n" if density<density_bounds[0] or density>10**6: out_of_bounds.append(bounds_msg.format("density",density,density_bounds[0],density_bounds[1])) if temperature<temperature_bounds[0] or temperature>temperature_bounds[1]: out_of_bounds.append(bounds_msg.format("temperature",temperature,temperature_bounds[0],temperature_bounds[1])) if self.specie=="CO": if scaled_column_density<CO_bounds[0] or scaled_column_density>CO_bounds[1]: out_of_bounds.append(bounds_msg.format("scaled column density",scaled_column_density,CO_bounds[0],CO_bounds[1])) elif self.specie=="CS": if scaled_column_density<CS_bounds[0] or scaled_column_density>CS_bounds[1]: out_of_bounds.append(bounds_msg.format("scaled column density",scaled_column_density,CS_bounds[0],CS_bounds[1])) elif self.specie=="HCO+": if scaled_column_density<HCOplus_bounds[0] or scaled_column_density>HCOplus_bounds[1]: out_of_bounds.append(bounds_msg.format("scaled column density",scaled_column_density,HCOplus_bounds[0],HCOplus_bounds[1])) elif self.specie=="HCN": if scaled_column_density<HCN_bounds[0] or scaled_column_density>HCN_bounds[1]: out_of_bounds.append(bounds_msg.format("scaled column density",scaled_column_density,HCN_bounds[0],HCN_bounds[1])) if len(out_of_bounds) !=0: raise Exception("".join(out_of_bounds))