# file src/veldist/veldist.py
"""
Bayesian Matrix-Based Kinematic Deconvolution
=============================================
This module infers the intrinsic Line-of-Sight Velocity Distribution (LOSVD)
from discrete, heteroscedastic stellar observations using a pre-computed
linear design matrix and a hierarchical smoothness prior.
"""
import warnings
import numpy as np
import jax
import jax.numpy as jnp
import jax.scipy.special as jsp
import numpyro
import numpyro.distributions as dist
from numpyro.infer import MCMC, NUTS
import matplotlib.pyplot as plt
__all__ = [
"KinematicSolver",
"precompute_design_matrix",
"generate_smooth_curve",
"model",
"fit_all_bins",
"write_dynamite_kinematics",
]
# ==============================================================================
# Design Matrix
# ==============================================================================
[docs]
def precompute_design_matrix(obs_val, obs_err, bin_centers, bin_width=None):
"""
Computes the Probability Design Matrix (M) for 1D LOSVD inference.
This function bakes the observations and uncertainties into a static matrix, converting the
deconvolution problem into a single matrix multiplication during inference.
Parameters
----------
obs_val : array-like (N,)
Observed velocities of individual stars.
obs_err : array-like (N,)
Standard deviation (measurement error) for each star.
bin_centers : array-like (K,)
The velocity grid centers (the intrinsic histogram bins).
bin_width : float, optional
Width of bins. If None, inferred from centers.
Returns
-------
M : jnp.ndarray (N, K)
Probability matrix. M[i, j] = P(Star i | True Velocity is in Bin j)
"""
# Reshape for broadcasting: (N, 1)
y = jnp.array(obs_val)[:, None]
sig = jnp.array(obs_err)[:, None]
if bin_width is None:
bin_width = bin_centers[1] - bin_centers[0]
# Define bin edges (K+1 edges for K bins)
edges = jnp.concatenate(
[bin_centers - bin_width / 2, jnp.array([bin_centers[-1] + bin_width / 2])]
)
# --- Box Integration ---
# Instead of evaluating a Gaussian PDF at the bin center, we integrate
# the Gaussian over the full width of the bin.
# This prevents aliasing when error < bin_width.
# Calculate Z-scores at every bin edge
# Broadcasting: (1, K+1) - (N, 1) -> (N, K+1)
z = (edges[None, :] - y) / (sig * jnp.sqrt(2))
# Error Function (CDF of Gaussian)
cdf_values = 0.5 * (1 + jsp.erf(z))
# The probability mass in Bin j is CDF(Right Edge) - CDF(Left Edge)
prob_matrix = jnp.diff(cdf_values, axis=1)
# Numerical stability: Add epsilon to prevent log(0)
return prob_matrix + 1e-30
# ==============================================================================
# Smoothness Prior
# ==============================================================================
[docs]
def generate_smooth_curve(N_bins, smoothness_sigma):
"""
Generates a 1D smooth curve using a Gaussian Random Walk.
Model:
step[i] ~ Normal(0, sigma)
curve[i] = curve[i-1] + step[i]
Parameters
----------
N_bins : int
Number of velocity bins.
smoothness_sigma : float
Controls the flexibility of the curve.
- Low sigma: Stiff, very smooth.
- High sigma: Flexible, jagged.
Returns
-------
curve : jnp.ndarray (N_bins,)
Latent log-density curve.
"""
# Sample the *steps* between bins, not the bins themselves.
# This enforces the neighbor-to-neighbor correlation.
steps = numpyro.sample(
"steps", dist.Normal(0.0, smoothness_sigma), sample_shape=(N_bins - 1,)
)
# Reconstruct the curve via cumulative sum.
# We fix the first bin at 0.0 (relative scale).
curve = jnp.concatenate([jnp.array([0.0]), jnp.cumsum(steps)])
return curve
# ==============================================================================
# Model Inference
# ==============================================================================
[docs]
def model(matrix, n_bins):
"""
The Model definition for NumPyro.
Parameters
----------
matrix : jnp.ndarray (N_stars, N_bins)
The pre-computed Design Matrix M.
n_bins : int
Number of velocity bins.
Returns
-------
None
This function defines the probabilistic graph and has no return value.
"""
# --- Hyperparameters ---
# We infer the smoothness sigma from the data.
# HalfNormal(0.1) is a relatively conservative prior, favoring smooth curves.
smoothness_sigma = numpyro.sample("smoothness_sigma", dist.HalfNormal(0.1))
# We infer the Total Flux (Number of Stars) separately from the shape.
# TODO: test this vs simple Dirichlet prior on the weights.
N_stars = matrix.shape[0]
total_flux = numpyro.sample(
"total_flux",
dist.TruncatedNormal(
loc=float(N_stars), scale=jnp.sqrt(float(N_stars)), low=0.0
),
)
# --- Latent Density ---
# Generate the log-density curve using the Random Walk prior
latent_curve = generate_smooth_curve(n_bins, smoothness_sigma)
# --- Normalization ---
# Fix Scale Ambiguity:
centered_curve = latent_curve - jnp.mean(latent_curve)
# Convert latent values to valid Probabilities (Sum = 1)
intrinsic_pdf = jax.nn.softmax(centered_curve)
# Save the PDF for plotting and later use
numpyro.deterministic("intrinsic_pdf", intrinsic_pdf)
# --- Likelihood ---
# The probability of observing Star i is the weighted sum of the
# probabilities of it coming from each bin.
# P(Star_i) = DotProduct( Row_i, Intrinsic_Weights )
per_star_prob = jnp.dot(matrix, intrinsic_pdf)
# Poisson Log-Likelihood
# LogLik = Sum( log( P_i * Flux ) )
log_prob = jnp.sum(jnp.log(per_star_prob)) + N_stars * jnp.log(total_flux)
# Register the likelihood with NumPyro
numpyro.factor("obs_log_lik", log_prob)
# ==============================================================================
# Solver Class
# ==============================================================================
[docs]
class KinematicSolver:
"""
A high-level interface for performing Bayesian kinematic deconvolution.
This class manages the full inference workflow:
1. Defining the velocity grid (``setup_grid``).
2. Ingesting data and building the design matrix (``add_data``).
3. Running the MCMC sampler (``run``).
4. Visualizing the results (``plot_result``).
Attributes
----------
matrix : jnp.ndarray or None
The pre-computed design matrix of shape (N_stars, N_bins).
grid : dict
Metadata defining the velocity grid (centers, edges, width).
n_stars : int or None
Number of stars loaded via ``add_data``. Used as the ``bin_flux``
analog when writing Dynamite output (see ``write_dynamite_kinematics``).
samples : dict or None
Posterior samples from the MCMC run.
clipped_samples : dict or None
Per-bin summary statistics (median LOSVD and clipped uncertainties)
populated by ``clip_uncertainties``.
"""
def __init__(self):
"""
Initializes the KinematicSolver instance.
"""
self.matrix = None
self.grid = {}
self.n_stars = None
self.samples = None
self.clipped_samples = None
[docs]
def setup_grid(self, center, width, n_bins):
"""
Define the velocity grid (histogram bins).
Parameters
----------
center : float
Center of the velocity grid.
width : float
Total width of the velocity grid.
n_bins : int
Number of bins in the grid.
Returns
-------
None
Sets ``self.grid``.
"""
edges = np.linspace(center - width / 2, center + width / 2, n_bins + 1)
centers = 0.5 * (edges[:-1] + edges[1:])
self.grid = {
"centers": centers,
"edges": edges,
"width": edges[1] - edges[0],
"n_bins": n_bins,
}
[docs]
def add_data(self, vel, err):
"""
Load observations and pre-compute the design matrix.
Parameters
----------
vel : array-like
Observed velocities of stars.
err : array-like
Measurement errors associated with the velocities.
Returns
-------
None
Sets ``self.matrix``.
"""
if not self.grid:
msg = "Run setup_grid() first."
raise ValueError(msg)
self.n_stars = len(vel)
print(f"Computing Design Matrix for {self.n_stars} stars...")
# This one-time computation replaces the convolution loop
self.matrix = precompute_design_matrix(
vel, err, self.grid["centers"], bin_width=self.grid["width"]
)
print(f"Matrix ready. Shape: {self.matrix.shape}")
[docs]
def run(self, num_warmup=500, num_samples=1000, gpu=True, seed=5567):
"""
Run the NUTS sampler.
Parameters
----------
num_warmup : int
Number of warmup (burn-in) steps.
num_samples : int
Number of MCMC samples to draw.
gpu : bool
Whether to use GPU acceleration (if available).
seed : int
RNG seed for the NUTS sampler. Default 5567 (kept for backwards
compatibility). When running many bins in a batch, pass distinct
seeds per bin to avoid any correlation in the sampling chains; a
simple convention is ``seed + bin_index`` (see ``fit_all_bins``).
Returns
-------
samples : dict
Posterior samples (e.g., "intrinsic_pdf", "smoothness_sigma").
"""
if self.matrix is None:
msg = "No data added."
raise ValueError(msg)
if gpu:
numpyro.set_platform("gpu")
print("Starting NUTS MCMC...")
nuts_kernel = NUTS(model)
mcmc = MCMC(nuts_kernel, num_warmup=num_warmup, num_samples=num_samples)
rng_key = jax.random.PRNGKey(int(seed))
mcmc.run(rng_key, matrix=self.matrix, n_bins=self.grid["n_bins"])
self.samples = mcmc.get_samples()
# Invalidate any previously computed clipped summary when re-running.
self.clipped_samples = None
print("Inference Complete.")
return self.samples
[docs]
def plot_result(self, ax=None, true_intrinsic=None):
"""
Visualize the inferred LOSVD.
Parameters
----------
ax : matplotlib.axes.Axes, optional
Axes to plot on. If None, creates a new figure.
true_intrinsic : array-like, optional
True intrinsic velocities for comparison (if available).
Returns
-------
ax : matplotlib.axes.Axes
The plot axes.
"""
# The model returns "Probability Mass" (sum = 1).
# We need "Probability Density" (integral = 1) for plotting.
# Density = Mass / Bin_Width
# TODO: decide if we want to work in density space internally
pdf_samples_mass = self.samples["intrinsic_pdf"]
dx = self.grid["width"]
# Convert to Density
pdf_samples_density = pdf_samples_mass / dx
mean_pdf = jnp.mean(pdf_samples_density, axis=0)
# 68 percent credible interval
lower = jnp.percentile(pdf_samples_density, 16, axis=0)
upper = jnp.percentile(pdf_samples_density, 84, axis=0)
if ax is None:
fig, ax = plt.subplots()
x = self.grid["centers"]
# Plot Model, as a mid-step plot
ax.step(x, mean_pdf, where="mid", color="tab:green", label="Inferred Intrinsic")
ax.fill_between(
x,
lower,
upper,
color="tab:green",
alpha=0.3,
step="mid",
)
# Plot Truth
if true_intrinsic is not None:
if callable(true_intrinsic):
# If function, evaluate on a fine grid for smooth curve
x_fine = np.linspace(
self.grid["edges"][0],
self.grid["edges"][-1],
500, # High resolution
)
y_true = true_intrinsic(x_fine)
ax.plot(
x_fine,
y_true,
color="k",
ls="--",
linewidth=1.5,
label="True Intrinsic",
)
else:
# If samples, histogram with density=True
true_hist, _ = np.histogram(
true_intrinsic, bins=self.grid["edges"], density=True
)
ax.plot(x, true_hist, color="k", ls="--", label="True Intrinsic")
ax.set_xlabel("Velocity")
ax.set_ylabel("Probability Density")
ax.legend()
return ax
[docs]
def clip_uncertainties(self, floor_fraction=0.01, abs_floor=1e-10):
"""
Apply uncertainty floors and store LOSVD summary statistics.
This is a **post-processing step** that does *not* modify the raw
posterior samples in ``self.samples``. It summarises the posterior
as per-bin marginal medians and half-CI-widths in probability-mass
space, then raises the uncertainties to a floor so that no bin carries
a zero into the Dynamite output writer.
Output format matches the Dynamite ``BayesLOSVD`` ECSV convention
(see ``context/dynamite_format_spec.md``):
- ``losvd_median`` stores the per-bin **marginal median** of the
posterior probability mass. Because the joint posterior is a
simplex but marginals are taken independently, the median values
typically *sum to 0.85–0.95*, not 1. This is expected and correct.
- ``losvd_uncertainty`` stores the **half-width** of the 68% credible
interval: ``(p84 − p16) / 2``. Used as symmetric ±error bars.
Both quantities are **dimensionless probability mass per bin** — they
are *not* divided by the bin width.
Motivation
----------
Zero uncertainties in LOSVD bins propagate into Dynamite's internal
NNLS projection matrices and produce ``econ`` zeros that cause weight-
solving failures in large orbit-library runs. The relative floor
(``floor_fraction * max_uncertainty``) is the primary safeguard; the
absolute floor is a numerical backstop for channels where the posterior
is pathologically tight across the board.
Parameters
----------
floor_fraction : float
Relative floor as a fraction of the maximum per-bin half-CI-width
across all bins. Default 0.01 (1%).
abs_floor : float
Absolute floor applied after the relative floor. Default 1e-10.
Returns
-------
None
Sets ``self.clipped_samples`` as a dict with keys:
- ``'losvd_median'`` — per-bin marginal median, probability
mass (dimensionless); shape (n_bins,).
- ``'losvd_uncertainty'`` — clipped half-width of 68% CI,
probability mass; shape (n_bins,).
"""
if self.samples is None:
raise ValueError(
"No posterior samples found. Call run() before clip_uncertainties()."
)
# Work in probability-mass space throughout.
# self.samples["intrinsic_pdf"] has shape (n_samples, n_bins);
# each row is a valid probability mass function (sums to 1).
pdf_mass = np.asarray(self.samples["intrinsic_pdf"])
# Sanity check: the MEAN of valid mass samples must also sum to ~1.
mean_mass = np.mean(pdf_mass, axis=0)
mean_sum = np.sum(mean_mass)
assert np.isclose(mean_sum, 1.0, rtol=1e-3), (
f"Posterior mean LOSVD sums to {mean_sum:.6f}, expected ~1.0. "
"Check that self.samples['intrinsic_pdf'] contains valid probability "
"mass functions (each row should sum to 1)."
)
# Per-bin marginal statistics.
median_mass = np.percentile(pdf_mass, 50, axis=0)
p16 = np.percentile(pdf_mass, 16, axis=0)
p84 = np.percentile(pdf_mass, 84, axis=0)
# Half-width of 68% CI (used as symmetric ±uncertainty in Dynamite).
raw_half_width = (p84 - p16) / 2.0
# Relative floor: a fraction of the widest half-CI in this LOSVD.
rel_floor = floor_fraction * np.max(raw_half_width)
clipped = np.maximum(raw_half_width, rel_floor)
clipped = np.maximum(clipped, abs_floor)
self.clipped_samples = {
"losvd_median": median_mass,
"losvd_uncertainty": clipped,
}
[docs]
def truncate_losvd(self, n_sigma=3.0, abs_floor=1e-10):
"""
Suppress LOSVD bins beyond ``n_sigma`` dispersions from the bulk mean.
.. note::
**This method should only be called if tail contamination is
observed to be causing problems** (e.g. non-negligible posterior
mass in channels well outside the range of the input data, or
``econ`` zeros in Dynamite that persist after
``clip_uncertainties``). It is *not* part of the standard
pipeline and is not called by :func:`fit_all_bins`. When in
doubt, omit it — modifying the posterior summary without a clear
diagnostic reason introduces unnecessary bias.
Velocity channels far from the data carry unphysical posterior weight
due to the random-walk prior leaking into unconstrained tails. This
method zeros the mean density in those channels and sets their
uncertainty to ``abs_floor``, preventing Dynamite from trying to fit
orbits in unphysical regimes.
Approach follows the velocity-range truncation strategy discussed in
Falcón-Barroso & Martig (2021, A&A).
This is a **post-processing step** and does *not* modify
``self.samples``. It operates on ``self.clipped_samples``, creating
it first (with default floors) if it is not already present.
Parameters
----------
n_sigma : float
Number of velocity dispersions beyond which to truncate.
Default 3.0.
abs_floor : float
Uncertainty value assigned to truncated bins (must be > 0 to
avoid ``econ`` zeros in Dynamite). Default 1e-10.
Returns
-------
None
Updates ``self.clipped_samples`` in-place.
"""
if self.samples is None:
raise ValueError(
"No posterior samples found. Call run() before truncate_losvd()."
)
if self.clipped_samples is None:
self.clip_uncertainties()
centers = np.asarray(self.grid["centers"])
# Use the posterior-mean LOSVD (probability mass) to compute the bulk
# mean velocity and velocity dispersion of the distribution.
mean_pdf_mass = np.mean(np.asarray(self.samples["intrinsic_pdf"]), axis=0)
v_mean = np.dot(centers, mean_pdf_mass)
v_var = np.dot((centers - v_mean) ** 2, mean_pdf_mass)
v_std = np.sqrt(v_var)
truncation_mask = np.abs(centers - v_mean) > n_sigma * v_std
losvd_median = self.clipped_samples["losvd_median"].copy()
losvd_unc = self.clipped_samples["losvd_uncertainty"].copy()
losvd_median[truncation_mask] = 0.0
losvd_unc[truncation_mask] = abs_floor
self.clipped_samples["losvd_median"] = losvd_median
self.clipped_samples["losvd_uncertainty"] = losvd_unc
# ==============================================================================
# Batch API
# ==============================================================================
[docs]
def fit_all_bins(bin_data_list, grid_kwargs, run_kwargs=None, min_stars=10):
"""
Run the full inference pipeline for a list of Voronoi bins.
For each bin, this executes the ``setup_grid`` → ``add_data`` → ``run``
→ ``clip_uncertainties`` pipeline and returns a list of
:class:`KinematicSolver` instances ready for the Dynamite output writer.
Bins with too few stars are skipped (returning ``None`` at that position)
so the writer can mask them.
:meth:`~KinematicSolver.truncate_losvd` is deliberately *not* called
here. It is an optional diagnostic repair step; call it manually on
individual solvers only if tail contamination is observed to be causing
problems.
The same velocity grid is used for every bin (``grid_kwargs`` is shared).
Each bin receives a unique RNG seed derived as ``base_seed + bin_index``
to avoid correlations between sampling chains.
Parameters
----------
bin_data_list : list of dict
One dict per Voronoi bin. Required keys:
- ``'vel'`` — array of observed stellar velocities.
- ``'err'`` — array of per-star measurement errors.
Any additional keys (e.g. spatial metadata) are ignored here and
can be passed separately to the output writer.
grid_kwargs : dict
Keyword arguments forwarded to :meth:`KinematicSolver.setup_grid`
(``center``, ``width``, ``n_bins``). Shared across all bins.
run_kwargs : dict, optional
Keyword arguments forwarded to :meth:`KinematicSolver.run`
(e.g. ``num_warmup``, ``num_samples``, ``gpu``). The ``seed``
key, if present, is used as the *base* seed; each bin then receives
``seed + bin_index``. Defaults to ``{}`` (all ``run`` defaults
apply).
min_stars : int
Minimum number of stars required to attempt inference. Bins with
fewer stars are skipped with a warning. Default 10.
Returns
-------
solvers : list
One entry per input bin. Entries are either a fully solved
:class:`KinematicSolver` (with ``samples`` and ``clipped_samples``
populated) or ``None`` for skipped bins.
Examples
--------
>>> solvers = fit_all_bins(
... bin_data_list,
... grid_kwargs={"center": 0.0, "width": 600.0, "n_bins": 60},
... run_kwargs={"num_warmup": 500, "num_samples": 1000, "gpu": False},
... )
>>> # Pass to the output writer (Task 2):
>>> solver.write_dynamite_kinematics(output_dir, voronoi_bin_metadata)
"""
if run_kwargs is None:
run_kwargs = {}
# Extract the base seed so we can derive per-bin seeds.
run_kwargs = dict(run_kwargs)
base_seed = run_kwargs.pop("seed", 5567)
n_total = len(bin_data_list)
solvers = []
for i, bin_data in enumerate(bin_data_list):
print(f"Fitting bin {i + 1}/{n_total}...")
vel = np.asarray(bin_data["vel"])
err = np.asarray(bin_data["err"])
if len(vel) < min_stars:
warnings.warn(
f"Bin {i} has only {len(vel)} star(s) (minimum is {min_stars}). "
"Skipping — this bin will appear as None in the output list and "
"should be masked in the Dynamite input files.",
stacklevel=2,
)
solvers.append(None)
continue
solver = KinematicSolver()
solver.setup_grid(**grid_kwargs)
solver.add_data(vel=vel, err=err)
solver.run(seed=base_seed + i, **run_kwargs)
solver.clip_uncertainties()
solvers.append(solver)
n_solved = sum(s is not None for s in solvers)
n_skipped = n_total - n_solved
print(
f"Done. {n_solved}/{n_total} bins solved"
+ (f", {n_skipped} skipped." if n_skipped else ".")
)
return solvers
# ==============================================================================
# Dynamite Output Writer
# ==============================================================================
[docs]
def write_dynamite_kinematics(
solvers,
output_dir,
voronoi_bin_metadata,
kin_filename="bayes_losvd_kins.ecsv",
aperture_filename="aperture.dat",
bins_filename="bins.dat",
bin_flux_mode="nstars",
):
"""
Write Dynamite-compatible BayesLOSVD input files from a list of solved bins.
Produces three files that Dynamite expects for its ``BayesLOSVD`` kinematics
representation (see ``context/dynamite_format_spec.md`` for full format
details):
- ``{kin_filename}`` — Astropy ECSV, one row per solved Voronoi bin,
containing the per-bin marginal median LOSVD and ±half-CI uncertainties.
- ``{aperture_filename}`` — pixel grid geometry.
- ``{bins_filename}`` — pixel-to-bin mapping.
Any ``None`` entries in ``solvers`` (bins skipped by :func:`fit_all_bins`)
are automatically masked: their pixels are written as 0 in the bins file
and they are omitted from the kinematics table. The remaining bins are
re-numbered sequentially (1-indexed) as required by Dynamite.
:func:`clip_uncertainties` is called automatically on any solver that has
not already had its ``clipped_samples`` populated.
Parameters
----------
solvers : list
Solved :class:`KinematicSolver` instances (or ``None`` for skipped
bins), as returned by :func:`fit_all_bins`. All non-``None`` entries
must share the same velocity grid.
output_dir : str or path-like
Directory in which to write the three output files. Created if it
does not exist.
voronoi_bin_metadata : dict
Spatial and observational metadata. Required structure::
{
'bins': [
{
'xbin': float, # arcsec, bin centre x
'ybin': float, # arcsec, bin centre y
# 'bin_flux' only read when bin_flux_mode='custom'
'bin_flux': float,
},
... # one entry per entry in solvers
],
'aperture': {
'x_start': float, # arcsec, lower-left corner x
'y_start': float, # arcsec, lower-left corner y
'x_size': float, # arcsec, total x extent
'y_size': float, # arcsec, total y extent
'angle_deg': float, # degrees (= 90 - position_angle)
'nx': int, # pixels along x
'ny': int, # pixels along y
},
# 1-indexed bin IDs (bin 1 = solvers[0], bin 2 = solvers[1], …)
# 0 = masked. Skipped bins are re-mapped to 0 automatically.
'pixel_bin_ids': array-like, # shape (nx*ny,) or (ny, nx)
'psf': {
'sigma': [float, ...], # Gaussian sigma(s) in arcsec
'weight': [float, ...], # must sum to 1
},
}
kin_filename : str
File name for the kinematics ECSV. Default ``'bayes_losvd_kins.ecsv'``.
aperture_filename : str
File name for the aperture file. Default ``'aperture.dat'``.
bins_filename : str
File name for the bins file. Default ``'bins.dat'``.
bin_flux_mode : {'nstars', 'uniform', 'custom'}
Controls what is written to the ``bin_flux`` column.
``'nstars'`` *(default)*
Use the number of stars in each bin
(``solver.n_stars``, set by :meth:`~KinematicSolver.add_data`).
This is the physically meaningful analog of IFU surface brightness
for discrete stellar kinematic data. ``bin_flux`` is used by
Dynamite **only** for flux-weighted systemic velocity centering
(``center_v_systemic``); it does **not** enter the NNLS chi²
(confirmed from ``NNLS.construct_nnls_matrix_and_rhs`` and
``BayesLOSVD.get_observed_values_and_uncertainties`` in
Dynamite's source). N_stars is the physically appropriate
quantity to pass for discrete stellar data.
``'uniform'``
Write 1.0 for every bin. Use this if you want equal weighting
of all bins in any flux-weighted calculation, or if you are
uncertain about the right quantity and want a neutral default.
``'custom'``
Read ``bin_flux`` from ``voronoi_bin_metadata['bins'][i]['bin_flux']``
for each bin. Useful if you have an external flux estimate
(e.g. sum of photometric counts in each Voronoi cell).
Raises
------
ValueError
If no solved bins are found, or if solvers share inconsistent grids.
AssertionError
If any LOSVD uncertainty is ≤ 0 after clipping (would cause ``econ``
zeros in Dynamite).
Returns
-------
None
"""
try:
from astropy.table import Table
except ImportError as exc:
raise ImportError(
"astropy is required for write_dynamite_kinematics(). "
"Install it with: pip install astropy"
) from exc
from pathlib import Path
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# ------------------------------------------------------------------
# Identify solved bins and validate grid consistency
# ------------------------------------------------------------------
solved_indices = [i for i, s in enumerate(solvers) if s is not None]
n_solved = len(solved_indices)
if n_solved == 0:
raise ValueError("No solved bins found (all solvers are None).")
ref_solver = solvers[solved_indices[0]]
vcent = np.asarray(ref_solver.grid["centers"])
dv = float(ref_solver.grid["width"])
nvbins = int(ref_solver.grid["n_bins"])
for idx in solved_indices[1:]:
s = solvers[idx]
if s.grid["n_bins"] != nvbins or not np.allclose(s.grid["centers"], vcent):
raise ValueError(
f"Solver at index {idx} has a different velocity grid than "
f"solver at index {solved_indices[0]}. All bins must share "
"the same grid (set via setup_grid / fit_all_bins)."
)
# ------------------------------------------------------------------
# Gather LOSVD summaries (auto-clip if needed)
# ------------------------------------------------------------------
bin_metas = voronoi_bin_metadata["bins"]
psf = voronoi_bin_metadata.get("psf", {"sigma": [1.0], "weight": [1.0]})
losvd_all = np.zeros((n_solved, nvbins))
dlosvd_all = np.zeros((n_solved, nvbins))
for out_i, orig_i in enumerate(solved_indices):
solver = solvers[orig_i]
if solver.clipped_samples is None:
solver.clip_uncertainties()
losvd_all[out_i] = solver.clipped_samples["losvd_median"]
dlosvd_all[out_i] = solver.clipped_samples["losvd_uncertainty"]
# Guard: no zero uncertainties (would corrupt Dynamite's NNLS matrices).
assert np.all(dlosvd_all > 0), (
"Zero or negative uncertainty found after clipping. "
"This would cause econ zeros in Dynamite. "
"Check clip_uncertainties() floor settings."
)
# ------------------------------------------------------------------
# Compute v and sigma from normalised median LOSVD (per format spec)
# ------------------------------------------------------------------
losvd_sums = losvd_all.sum(axis=1, keepdims=True) # (n_solved, 1)
losvd_norm = losvd_all / losvd_sums # l_hat, sums to 1
v_vals = (vcent[np.newaxis, :] * losvd_norm).sum(axis=1)
sigma_vals = np.sqrt(
(((vcent[np.newaxis, :] - v_vals[:, np.newaxis]) ** 2) * losvd_norm).sum(axis=1)
)
# ------------------------------------------------------------------
# Write kinematics ECSV
# ------------------------------------------------------------------
data = {}
data["binID_BayesLOSVD"] = np.arange(n_solved)
# Compute bin_flux according to requested mode.
if bin_flux_mode == "nstars":
missing = [
solved_indices[k]
for k, idx in enumerate(solved_indices)
if solvers[idx].n_stars is None
]
if missing:
raise ValueError(
f"bin_flux_mode='nstars' requires that add_data() was called "
f"on every solver, but solvers at indices {missing} have "
"n_stars=None. Call add_data() before fit_all_bins(), or "
"use bin_flux_mode='uniform' / 'custom' instead."
)
bin_flux_vals = np.array(
[float(solvers[i].n_stars) for i in solved_indices]
)
elif bin_flux_mode == "uniform":
bin_flux_vals = np.ones(n_solved, dtype=float)
elif bin_flux_mode == "custom":
bin_flux_vals = np.array(
[float(bin_metas[i].get("bin_flux", 1.0)) for i in solved_indices]
)
else:
raise ValueError(
f"bin_flux_mode must be 'nstars', 'uniform', or 'custom'; "
f"got {bin_flux_mode!r}."
)
data["bin_flux"] = bin_flux_vals
data["binID_dynamite"] = np.arange(1, n_solved + 1)
data["v"] = v_vals
data["sigma"] = sigma_vals
data["xbin"] = np.array([bin_metas[i]["xbin"] for i in solved_indices])
data["ybin"] = np.array([bin_metas[i]["ybin"] for i in solved_indices])
# Interleave losvd_j and dlosvd_j columns (required column order).
for j in range(nvbins):
data[f"losvd_{j}"] = losvd_all[:, j]
data[f"dlosvd_{j}"] = dlosvd_all[:, j]
table = Table(data)
# Metadata written in !!omap order to match BayesLOSVD convention.
table.meta["dv"] = dv
table.meta["vcent"] = vcent.tolist()
table.meta["nbins"] = n_solved
table.meta["nvbins"] = nvbins
table.meta["PSF"] = psf
kin_path = output_dir / kin_filename
table.write(str(kin_path), format="ascii.ecsv", overwrite=True)
print(f"Written kinematics ({n_solved} bins): {kin_path}")
# ------------------------------------------------------------------
# Write aperture.dat
# ------------------------------------------------------------------
ap = voronoi_bin_metadata["aperture"]
ap_path = output_dir / aperture_filename
with open(ap_path, "w") as f:
f.write("#counter_rotation_boxed_aperturefile_version_2 \n")
f.write(f"\t{ap['x_start']:f}\t{ap['y_start']:f} \n")
f.write(f"\t{ap['x_size']:f}\t{ap['y_size']:f} \n")
f.write(f"\t{ap['angle_deg']:f} \n")
f.write(f"\t{ap['nx']}\t{ap['ny']} \n")
print(f"Written aperture: {ap_path}")
# ------------------------------------------------------------------
# Write bins.dat
# Re-map original 1-indexed bin IDs to sequential solved-only IDs.
# Pixels for skipped bins become 0 (masked).
# ------------------------------------------------------------------
# Build mapping: original 1-indexed ID -> new 1-indexed ID (0 if skipped).
n_total = len(solvers)
orig_to_new = np.zeros(n_total + 1, dtype=int) # index 0 unused
for new_id, orig_i in enumerate(solved_indices, start=1):
orig_to_new[orig_i + 1] = new_id # orig_i is 0-based; +1 for 1-based
pixel_ids = np.asarray(voronoi_bin_metadata["pixel_bin_ids"]).flatten().astype(int)
remapped = np.where(
(pixel_ids > 0) & (pixel_ids <= n_total),
orig_to_new[pixel_ids],
0,
)
total_pixels = len(remapped)
bins_path = output_dir / bins_filename
with open(bins_path, "w") as f:
f.write("#Counterrotation_binning_version_1\n")
f.write(f"{total_pixels}\n")
for start in range(0, total_pixels, 10):
chunk = remapped[start : start + 10]
f.write("\t" + "\t".join(str(v) for v in chunk) + "\n")
print(f"Written bins: {bins_path}")