Note: This document was generated by Claude (Opus) with human feedback.
In a standard PyMC model, each observed data point contributes equally to the joint log-probability. When you write
```python
pm.Normal("obs", mu=mu, sigma=sigma, observed=data)
```

PyMC computes elementwise log-probabilities and sums them. There is no built-in hook to scale individual contributions before that reduction.
Power likelihoods generalize the standard likelihood by raising each
observation's contribution to a per-observation exponent $w_i$:

$$p_w(y \mid \theta) = \prod_{i=1}^{n} p(y_i \mid \theta)^{w_i}$$

In log space this is simply a weighted sum:

$$\log p_w(y \mid \theta) = \sum_{i=1}^{n} w_i \log p(y_i \mid \theta)$$

When $w_i = 1$ for all $i$, this reduces to the standard likelihood; $w_i = 0$ removes observation $i$ entirely, and intermediate values interpolate.
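As a quick numerical check of this identity, here is a standalone sketch with a hand-rolled normal log-density (the values are arbitrary):

```python
import numpy as np

def normal_logpdf(y, mu, sigma):
    """Log-density of Normal(mu, sigma), evaluated elementwise."""
    return -0.5 * np.log(2 * np.pi * sigma**2) - (y - mu) ** 2 / (2 * sigma**2)

y = np.array([0.5, -1.2, 2.0])
w = np.array([1.0, 0.5, 2.0])

# Product of powered densities, computed directly ...
direct = np.log(np.prod(np.exp(normal_logpdf(y, 0.0, 1.0)) ** w))
# ... equals the weighted sum of log-densities.
weighted_sum = np.sum(w * normal_logpdf(y, 0.0, 1.0))
```

The second form is how the weights enter PyMC: multiply elementwise log-probabilities before the reduction.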
Suppose you observe data in which some points are redundant: duplicated records, or samples from an autocorrelated process. A standard likelihood treats every point as independent evidence, so the posterior ends up overconfident.

Per-observation weights let you correct for this. If you can estimate each
observation's effective multiplicity $m_i$ (e.g. via autocorrelation analysis, or
simply by counting how many times a value appears), you assign it the weight $w_i = 1 / m_i$.
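For the duplicate-counting case, the weights fall out of the value counts directly. A minimal sketch (the helper name `duplicate_weights` is illustrative, not part of PyMC):

```python
import numpy as np

def duplicate_weights(data: np.ndarray) -> np.ndarray:
    """Assign each observation the weight 1/m, where m is how many times
    its value occurs, so every distinct value carries total weight 1."""
    _, inverse, counts = np.unique(data, return_inverse=True, return_counts=True)
    return 1.0 / counts[inverse]

data = np.array([1.0, 2.0, 2.0, 3.0, 3.0, 3.0])
w = duplicate_weights(data)  # [1.0, 0.5, 0.5, 1/3, 1/3, 1/3]
```

The weights sum to 3, the number of distinct values, which is the effective sample size of this toy dataset.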
PyMC's `pm.CustomDist` lets you supply a custom `logp` function, which is all
you need to inject per-observation weights.
`pm.logp(dist, value)` returns the elementwise log-probability as a
PyTensor expression, so you can multiply by a weight tensor before PyMC reduces
it into the joint log-probability.
```python
def weighted_logp(value, weight, mu, sigma):
    dist = pm.Normal.dist(mu=mu, sigma=sigma)
    return weight * pm.logp(dist, value)
```

The signature is `logp(value, *dist_params)`, where `value` is the observed data
tensor and `*dist_params` are the positional arguments you pass to `CustomDist`
(after the name).
Because the return value is a PyTensor expression, automatic differentiation works and NUTS can compute gradients through the weighted likelihood.
The `random` function is used by `pm.sample_prior_predictive()` and
`pm.sample_posterior_predictive()`. It should ignore the weights and sample
from the base distribution — weights are an inferential device, not part of the
data-generating process.
```python
def weighted_random(weight, mu, sigma, rng=None, size=None):
    # Ignore `weight`: draw from the base distribution.
    return rng.normal(loc=mu, scale=sigma, size=size)
```

```python
import numpy as np
import pymc as pm

weights = np.array([...])  # per-observation, shape matching data
data = np.array([...])

with pm.Model() as model:
    mu = pm.Normal("mu", mu=0, sigma=10)
    sigma = pm.HalfNormal("sigma", sigma=2)
    pm.CustomDist(
        "obs",
        weights,  # → `weight` in logp / random
        mu,       # → `mu` in logp / random
        sigma,    # → `sigma` in logp / random
        logp=weighted_logp,
        random=weighted_random,
        observed=data,
    )
```

Positional arguments after the name are forwarded to both `logp` and `random`,
in order.
When observations are multivariate (e.g. 2-D points), the weights are typically per-observation rather than per-component. Use broadcasting to apply a 1-D weight vector across the component axis:

```python
# data shape: (n_obs, n_components)
# weights shape: (n_obs,)
pm.CustomDist(
    "obs",
    weights[:, None],  # broadcast over components
    mu,
    sigma,
    logp=weighted_logp,
    random=weighted_random,
    observed=data,
)
```

The pattern works with any PyMC distribution. Replace `pm.Normal.dist` with
`pm.StudentT.dist`, `pm.Laplace.dist`, or any other distribution that supports
`pm.logp`:
```python
def weighted_logp(value, weight, mu, sigma):
    dist = pm.StudentT.dist(nu=6, mu=mu, sigma=sigma)
    return weight * pm.logp(dist, value)
```

When all weights are the same scalar $w$, this is equivalent to tempering the entire likelihood. Setting $w < 1$ flattens the likelihood, pulling the posterior toward the prior; $w > 1$ sharpens it. The implementation simplifies: there is no need for per-observation weight arrays, and you can use a scalar directly.
```python
temperature = 0.5  # scalar weight applied to all observations

def tempered_logp(value, temperature, mu, sigma):
    dist = pm.Normal.dist(mu=mu, sigma=sigma)
    return temperature * pm.logp(dist, value)

def tempered_random(temperature, mu, sigma, rng=None, size=None):
    # Ignore `temperature`: draw from the base distribution.
    return rng.normal(loc=mu, scale=sigma, size=size)

with pm.Model() as model:
    mu = pm.Normal("mu", mu=0, sigma=10)
    sigma = pm.HalfNormal("sigma", sigma=2)
    pm.CustomDist(
        "obs",
        temperature,
        mu,
        sigma,
        logp=tempered_logp,
        random=tempered_random,
        observed=data,
    )
```

In the constant-weight case, the effect on the posterior is equivalent to
observing a fractional number of data points: $n$ observations at weight $w$ carry the same likelihood information as $w \cdot n$ unweighted observations. Common applications:
- Correcting for duplicates / autocorrelation: if your $n$ observations have an effective sample size of $n_{\text{eff}}$, setting $w = n_{\text{eff}} / n$ recovers the appropriate posterior width.
- Safe Bayes / generalized Bayesian learning: tempering with $w < 1$ guards against model misspecification by preventing the likelihood from overwhelming the prior.
- Thermodynamic integration: sweeping $w$ from 0 to 1 to estimate the marginal likelihood (model evidence) for Bayesian model comparison.
- Cold posteriors ($w > 1$): sharpening the posterior beyond what the data alone would justify, sometimes used in deep learning.
Note that because a constant weight does not change the relative contribution of observations, it does not help when different observations have different levels of redundancy — use per-observation weights for that.
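A small numeric illustration of this point (the log-probability values are arbitrary): two weight vectors with the same total but different allocations produce different joint log-likelihoods, which is exactly what a single constant weight cannot express.

```python
import numpy as np

# Hypothetical elementwise log-probabilities for four observations
elemwise_logp = np.array([-0.9, -2.5, -0.9, -4.1])

w_uniform = np.full(4, 0.5)                  # total weight 2.0, spread evenly
w_perobs = np.array([1.0, 0.25, 0.5, 0.25])  # also total weight 2.0

joint_uniform = np.sum(w_uniform * elemwise_logp)  # -4.2
joint_perobs = np.sum(w_perobs * elemwise_logp)
# Same total weight, different joint log-likelihoods: only per-observation
# weights can down-weight specific redundant observations.
```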
When the data come from a stationary time series, the observations are not independent: nearby samples are correlated, so they carry less information than the same number of i.i.d. draws. The effective sample size (ESS) quantifies how many independent observations the correlated series is worth.
The autocorrelation function $\rho_k = \operatorname{Corr}(y_t, y_{t+k})$ measures how strongly observations $k$ steps apart are correlated. For i.i.d. data, $\rho_k = 0$ for all $k \geq 1$.

Under an i.i.d. likelihood, the variance of the posterior mean scales as $\sigma^2 / n$. For a stationary correlated series, the variance of the sample mean is instead

$$\operatorname{Var}(\bar{y}) = \frac{\sigma^2}{n} \left( 1 + 2 \sum_{k=1}^{n-1} \left( 1 - \frac{k}{n} \right) \rho_k \right)$$

The parenthesized factor is the finite-sample integrated autocorrelation time. For large $n$ it converges to

$$\tau = 1 + 2 \sum_{k=1}^{\infty} \rho_k$$

and the effective sample size is

$$n_{\text{eff}} = \frac{n}{\tau}$$

The denominator $\tau$ is the number of consecutive observations it takes, on average, to obtain one effectively independent sample.

You cannot sum the sample autocorrelations over all lags in practice: the estimates at high lags are dominated by noise, and the naive sum fails to converge. The sum has to be truncated, which is what Step 2 below addresses.
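For a concrete case, an AR(1) process with coefficient $\phi$ has $\rho_k = \phi^k$, so the geometric series gives $\tau$ in closed form; the truncated sum reproduces it (the values of `phi` and `n` below are illustrative):

```python
# For AR(1): tau = 1 + 2 * phi / (1 - phi) = (1 + phi) / (1 - phi)
phi = 0.5
tau_series = 1 + 2 * sum(phi**k for k in range(1, 200))
tau_closed = (1 + phi) / (1 - phi)  # = 3.0

n = 3000
n_eff = n / tau_closed  # 1000 effectively independent samples
```

With $\phi = 0.5$, every three consecutive observations are worth roughly one independent draw.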
Step 1: Estimate the autocorrelation function. Compute the sample autocovariance via FFT (fast, $O(n \log n)$) and normalize by the variance:
```python
import numpy as np

def sample_autocorrelation(y: np.ndarray) -> np.ndarray:
    """Estimate the autocorrelation function using FFT."""
    n = len(y)
    y_centered = y - y.mean()
    # Zero-pad to avoid circular correlation artifacts
    fft_result = np.fft.fft(y_centered, n=2 * n)
    acf_raw = np.fft.ifft(fft_result * np.conj(fft_result)).real[:n]
    # Normalize: acf[0] = variance, divide to get correlation
    return acf_raw / acf_raw[0]
```

Step 2: Truncate the sum. Raw autocorrelation estimates are noisy at high
lags. A common approach (Geyer, 1992) is the initial positive sequence
estimator: sum consecutive pairs $\rho_{2k-1} + \rho_{2k}$ and stop at the first pair whose sum is negative.
```python
def integrated_autocorrelation_time(rho: np.ndarray) -> float:
    """Estimate the integrated autocorrelation time using Geyer's
    initial positive sequence method."""
    n = len(rho)
    tau = rho[0]  # = 1.0 (the lag-0 term)
    for k in range(1, n // 2):
        pair_sum = rho[2 * k - 1] + rho[2 * k]
        if pair_sum < 0:
            break
        tau += 2 * pair_sum
    return tau
```

Step 3: Compute ESS.

```python
rho = sample_autocorrelation(data)
tau = integrated_autocorrelation_time(rho)
n_eff = len(data) / tau
```

Constant weight (global tempering). If autocorrelation is roughly stationary across the series, a single scalar weight suffices: $w = n_{\text{eff}} / n = 1 / \tau$. This is the simplest approach and is appropriate when the autocorrelation structure is homogeneous.
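A quick end-to-end check on synthetic data (the two estimators are repeated so the snippet runs standalone; `phi`, `n`, and the seed are illustrative):

```python
import numpy as np

def sample_autocorrelation(y):
    n = len(y)
    yc = y - y.mean()
    f = np.fft.fft(yc, n=2 * n)
    acf = np.fft.ifft(f * np.conj(f)).real[:n]
    return acf / acf[0]

def integrated_autocorrelation_time(rho):
    tau = rho[0]
    for k in range(1, len(rho) // 2):
        pair_sum = rho[2 * k - 1] + rho[2 * k]
        if pair_sum < 0:
            break
        tau += 2 * pair_sum
    return tau

# Synthetic AR(1) series with phi = 0.7; true tau = (1 + phi)/(1 - phi) ≈ 5.67
rng = np.random.default_rng(0)
phi, n = 0.7, 20_000
y = np.empty(n)
y[0] = rng.normal()
for t in range(1, n):
    y[t] = phi * y[t - 1] + rng.normal()

tau_hat = integrated_autocorrelation_time(sample_autocorrelation(y))
w = 1.0 / tau_hat  # constant weight = n_eff / n
```

The estimated $\tau$ should land near the theoretical 5.67, giving a scalar weight around 0.18.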
Per-observation weights. If the autocorrelation varies over the series (e.g. a non-stationary process, or a series with gaps), you can compute local ESS in sliding windows and derive per-observation weights. A simple scheme:

- Divide the series into overlapping windows.
- Estimate $\tau$ within each window.
- Assign $w_i = 1 / \tau_{\text{local}}$ for observations in that window.
- Rescale so that $\sum_i w_i = n_{\text{eff}}$ (the global ESS).
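The steps above can be sketched as follows (a minimal illustration; the `window` and `step` sizes and the averaging of overlapping windows are assumptions, and the two estimators from earlier are repeated so the snippet is self-contained):

```python
import numpy as np

def sample_autocorrelation(y):
    n = len(y)
    yc = y - y.mean()
    f = np.fft.fft(yc, n=2 * n)
    acf = np.fft.ifft(f * np.conj(f)).real[:n]
    return acf / acf[0]

def integrated_autocorrelation_time(rho):
    tau = rho[0]
    for k in range(1, len(rho) // 2):
        pair_sum = rho[2 * k - 1] + rho[2 * k]
        if pair_sum < 0:
            break
        tau += 2 * pair_sum
    return tau

def windowed_weights(y, window=500, step=250):
    """Per-observation weights from local tau estimates, rescaled to the global ESS."""
    n = len(y)
    starts = list(range(0, n - window + 1, step))
    if starts[-1] != n - window:  # make sure the tail is covered
        starts.append(n - window)
    inv_tau_sum = np.zeros(n)
    cover = np.zeros(n)
    for s in starts:
        tau_local = integrated_autocorrelation_time(
            sample_autocorrelation(y[s:s + window])
        )
        inv_tau_sum[s:s + window] += 1.0 / tau_local
        cover[s:s + window] += 1.0
    w = inv_tau_sum / cover  # average local 1/tau per observation
    n_eff = n / integrated_autocorrelation_time(sample_autocorrelation(y))
    return w * (n_eff / w.sum())  # rescale so sum(w) equals the global ESS
```

The resulting array can be passed as `weights` to the `CustomDist` model above.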
Exact duplicates are the extreme case of autocorrelation (perfect correlation among repeated values). An observation duplicated $m$ times carries the information of a single observation, so assigning each copy the weight $w_i = 1/m$ recovers the likelihood of the deduplicated data.
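A quick numerical check of this equivalence, standalone with a hand-rolled normal log-density (the data are arbitrary):

```python
import numpy as np

def normal_logpdf(y, mu, sigma):
    """Log-density of Normal(mu, sigma), evaluated elementwise."""
    return -0.5 * np.log(2 * np.pi * sigma**2) - (y - mu) ** 2 / (2 * sigma**2)

unique_vals = np.array([1.0, 2.0, 3.0])
duplicated = np.array([1.0, 2.0, 2.0, 3.0, 3.0, 3.0])  # copies: 1x, 2x, 3x
w = np.array([1.0, 0.5, 0.5, 1/3, 1/3, 1/3])           # w_i = 1/m per copy

weighted = np.sum(w * normal_logpdf(duplicated, 0.0, 1.0))
dedup = np.sum(normal_logpdf(unique_vals, 0.0, 1.0))
# weighted == dedup: down-weighting copies recovers the deduplicated likelihood
```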