Source code for tardis.montecarlo.montecarlo_numba.formal_integral

import sys
import warnings
import numpy as np
import pandas as pd
import scipy.sparse as sp
import scipy.sparse.linalg as linalg
from scipy.interpolate import interp1d
from astropy import units as u
from tardis import constants as const
from numba import njit, char, float64, int64, typeof, byte, prange
from numba.experimental import jitclass
import pdb

from tardis.montecarlo.montecarlo_numba.numba_config import SIGMA_THOMSON
from tardis.montecarlo.montecarlo_numba import njit_dict, njit_dict_no_parallel
from tardis.montecarlo.montecarlo_numba.numba_interface import (
    numba_plasma_initialize,
    NumbaModel,
    NumbaPlasma,
)

from tardis.montecarlo.spectrum import TARDISSpectrum

C_INV = 3.33564e-11
M_PI = np.arccos(-1)
KB_CGS = 1.3806488e-16
H_CGS = 6.62606957e-27


[docs]class IntegrationError(Exception): pass
[docs]@njit(**njit_dict) def numba_formal_integral( model, plasma, iT, inu, inu_size, att_S_ul, Jred_lu, Jblue_lu, tau_sobolev, electron_density, N, ): """ model, plasma, and estimator are the numba variants """ # todo: add all the original todos # Initialize the output which is shared among threads L = np.zeros(inu_size, dtype=np.float64) # global read-only values size_line, size_shell = tau_sobolev.shape size_tau = size_line * size_shell R_ph = model.r_inner[0] # make sure these are cgs R_max = model.r_outer[size_shell - 1] pp = np.zeros(N, dtype=np.float64) # check exp_tau = np.zeros(size_tau, dtype=np.float64) exp_tau = np.exp(-tau_sobolev.T.ravel()) # maybe make this 2D? pp[::] = calculate_p_values(R_max, N) line_list_nu = plasma.line_list_nu # done with instantiation # now loop over wavelength in spectrum for nu_idx in prange(inu_size): I_nu = np.zeros(N, dtype=np.float64) z = np.zeros(2 * size_shell, dtype=np.float64) shell_id = np.zeros(2 * size_shell, dtype=np.int64) offset = 0 size_z = 0 idx_nu_start = 0 direction = 0 first = 0 i = 0 p = 0.0 nu_start = 0.0 nu_end = 0.0 nu = 0.0 zstart = 0.0 zend = 0.0 escat_contrib = 0.0 escat_op = 0.0 Jkkp = 0.0 pexp_tau = 0 patt_S_ul = 0 pJred_lu = 0 pJblue_lu = 0 pline = 0 nu = inu[nu_idx] # now loop over discrete values along line for p_idx in range(1, N): escat_contrib = 0 p = pp[p_idx] # initialize z intersections for p values size_z = populate_z(model, p, z, shell_id) # check returns # initialize I_nu if p <= R_ph: I_nu[p_idx] = intensity_black_body(nu * z[0], iT) else: I_nu[p_idx] = 0 # find first contributing lines nu_start = nu * z[0] nu_end = nu * z[1] idx_nu_start = line_search(plasma.line_list_nu, nu_start, size_line) offset = shell_id[0] * size_line # start tracking accumulated e-scattering optical depth zstart = model.time_explosion / C_INV * (1.0 - z[0]) # Initialize "pointers" pline = int(idx_nu_start) pexp_tau = int(offset + idx_nu_start) patt_S_ul = int(offset + idx_nu_start) pJred_lu = int(offset + idx_nu_start) pJblue_lu = int(offset + idx_nu_start) # flag for first contribution to integration on current p-ray first = 1 nu_ends = nu * z[1:] nu_ends_idxs = size_line - np.searchsorted( line_list_nu[::-1], nu_ends, side="right" ) # loop over all interactions for i in range(size_z - 1): escat_op = electron_density[int(shell_id[i])] * SIGMA_THOMSON nu_end = nu_ends[i] nu_end_idx = nu_ends_idxs[i] for _ in range(max(nu_end_idx - pline, 0)): # calculate e-scattering optical depth to next resonance point zend = ( model.time_explosion / C_INV * (1.0 - line_list_nu[pline] / nu) ) # check if first == 1: # first contribution to integration # NOTE: this treatment of I_nu_b (given # by boundary conditions) is not in Lucy 1999; # should be re-examined carefully escat_contrib += ( (zend - zstart) * escat_op * (Jblue_lu[pJblue_lu] - I_nu[p_idx]) ) first = 0 else: # Account for e-scattering, c.f. Eqs 27, 28 in Lucy 1999 Jkkp = 0.5 * (Jred_lu[pJred_lu] + Jblue_lu[pJblue_lu]) escat_contrib += ( (zend - zstart) * escat_op * (Jkkp - I_nu[p_idx]) ) # this introduces the necessary ffset of one element between # pJblue_lu and pJred_lu pJred_lu += 1 I_nu[p_idx] += escat_contrib # // Lucy 1999, Eq 26 I_nu[p_idx] *= exp_tau[pexp_tau] I_nu[p_idx] += att_S_ul[patt_S_ul] # // reset e-scattering opacity escat_contrib = 0 zstart = zend pline += 1 pexp_tau += 1 patt_S_ul += 1 pJblue_lu += 1 # calculate e-scattering optical depth to grid cell boundary Jkkp = 0.5 * (Jred_lu[pJred_lu] + Jblue_lu[pJblue_lu]) zend = ( model.time_explosion / C_INV * (1.0 - nu_end / nu) ) # check escat_contrib += ( (zend - zstart) * escat_op * (Jkkp - I_nu[p_idx]) ) zstart = zend # advance pointers direction = int((shell_id[i + 1] - shell_id[i]) * size_line) pexp_tau += direction patt_S_ul += direction pJred_lu += direction pJblue_lu += direction I_nu[p_idx] *= p L[nu_idx] = 8 * M_PI * M_PI * trapezoid_integration(I_nu, R_max / N) return L
integrator_spec = [ ("model", NumbaModel.class_type.instance_type), ("plasma", NumbaPlasma.class_type.instance_type), ("points", int64), ] @jitclass(integrator_spec) class NumbaFormalIntegrator(object): """ Helper class for performing the formal integral with numba. """ def __init__(self, model, plasma, points=1000): self.model = model self.plasma = plasma self.points = points def formal_integral( self, iT, inu, inu_size, att_S_ul, Jred_lu, Jblue_lu, tau_sobolev, electron_density, N, ): """simple wrapper for the numba implementation of the formal integral""" return numba_formal_integral( self.model, self.plasma, iT, inu, inu_size, att_S_ul, Jred_lu, Jblue_lu, tau_sobolev, electron_density, N, )
[docs]class FormalIntegrator(object): """ Class containing the formal integrator """ def __init__(self, model, plasma, runner, points=1000): self.model = model self.runner = runner self.points = points if plasma: self.plasma = numba_plasma_initialize( plasma, runner.line_interaction_type ) self.atomic_data = plasma.atomic_data self.original_plasma = plasma
[docs] def generate_numba_objects(self): """instantiate the numba interface objects needed for computing the formal integral""" self.numba_model = NumbaModel( self.runner.r_inner_i, self.runner.r_outer_i, self.model.time_explosion.to("s").value, ) self.numba_plasma = numba_plasma_initialize( self.original_plasma, self.runner.line_interaction_type ) self.numba_integrator = NumbaFormalIntegrator( self.numba_model, self.numba_plasma, self.points )
[docs] def check(self, raises=True): """ A method that determines if the formal integral can be performed with the current configuration settings The function returns False if the configuration conflicts with the required settings. If raises evaluates to True, then a IntegrationError is raised instead """ def raise_or_return(message): if raises: raise IntegrationError(message) else: warnings.warn(message) return False for obj in (self.model, self.plasma, self.runner): if obj is None: return raise_or_return( "The integrator is missing either model, plasma or " "runner. Please make sure these are provided to the " "FormalIntegrator." ) if not self.runner.line_interaction_type in ["downbranch", "macroatom"]: return raise_or_return( "The FormalIntegrator currently only works for " 'line_interaction_type == "downbranch"' 'and line_interaction_type == "macroatom"' ) return True
[docs] def calculate_spectrum( self, frequency, points=None, interpolate_shells=0, raises=True ): # Very crude implementation # The c extension needs bin centers (or something similar) # while TARDISSpectrum needs bin edges self.check(raises) N = points or self.points if interpolate_shells == 0: # Default Value interpolate_shells = max(2 * self.model.no_of_shells, 80) warnings.warn( "The number of interpolate_shells was not " f"specified. The value was set to {interpolate_shells}." ) self.interpolate_shells = interpolate_shells frequency = frequency.to("Hz", u.spectral()) luminosity = u.Quantity(self.formal_integral(frequency, N), "erg") * ( frequency[1] - frequency[0] ) # Ugly hack to convert to 'bin edges' frequency = u.Quantity( np.concatenate( [ frequency.value, [frequency.value[-1] + np.diff(frequency.value)[-1]], ] ), frequency.unit, ) return TARDISSpectrum(frequency, luminosity)
[docs] def make_source_function(self): """ Calculates the source function using the line absorption rate estimator `Edotlu_estimator` Formally it calculates the expression ( 1 - exp(-tau_ul) ) S_ul but this product is what we need later, so there is no need to factor out the source function explicitly. Parameters ---------- model : tardis.model.Radial1DModel Returns ------- Numpy array containing ( 1 - exp(-tau_ul) ) S_ul ordered by wavelength of the transition u -> l """ model = self.model runner = self.runner macro_ref = self.atomic_data.macro_atom_references macro_data = self.atomic_data.macro_atom_data no_lvls = len(self.atomic_data.levels) no_shells = len(model.w) if runner.line_interaction_type == "macroatom": internal_jump_mask = (macro_data.transition_type >= 0).values ma_int_data = macro_data[internal_jump_mask] internal = self.original_plasma.transition_probabilities[ internal_jump_mask ] source_level_idx = ma_int_data.source_level_idx.values destination_level_idx = ma_int_data.destination_level_idx.values Edotlu_norm_factor = 1 / (runner.time_of_simulation * model.volume) exptau = 1 - np.exp(-self.original_plasma.tau_sobolevs) Edotlu = Edotlu_norm_factor * exptau * runner.Edotlu_estimator # The following may be achieved by calling the appropriate plasma # functions Jbluelu_norm_factor = ( ( const.c.cgs * model.time_explosion / (4 * np.pi * runner.time_of_simulation * model.volume) ) .to("1/(cm^2 s)") .value ) # Jbluelu should already by in the correct order, i.e. by wavelength of # the transition l->u Jbluelu = runner.j_blue_estimator * Jbluelu_norm_factor upper_level_index = self.atomic_data.lines.index.droplevel( "level_number_lower" ) e_dot_lu = pd.DataFrame(Edotlu, index=upper_level_index) e_dot_u = e_dot_lu.groupby(level=[0, 1, 2]).sum() e_dot_u_src_idx = macro_ref.loc[e_dot_u.index].references_idx.values if runner.line_interaction_type == "macroatom": C_frame = pd.DataFrame( columns=np.arange(no_shells), index=macro_ref.index ) q_indices = (source_level_idx, destination_level_idx) for shell in range(no_shells): Q = sp.coo_matrix( (internal[shell], q_indices), shape=(no_lvls, no_lvls) ) inv_N = sp.identity(no_lvls) - Q e_dot_u_vec = np.zeros(no_lvls) e_dot_u_vec[e_dot_u_src_idx] = e_dot_u[shell].values C_frame[shell] = linalg.spsolve(inv_N.T, e_dot_u_vec) e_dot_u.index.names = [ "atomic_number", "ion_number", "source_level_number", ] # To make the q_ul e_dot_u product work, could be cleaner transitions = self.original_plasma.atomic_data.macro_atom_data[ self.original_plasma.atomic_data.macro_atom_data.transition_type == -1 ].copy() transitions_index = transitions.set_index( ["atomic_number", "ion_number", "source_level_number"] ).index.copy() tmp = self.original_plasma.transition_probabilities[ (self.atomic_data.macro_atom_data.transition_type == -1).values ] q_ul = tmp.set_index(transitions_index) t = model.time_explosion.value lines = self.atomic_data.lines.set_index("line_id") wave = lines.wavelength_cm.loc[ transitions.transition_line_id ].values.reshape(-1, 1) if runner.line_interaction_type == "macroatom": e_dot_u = C_frame.loc[e_dot_u.index] att_S_ul = wave * (q_ul * e_dot_u) * t / (4 * np.pi) result = pd.DataFrame( att_S_ul.values, index=transitions.transition_line_id.values ) att_S_ul = result.loc[lines.index.values].values # Jredlu should already by in the correct order, i.e. by wavelength of # the transition l->u (similar to Jbluelu) Jredlu = Jbluelu * np.exp(-self.original_plasma.tau_sobolevs) + att_S_ul if self.interpolate_shells > 0: ( att_S_ul, Jredlu, Jbluelu, e_dot_u, ) = self.interpolate_integrator_quantities( att_S_ul, Jredlu, Jbluelu, e_dot_u ) else: runner.r_inner_i = runner.r_inner_cgs runner.r_outer_i = runner.r_outer_cgs runner.tau_sobolevs_integ = self.original_plasma.tau_sobolevs.values runner.electron_densities_integ = ( self.original_plasma.electron_densities.values ) return att_S_ul, Jredlu, Jbluelu, e_dot_u
[docs] def interpolate_integrator_quantities( self, att_S_ul, Jredlu, Jbluelu, e_dot_u ): runner = self.runner plasma = self.original_plasma nshells = self.interpolate_shells r_middle = (runner.r_inner_cgs + runner.r_outer_cgs) / 2.0 r_integ = np.linspace( runner.r_inner_cgs[0], runner.r_outer_cgs[-1], nshells ) runner.r_inner_i = r_integ[:-1] runner.r_outer_i = r_integ[1:] r_middle_integ = (r_integ[:-1] + r_integ[1:]) / 2.0 runner.electron_densities_integ = interp1d( r_middle, plasma.electron_densities, fill_value="extrapolate", kind="nearest", )(r_middle_integ) # Assume tau_sobolevs to be constant within a shell # (as in the MC simulation) runner.tau_sobolevs_integ = interp1d( r_middle, plasma.tau_sobolevs, fill_value="extrapolate", kind="nearest", )(r_middle_integ) att_S_ul = interp1d(r_middle, att_S_ul, fill_value="extrapolate")( r_middle_integ ) Jredlu = pd.DataFrame( interp1d(r_middle, Jredlu, fill_value="extrapolate")(r_middle_integ) ) Jbluelu = interp1d(r_middle, Jbluelu, fill_value="extrapolate")( r_middle_integ ) e_dot_u = interp1d(r_middle, e_dot_u, fill_value="extrapolate")( r_middle_integ ) # Set negative values from the extrapolation to zero att_S_ul = att_S_ul.clip(0.0) Jbluelu = Jbluelu.clip(0.0) Jredlu = Jredlu.clip(0.0) e_dot_u = e_dot_u.clip(0.0) return att_S_ul, Jredlu, Jbluelu, e_dot_u
[docs] def formal_integral(self, nu, N): """Do the formal integral with the numba routines""" # TODO: get rid of storage later on res = self.make_source_function() att_S_ul = res[0].flatten(order="F") Jred_lu = res[1].values.flatten(order="F") Jblue_lu = res[2].flatten(order="F") self.generate_numba_objects() L = self.numba_integrator.formal_integral( self.model.t_inner, nu, nu.shape[0], att_S_ul, Jred_lu, Jblue_lu, self.runner.tau_sobolevs_integ, self.runner.electron_densities_integ, N, ) return np.array(L, np.float64)
[docs]@njit(**njit_dict_no_parallel) def populate_z(model, p, oz, oshell_id): """Calculate p line intersections This function calculates the intersection points of the p-line with each shell Inputs: :p: (double) distance of the integration line to the center :oz: (array of doubles) will be set with z values. the array is truncated by the value `1`. :oshell_id: (int64) will be set with the corresponding shell_ids """ # abbreviations r = model.r_outer N = len(model.r_inner) # check # print(N) inv_t = 1 / model.time_explosion z = 0 offset = N if p <= model.r_inner[0]: # intersect the photosphere for i in range(N): oz[i] = 1 - calculate_z(r[i], p, inv_t) oshell_id[i] = i return N else: # no intersection with photosphere # that means we intersect each shell twice for i in range(N): z = calculate_z(r[i], p, inv_t) if z == 0: continue if offset == N: offset = i # calculate the index in the resulting array i_low = N - i - 1 # the far intersection with the shell i_up = N + i - 2 * offset # the nearer intersection with the shell # setting the arrays; check return them? oz[i_low] = 1 + z oshell_id[i_low] = i oz[i_up] = 1 - z oshell_id[i_up] = i return 2 * (N - offset)
[docs]@njit(**njit_dict_no_parallel) def calculate_z(r, p, inv_t): """Calculate distance to p line Calculate half of the length of the p-line inside a shell of radius r in terms of unit length (c * t_exp). If shell and p-line do not intersect, return 0. Inputs: :r: (double) radius of the shell :p: (double) distance of the p-line to the center of the supernova :inv_t: (double) inverse time_explosio is needed to norm to unit-length """ if r > p: return np.sqrt(r * r - p * p) * C_INV * inv_t else: return 0
[docs]class BoundsError(ValueError): pass
[docs]@njit(**njit_dict_no_parallel) def trapezoid_integration(array, h): """in the future, let's just replace this with the numpy trapz since it is numba compatable """ return np.trapz(array, dx=h)
[docs]@njit(**njit_dict_no_parallel) def intensity_black_body(nu, T): """Get the black body intensity at frequency nu and temperature T""" if nu == 0: return np.nan # to avoid ZeroDivisionError beta_rad = 1 / (KB_CGS * T) coefficient = 2 * H_CGS * C_INV * C_INV return coefficient * nu * nu * nu / (np.exp(H_CGS * nu * beta_rad) - 1)
[docs]@njit(**njit_dict_no_parallel) def calculate_p_values(R_max, N): """This can probably be replaced with a simpler function""" return np.arange(N).astype(np.float64) * R_max / (N - 1)