Skip to content

Instantly share code, notes, and snippets.

@Nikolaj-K
Last active October 1, 2023 12:06
Show Gist options
  • Save Nikolaj-K/f660de8cec4551cfb879479470625e20 to your computer and use it in GitHub Desktop.
Save Nikolaj-K/f660de8cec4551cfb879479470625e20 to your computer and use it in GitHub Desktop.
Absorption probabilities in finite Markov chains
"""
Code used in the video
https://youtu.be/BiViLT6FCC4
"""
import random
import numpy as np
class LinAlgLib: # Linear algebra'ish functions
def is_positive_vec(vec) -> bool:
return all(elem >= 0 for elem in vec)
def is_positive_mat(mat) -> bool:
return all(map(LinAlgLib.is_positive_vec, row) for row in mat)
def is_non_zero_vec(vec, prec) -> bool:
return any(abs(elem) > prec for elem in vec)
def l1_normalized(vec) -> list: # assumes all v elements are positive
PREC = 10**-8
assert LinAlgLib.is_non_zero_vec(vec, PREC), f"Tried to normalize the zero vector, {vec}"
n = sum(abs(elem) for elem in vec)
return [elem / n for elem in vec]
def normalized_rows(mat) -> np.array:
"""
Takes any matrix of positive doubles and return a valid stochastic matrix by normalizing the rows.
:param mat: A matrix.
Example: [[17.5, -2], [2‚ 3]] is mapped to [[1, 0], [2/5‚ 3/5]]
"""
assert LinAlgLib.is_positive_mat(mat)
return np.array(list(map(LinAlgLib.l1_normalized, mat)))
class ProbLib: # ProbLibability theory'ish functions
def rand_bool(p: float) -> bool:
# p ... chance of returning True
return random.random() < p
def is_stochastic_matrix(mat) -> bool:
"""
Return true if mat is a a quare matrix in which all rows are normalized to 1.
"""
return all(
len(row) == len(mat) and abs(sum(row) - 1) < 10**-15
for row in mat
)
def compute_fundamental_matrix_N(mat_Q):
"""
:param: Transient block of stochastic matrix. (=Upper left block, if matrix is in canonical form.)
N := \lim_{num_steps\to\infty} \sum_{s=0}^num_steps Q^s = 1/(1-Q)
N_{ij} Describes the chance of reaching index j from index i in any number of steps
"""
id = np.eye(len(mat_Q))
return np.linalg.inv(id - mat_Q)
def run_walk(transition_function, exit_condition_predicate, start_state):
s = start_state
while not exit_condition_predicate(s):
s = transition_function(s)
return s
def weighted_vertex_transition(weights: list) -> int: # TODO: This function doesn't handle cases well where weights has zeros
"""
Return the index of a weight in weights, with ProbLibability determined by the weight itself.
:param weights: List of positive numbers
Details: Weight don't have to be l_1-normalized.
"""
ps = LinAlgLib.l1_normalized(weights)
# return np.random.choice(range(len(p)), p=ps) # Numpy equivalent
r = random.random() # Uniformly sampled float from the interval [0, 1]
acc_p = 0
for idx, p in enumerate(ps):
acc_p += p
if acc_p > r:
return idx
assert False # Should not be reachable
class Graph:
def __init__(self, mat_graph_weights, num_transient_states):
"""
The square matrix __WEIGHTS described a graph and its elements are the relative transition ProbLibabilities.
(P^s)_{ij} described the chance of going reaching the state with index j from index i in s steps.
"""
self.P = LinAlgLib.normalized_rows(mat_graph_weights)
self.__NUM_TRANSIENT_STATES = num_transient_states
def is_transient(self, idx: int) -> bool:
return idx < self.__NUM_TRANSIENT_STATES
def is_absorbing(self, idx: int) -> bool:
return not self.is_transient(idx)
def compute_absorption_ProbLibability_matrix(self):
k = self.__NUM_TRANSIENT_STATES
mat_A = np.zeros((k, k))
mat_Q = self.P[:k, :k] # Top left block
mat_R = self.P[:k, k:] # Top right block
mat_B = ProbLib.compute_fundamental_matrix_N(mat_Q).dot(mat_R)
mat_absorption = np.block([
[mat_A, mat_B],
[self.P[k:, :k], self.P[k:, k:]]
])
assert ProbLib.is_stochastic_matrix(mat_absorption), "Absorption ProbLibability values were not normalized"
return mat_absorption
def compute_count_matrix_from_simulations(self, num_simulations):
"""
Run a simulation for all initial states.
"""
transition_function = lambda idx: ProbLib.weighted_vertex_transition(self.P[idx])
exit_condition_predicate = lambda idx: self.is_absorbing(idx)
count_mat = np.zeros(self.P.shape)
for idx_row, mutable_row in enumerate(count_mat):
if self.is_transient(idx_row):
for _sim in range(num_simulations):
# Simulate one run
idx_destination: int = ProbLib.run_walk(transition_function, exit_condition_predicate, idx_row)
mutable_row[idx_destination] += 1
else:
# Has deterministic outcome anyhow
mutable_row[idx_row] = num_simulations
return count_mat
class Config:
NUM_SIMULATIONS = 100
NUM_TRANSIENT_STATES = 4
STATES = "ABCDLFW" # Auxiliary names for nicer logging
MAT_GRAPH_WEIGHTS = (
# A B C D L F W
(0, 2, 0, 2, 1, 0, 0,), # A # Start of upper, transient part
(3, 0, 4, 0, 0, 2, 0,), # B
(0, 1, 0, 1, 0, 0, 1,), # C
(4, 0, 3, 0, 0, 0, 0,), # D
(0, 0, 0, 0, 1, 0, 0,), # L # Start of lower, absorbing part
(0, 0, 0, 0, 0, 1, 0,), # F
(0, 0, 0, 0, 0, 0, 1,), # W
)
# The matrix is already in canonical form, with upper and lower parts
# Note: Submatrix of the weight matrix from the index NUM_TRANSIENT_STATES onwards must makes sense w.r.t. describing absorbing states
def run_gambling_game_graph_version():
graph = Graph(Config.MAT_GRAPH_WEIGHTS, Config.NUM_TRANSIENT_STATES)
abs_mat__analytical = graph.compute_absorption_ProbLibability_matrix()
abs_mat__simulation = graph.compute_count_matrix_from_simulations(Config.NUM_SIMULATIONS) / Config.NUM_SIMULATIONS
# Log results (all matrix indices and differences)
for idx_from, state_from in enumerate(Config.STATES):
if graph.is_transient(idx_from):
print(f"\nRuns for transition from {state_from}:")
for idx_to, state_to in enumerate(Config.STATES):
if graph.is_absorbing(idx_to):
p_ana: float = abs_mat__analytical[idx_from][idx_to]
p_emp: float = abs_mat__simulation[idx_from][idx_to]
diff: float = p_ana - p_emp
print(f"\t{state_from} => {state_to}: {round(100 * p_emp, 5)} % (diff = {round(100 * diff, 3)} %)")
################################################################################
if __name__ == "__main__":
run_gambling_game_graph_version()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment