Skip to content

Instantly share code, notes, and snippets.

@znxkznxk1030
Last active April 1, 2025 03:46
Show Gist options
  • Save znxkznxk1030/ee2254d7ff45ddcbb6920f2711c06946 to your computer and use it in GitHub Desktop.
Save znxkznxk1030/ee2254d7ff45ddcbb6920f2711c06946 to your computer and use it in GitHub Desktop.
import numpy as np
from numpy.linalg import inv
from visualize_train import draw_value_image, draw_policy_image
# left, right, up, down
ACTIONS = [np.array([0, -1]),
np.array([0, 1]),
np.array([-1, 0]),
np.array([1, 0])]
class AGENT:
def __init__(self, env, is_upload=False):
self.ACTIONS = ACTIONS
self.env = env
HEIGHT, WIDTH = env.size()
self.state = [0,0]
if is_upload:
dp_results = np.load('./result/dp.npz')
self.values = dp_results['V']
self.policy = dp_results['PI']
else:
self.values = np.zeros((HEIGHT, WIDTH))
self.policy = np.zeros((HEIGHT, WIDTH,len(self.ACTIONS)))+1./len(self.ACTIONS)
def policy_evaluation_matrix_inversion(self, iter, env, policy, discount=1.0):
"""
policy(π)하에 가치 함수(V(S))를 선형 방정식 풀이로 으로 평가.
V=(I−γP)^−1•R
Parameters
-------
iter : int
정책을 improvement 한 횟수. (평가 로직과 무관)
env : grid_world
Grid World 환경.
policy : np.array
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응.
discount : float
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터.
Returns
--------
new_state_values : np.array
policy(π)하에 평가된 가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응.
iteration : int
V(S)를 업데이트 한 횟수. (* 선형 방정식 풀이에서는 반복하지 않으므로 1로 고정된다. )
"""
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비
N = HEIGHT * WIDTH # 선형 방정식 풀이를 쉽게 하기 위해서 N 정의
state_transition_matrix = np.zeros((N, N)) # 상태 전이 행렬(P), N x N 행렬 : state -> state' 로 가는 확률을 정의
rewards = np.zeros((N, 1)) # 보상 행렬(R), N x 1 행렬 : state' 로 갔을때 받을 보상을 정의
identity = np.identity(N) # Identity 행렬(I), N x N 행렬
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다.
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다.
state = [y, x] # 상태 값, s
if env.is_terminal(state): # s가 종료 조건이라면 현 상태에서는 더 전이할 상태가 없으므로 skip
continue
rewards[WIDTH * y + x] = -1 # s가 종료 조건이 아니라면 보상은 -1로 동일
for i, action in enumerate(ACTIONS):
if self.policy[y][x][i] == 0:
continue
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'( next_state) 와 r(reward) 리턴 )
[ny, nx] = next_state # 다음 상태
state_transition_matrix[WIDTH * y + x][WIDTH * ny + nx] += policy[y][x][i] # 상태 전이 행렬 업데이트
# print(np.multiply(discount, state_transition_matrix))
# print(np.subtract(identity, np.multiply(discount, state_transition_matrix)))
# print(inv(np.subtract(identity, np.multiply(discount, state_transition_matrix))))
# print(rewards)
new_state_values = np.dot(inv(np.subtract(identity, np.multiply(discount, state_transition_matrix))), rewards) # V=(I−γP)^−1•R 연산
new_state_values = np.reshape(new_state_values, (-1, WIDTH)) # N으로 합친 것을 다시 HEIGHT, WIDTH 로 나눈다.
# print(new_state_values)
draw_value_image(iter, np.round(new_state_values, decimals=2), env=env)
return new_state_values, 1 # (V(S), 1) 반환.
def values_update(self, values, policy, discount, env):
"""
Bellman Equation을 적용하여 values(가치 함수)를 1회 업데이트.
Parameters
-------
values : np.array
가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응. * in-place.
policy : np.array
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응.
discount : float
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터.
env : grid_world
Grid World 환경.
Returns
--------
diff : float
value 변화량 중 최대값. max(∆V(s_k)), k ∈ S
∆V(s) = V'(s) - V(s)
"""
diff = 0 # max(∆V(S))를 찾기 위한 변수.
old_value = values.copy() # values 를 object reference 로 선언했기 때문에 values 값 백업.
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다.
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다.
state = [y, x] # 상태 값, s
prev_value = old_value[y][x] # v <- V(s)
new_value = 0 # V(s)값을 연산하기 위한 변수.
# V(s) <- ∑_a[π(a|s) * ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))]]
for i, action in enumerate(ACTIONS): # ∑_a, state 에 대한 action 을 Iterate 한다.
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'( next_state) 와 r(reward) 리턴 )
new_value += policy[y][x][i] * (reward + discount * old_value[next_state[0]][next_state[1]]) # Bellman Equation을 이용해서 V'(s) 값을 구한다.
values[state[0]][state[1]] = new_value # V'(s)로 갱신
diff = max(diff, abs(prev_value - values[state[0]][state[1]])) # ∆V(s) 를 계산하여 diff를 업데이트한다.
return diff # ∆V(s) 반환
def policy_evaluation(self, iter, env, policy, discount=1.0):
"""
policy(π)하에 가치 함수(V(S))를 반복 적으로 업데이트하여 policy(π)를 평가. ( 종료 조건: ∆V(s) < threshold(θ) )
Parameters
-------
iter : int
정책을 improvement 한 횟수. (평가 로직과 무관)
env : grid_world
Grid World 환경.
policy : np.array
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응.
discount : float
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터.
Returns
--------
new_state_values : np.array
policy(π)하에 평가된 가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응.
iteration : int
V(S)를 업데이트 한 횟수.
"""
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비
new_state_values = np.zeros((HEIGHT, WIDTH)) # V(S) 를 0으로 초기화.
iteration = 0 # V(S)를 업데이트 한 횟수.
diff = int(1e10) # ∆V(s)
threshold = 0.001 # θ - ∆V(s) 임계값
while diff > threshold: # 종료 조건: ∆V(s) < threshold(θ)
diff = self.values_update(new_state_values, policy, discount, env) # V(S)를 1회 업데이트 하고, ∆V(s) 계산. ( ∆V(s) <- V'(s) - V(s) )
iteration += 1 # V(S)를 업데이트 한 횟수 1회 증가.
draw_value_image(iter, np.round(new_state_values, decimals=2), env=env)
return new_state_values, iteration # (V(S), iteration) 반환.
def policy_improvement(self, iter, env, state_values, old_policy, discount=1.0):
"""
policy_evaluation 으로 부터 갱신된 가치 함수(V(S))를 이용 하여 정책 함수(π)를 갱신.
Parameters
-------
iter : int
정책을 improvement 한 횟수. (평가 로직과 무관)
env : grid_world
Grid World 환경.
state_values : np.array
가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응.
old_policy : np.array
기존 정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응.
discount : float
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터.
Returns
--------
policy : np.array
신규 정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응.
policy_stable : boolean
I(π=π'), 기존 정책 함수와 신규 정책 함수의 동일 여부.
"""
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비
policy = old_policy.copy() # 신규 정책 함수(π), 기존 정책으로 초기화.
policy_stable = True # I(π=π'), 기존 정책 함수와 신규 정책 함수의 동일 여부.
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다.
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다.
# 현재 상태 (s)에서 기존 정책 함수(π(s))에서 나온 actions 중 가장 기대 가치가 큰 actions 을 추려서 새 정책 함수로 갱신.
# 균등 분배 적용 : 가장 큰 value 를 가지는 action 의 개수가 2개 이상일 경우, 동일한 확률로 분배.
max_value = -int(1e10) # argmax_a{∑_{s',r}[p(s',r|s,a) * (r + γV(s'))]} 값을 찾기 위한 변수.
num_action = 0 # 갱신될 policy 의 action 개수.
state = [y, x] # 상태 s = (y, x)
old_action = policy[y][x].copy() # 기존 정책 π(s) 백업
new_action = [0, 0, 0, 0] # 갱신할 π'(s) 초기화
# if env.is_on_obstacle(state) or env.is_terminal(state): # terminal 또는 obstacle 위에 상태는 skip
# continue
for i, action in enumerate(ACTIONS): # π(s) 의 action 을 iterate
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 )
value = reward + discount * state_values[next_state[0]][next_state[1]] # ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))] 값 연산
if value > max_value: # s 에서 maximum value 을 신규 발견
max_value = value # maximum value 갱신
num_action = 1 # maximum value 를 가질 action 의 개수 1개로 초기화
elif value == max_value: # s 에서 maximum value 추가 발견
num_action += 1 # maximum value 를 가진 action 의 개수 1개로 추가
for i, action in enumerate(ACTIONS): # π(s) 의 action 을 iterate, maximum value 를 가지는 action 을 새 정책(π'(s)) 으로 갱신.
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 )
value = reward + discount * state_values[next_state[0]][next_state[1]] # ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))] 값 연산
if value == max_value: # 해당 action 은 s 에서 maximum value 를 가진다.
new_action[i] = (1 / num_action) # 균등 분배 적용.
policy[y][x] = new_action # π(s) <- π'(s)
if not np.array_equal(new_action, old_action): # I(π(s)!=π'(s))
policy_stable = False # π(s) != π'(s) 하다면 policy_stable 값 False로 갱신.
print('policy stable {}:'.format(policy_stable))
draw_policy_image(iter, np.round(policy, decimals=2), env=env)
return policy, policy_stable # (신규 정책, 정책 변경 여부) 반환.
def policy_iteration(self):
iter = 1
# print(self.policy)
while (True):
# self.values, iteration = self.policy_evaluation(iter, env=self.env, policy=self.policy)
self.values, iteration = self.policy_evaluation_matrix_inversion(iter, env=self.env, policy=self.policy)
self.policy, policy_stable = self.policy_improvement(iter, env=self.env, state_values=self.values,
old_policy=self.policy, discount=1.0)
iter += 1
# print(self.policy)
# print(self.values, self.policy)
if policy_stable == True:
break
np.savez('./result/dp.npz', V=self.values, PI=self.policy)
return self.values, self.policy
def get_action(self, state):
i,j = state
return np.random.choice(len(ACTIONS), 1, p=self.policy[i,j,:].tolist()).item()
def get_state(self):
return self.state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment