Last active
October 1, 2023 12:06
-
-
Save Nikolaj-K/f660de8cec4551cfb879479470625e20 to your computer and use it in GitHub Desktop.
Absorption probabilities in finite Markov chains
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Code used in the video | |
https://youtu.be/BiViLT6FCC4 | |
""" | |
import random | |
import numpy as np | |
class LinAlgLib: # Linear algebra'ish functions | |
def is_positive_vec(vec) -> bool: | |
return all(elem >= 0 for elem in vec) | |
def is_positive_mat(mat) -> bool: | |
return all(map(LinAlgLib.is_positive_vec, row) for row in mat) | |
def is_non_zero_vec(vec, prec) -> bool: | |
return any(abs(elem) > prec for elem in vec) | |
def l1_normalized(vec) -> list: # assumes all v elements are positive | |
PREC = 10**-8 | |
assert LinAlgLib.is_non_zero_vec(vec, PREC), f"Tried to normalize the zero vector, {vec}" | |
n = sum(abs(elem) for elem in vec) | |
return [elem / n for elem in vec] | |
def normalized_rows(mat) -> np.array: | |
""" | |
Takes any matrix of positive doubles and return a valid stochastic matrix by normalizing the rows. | |
:param mat: A matrix. | |
Example: [[17.5, -2], [2‚ 3]] is mapped to [[1, 0], [2/5‚ 3/5]] | |
""" | |
assert LinAlgLib.is_positive_mat(mat) | |
return np.array(list(map(LinAlgLib.l1_normalized, mat))) | |
class ProbLib: # ProbLibability theory'ish functions | |
def rand_bool(p: float) -> bool: | |
# p ... chance of returning True | |
return random.random() < p | |
def is_stochastic_matrix(mat) -> bool: | |
""" | |
Return true if mat is a a quare matrix in which all rows are normalized to 1. | |
""" | |
return all( | |
len(row) == len(mat) and abs(sum(row) - 1) < 10**-15 | |
for row in mat | |
) | |
def compute_fundamental_matrix_N(mat_Q): | |
""" | |
:param: Transient block of stochastic matrix. (=Upper left block, if matrix is in canonical form.) | |
N := \lim_{num_steps\to\infty} \sum_{s=0}^num_steps Q^s = 1/(1-Q) | |
N_{ij} Describes the chance of reaching index j from index i in any number of steps | |
""" | |
id = np.eye(len(mat_Q)) | |
return np.linalg.inv(id - mat_Q) | |
def run_walk(transition_function, exit_condition_predicate, start_state): | |
s = start_state | |
while not exit_condition_predicate(s): | |
s = transition_function(s) | |
return s | |
def weighted_vertex_transition(weights: list) -> int: # TODO: This function doesn't handle cases well where weights has zeros | |
""" | |
Return the index of a weight in weights, with ProbLibability determined by the weight itself. | |
:param weights: List of positive numbers | |
Details: Weight don't have to be l_1-normalized. | |
""" | |
ps = LinAlgLib.l1_normalized(weights) | |
# return np.random.choice(range(len(p)), p=ps) # Numpy equivalent | |
r = random.random() # Uniformly sampled float from the interval [0, 1] | |
acc_p = 0 | |
for idx, p in enumerate(ps): | |
acc_p += p | |
if acc_p > r: | |
return idx | |
assert False # Should not be reachable | |
class Graph: | |
def __init__(self, mat_graph_weights, num_transient_states): | |
""" | |
The square matrix __WEIGHTS described a graph and its elements are the relative transition ProbLibabilities. | |
(P^s)_{ij} described the chance of going reaching the state with index j from index i in s steps. | |
""" | |
self.P = LinAlgLib.normalized_rows(mat_graph_weights) | |
self.__NUM_TRANSIENT_STATES = num_transient_states | |
def is_transient(self, idx: int) -> bool: | |
return idx < self.__NUM_TRANSIENT_STATES | |
def is_absorbing(self, idx: int) -> bool: | |
return not self.is_transient(idx) | |
def compute_absorption_ProbLibability_matrix(self): | |
k = self.__NUM_TRANSIENT_STATES | |
mat_A = np.zeros((k, k)) | |
mat_Q = self.P[:k, :k] # Top left block | |
mat_R = self.P[:k, k:] # Top right block | |
mat_B = ProbLib.compute_fundamental_matrix_N(mat_Q).dot(mat_R) | |
mat_absorption = np.block([ | |
[mat_A, mat_B], | |
[self.P[k:, :k], self.P[k:, k:]] | |
]) | |
assert ProbLib.is_stochastic_matrix(mat_absorption), "Absorption ProbLibability values were not normalized" | |
return mat_absorption | |
def compute_count_matrix_from_simulations(self, num_simulations): | |
""" | |
Run a simulation for all initial states. | |
""" | |
transition_function = lambda idx: ProbLib.weighted_vertex_transition(self.P[idx]) | |
exit_condition_predicate = lambda idx: self.is_absorbing(idx) | |
count_mat = np.zeros(self.P.shape) | |
for idx_row, mutable_row in enumerate(count_mat): | |
if self.is_transient(idx_row): | |
for _sim in range(num_simulations): | |
# Simulate one run | |
idx_destination: int = ProbLib.run_walk(transition_function, exit_condition_predicate, idx_row) | |
mutable_row[idx_destination] += 1 | |
else: | |
# Has deterministic outcome anyhow | |
mutable_row[idx_row] = num_simulations | |
return count_mat | |
class Config: | |
NUM_SIMULATIONS = 100 | |
NUM_TRANSIENT_STATES = 4 | |
STATES = "ABCDLFW" # Auxiliary names for nicer logging | |
MAT_GRAPH_WEIGHTS = ( | |
# A B C D L F W | |
(0, 2, 0, 2, 1, 0, 0,), # A # Start of upper, transient part | |
(3, 0, 4, 0, 0, 2, 0,), # B | |
(0, 1, 0, 1, 0, 0, 1,), # C | |
(4, 0, 3, 0, 0, 0, 0,), # D | |
(0, 0, 0, 0, 1, 0, 0,), # L # Start of lower, absorbing part | |
(0, 0, 0, 0, 0, 1, 0,), # F | |
(0, 0, 0, 0, 0, 0, 1,), # W | |
) | |
# The matrix is already in canonical form, with upper and lower parts | |
# Note: Submatrix of the weight matrix from the index NUM_TRANSIENT_STATES onwards must makes sense w.r.t. describing absorbing states | |
def run_gambling_game_graph_version(): | |
graph = Graph(Config.MAT_GRAPH_WEIGHTS, Config.NUM_TRANSIENT_STATES) | |
abs_mat__analytical = graph.compute_absorption_ProbLibability_matrix() | |
abs_mat__simulation = graph.compute_count_matrix_from_simulations(Config.NUM_SIMULATIONS) / Config.NUM_SIMULATIONS | |
# Log results (all matrix indices and differences) | |
for idx_from, state_from in enumerate(Config.STATES): | |
if graph.is_transient(idx_from): | |
print(f"\nRuns for transition from {state_from}:") | |
for idx_to, state_to in enumerate(Config.STATES): | |
if graph.is_absorbing(idx_to): | |
p_ana: float = abs_mat__analytical[idx_from][idx_to] | |
p_emp: float = abs_mat__simulation[idx_from][idx_to] | |
diff: float = p_ana - p_emp | |
print(f"\t{state_from} => {state_to}: {round(100 * p_emp, 5)} % (diff = {round(100 * diff, 3)} %)") | |
################################################################################ | |
if __name__ == "__main__": | |
run_gambling_game_graph_version() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment