Last active
April 1, 2025 03:46
-
-
Save znxkznxk1030/ee2254d7ff45ddcbb6920f2711c06946 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from numpy.linalg import inv | |
from visualize_train import draw_value_image, draw_policy_image | |
# left, right, up, down | |
ACTIONS = [np.array([0, -1]), | |
np.array([0, 1]), | |
np.array([-1, 0]), | |
np.array([1, 0])] | |
class AGENT: | |
def __init__(self, env, is_upload=False): | |
self.ACTIONS = ACTIONS | |
self.env = env | |
HEIGHT, WIDTH = env.size() | |
self.state = [0,0] | |
if is_upload: | |
dp_results = np.load('./result/dp.npz') | |
self.values = dp_results['V'] | |
self.policy = dp_results['PI'] | |
else: | |
self.values = np.zeros((HEIGHT, WIDTH)) | |
self.policy = np.zeros((HEIGHT, WIDTH,len(self.ACTIONS)))+1./len(self.ACTIONS) | |
def policy_evaluation_matrix_inversion(self, iter, env, policy, discount=1.0): | |
""" | |
policy(π)하에 가치 함수(V(S))를 선형 방정식 풀이로 으로 평가. | |
V=(I−γP)^−1•R | |
Parameters | |
------- | |
iter : int | |
정책을 improvement 한 횟수. (평가 로직과 무관) | |
env : grid_world | |
Grid World 환경. | |
policy : np.array | |
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응. | |
discount : float | |
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터. | |
Returns | |
-------- | |
new_state_values : np.array | |
policy(π)하에 평가된 가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응. | |
iteration : int | |
V(S)를 업데이트 한 횟수. (* 선형 방정식 풀이에서는 반복하지 않으므로 1로 고정된다. ) | |
""" | |
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비 | |
N = HEIGHT * WIDTH # 선형 방정식 풀이를 쉽게 하기 위해서 N 정의 | |
state_transition_matrix = np.zeros((N, N)) # 상태 전이 행렬(P), N x N 행렬 : state -> state' 로 가는 확률을 정의 | |
rewards = np.zeros((N, 1)) # 보상 행렬(R), N x 1 행렬 : state' 로 갔을때 받을 보상을 정의 | |
identity = np.identity(N) # Identity 행렬(I), N x N 행렬 | |
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다. | |
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다. | |
state = [y, x] # 상태 값, s | |
if env.is_terminal(state): # s가 종료 조건이라면 현 상태에서는 더 전이할 상태가 없으므로 skip | |
continue | |
rewards[WIDTH * y + x] = -1 # s가 종료 조건이 아니라면 보상은 -1로 동일 | |
for i, action in enumerate(ACTIONS): | |
if self.policy[y][x][i] == 0: | |
continue | |
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'( next_state) 와 r(reward) 리턴 ) | |
[ny, nx] = next_state # 다음 상태 | |
state_transition_matrix[WIDTH * y + x][WIDTH * ny + nx] += policy[y][x][i] # 상태 전이 행렬 업데이트 | |
# print(np.multiply(discount, state_transition_matrix)) | |
# print(np.subtract(identity, np.multiply(discount, state_transition_matrix))) | |
# print(inv(np.subtract(identity, np.multiply(discount, state_transition_matrix)))) | |
# print(rewards) | |
new_state_values = np.dot(inv(np.subtract(identity, np.multiply(discount, state_transition_matrix))), rewards) # V=(I−γP)^−1•R 연산 | |
new_state_values = np.reshape(new_state_values, (-1, WIDTH)) # N으로 합친 것을 다시 HEIGHT, WIDTH 로 나눈다. | |
# print(new_state_values) | |
draw_value_image(iter, np.round(new_state_values, decimals=2), env=env) | |
return new_state_values, 1 # (V(S), 1) 반환. | |
def values_update(self, values, policy, discount, env): | |
""" | |
Bellman Equation을 적용하여 values(가치 함수)를 1회 업데이트. | |
Parameters | |
------- | |
values : np.array | |
가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응. * in-place. | |
policy : np.array | |
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응. | |
discount : float | |
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터. | |
env : grid_world | |
Grid World 환경. | |
Returns | |
-------- | |
diff : float | |
value 변화량 중 최대값. max(∆V(s_k)), k ∈ S | |
∆V(s) = V'(s) - V(s) | |
""" | |
diff = 0 # max(∆V(S))를 찾기 위한 변수. | |
old_value = values.copy() # values 를 object reference 로 선언했기 때문에 values 값 백업. | |
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비 | |
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다. | |
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다. | |
state = [y, x] # 상태 값, s | |
prev_value = old_value[y][x] # v <- V(s) | |
new_value = 0 # V(s)값을 연산하기 위한 변수. | |
# V(s) <- ∑_a[π(a|s) * ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))]] | |
for i, action in enumerate(ACTIONS): # ∑_a, state 에 대한 action 을 Iterate 한다. | |
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'( next_state) 와 r(reward) 리턴 ) | |
new_value += policy[y][x][i] * (reward + discount * old_value[next_state[0]][next_state[1]]) # Bellman Equation을 이용해서 V'(s) 값을 구한다. | |
values[state[0]][state[1]] = new_value # V'(s)로 갱신 | |
diff = max(diff, abs(prev_value - values[state[0]][state[1]])) # ∆V(s) 를 계산하여 diff를 업데이트한다. | |
return diff # ∆V(s) 반환 | |
def policy_evaluation(self, iter, env, policy, discount=1.0): | |
""" | |
policy(π)하에 가치 함수(V(S))를 반복 적으로 업데이트하여 policy(π)를 평가. ( 종료 조건: ∆V(s) < threshold(θ) ) | |
Parameters | |
------- | |
iter : int | |
정책을 improvement 한 횟수. (평가 로직과 무관) | |
env : grid_world | |
Grid World 환경. | |
policy : np.array | |
정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응. | |
discount : float | |
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터. | |
Returns | |
-------- | |
new_state_values : np.array | |
policy(π)하에 평가된 가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응. | |
iteration : int | |
V(S)를 업데이트 한 횟수. | |
""" | |
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비 | |
new_state_values = np.zeros((HEIGHT, WIDTH)) # V(S) 를 0으로 초기화. | |
iteration = 0 # V(S)를 업데이트 한 횟수. | |
diff = int(1e10) # ∆V(s) | |
threshold = 0.001 # θ - ∆V(s) 임계값 | |
while diff > threshold: # 종료 조건: ∆V(s) < threshold(θ) | |
diff = self.values_update(new_state_values, policy, discount, env) # V(S)를 1회 업데이트 하고, ∆V(s) 계산. ( ∆V(s) <- V'(s) - V(s) ) | |
iteration += 1 # V(S)를 업데이트 한 횟수 1회 증가. | |
draw_value_image(iter, np.round(new_state_values, decimals=2), env=env) | |
return new_state_values, iteration # (V(S), iteration) 반환. | |
def policy_improvement(self, iter, env, state_values, old_policy, discount=1.0): | |
""" | |
policy_evaluation 으로 부터 갱신된 가치 함수(V(S))를 이용 하여 정책 함수(π)를 갱신. | |
Parameters | |
------- | |
iter : int | |
정책을 improvement 한 횟수. (평가 로직과 무관) | |
env : grid_world | |
Grid World 환경. | |
state_values : np.array | |
가치 함수(V(S)) - 상태(s)에 가치(v)가 1:1 대응. | |
old_policy : np.array | |
기존 정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응. | |
discount : float | |
γ - Bellan Equation이 발산하지 않도록 조절하는 하이퍼 파라미터. | |
Returns | |
-------- | |
policy : np.array | |
신규 정책 함수(π) - P(행위(←|→|↑|↓) | 상태)에 1:1 대응. | |
policy_stable : boolean | |
I(π=π'), 기존 정책 함수와 신규 정책 함수의 동일 여부. | |
""" | |
HEIGHT, WIDTH = env.size() # Grid World 의 높이, 너비 | |
policy = old_policy.copy() # 신규 정책 함수(π), 기존 정책으로 초기화. | |
policy_stable = True # I(π=π'), 기존 정책 함수와 신규 정책 함수의 동일 여부. | |
for y in range(HEIGHT): # State 값을 Y 축에 대하여 Iterate 한다. | |
for x in range(WIDTH): # State 값을 X 축에 대하여 Iterate 한다. | |
# 현재 상태 (s)에서 기존 정책 함수(π(s))에서 나온 actions 중 가장 기대 가치가 큰 actions 을 추려서 새 정책 함수로 갱신. | |
# 균등 분배 적용 : 가장 큰 value 를 가지는 action 의 개수가 2개 이상일 경우, 동일한 확률로 분배. | |
max_value = -int(1e10) # argmax_a{∑_{s',r}[p(s',r|s,a) * (r + γV(s'))]} 값을 찾기 위한 변수. | |
num_action = 0 # 갱신될 policy 의 action 개수. | |
state = [y, x] # 상태 s = (y, x) | |
old_action = policy[y][x].copy() # 기존 정책 π(s) 백업 | |
new_action = [0, 0, 0, 0] # 갱신할 π'(s) 초기화 | |
# if env.is_on_obstacle(state) or env.is_terminal(state): # terminal 또는 obstacle 위에 상태는 skip | |
# continue | |
for i, action in enumerate(ACTIONS): # π(s) 의 action 을 iterate | |
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 ) | |
value = reward + discount * state_values[next_state[0]][next_state[1]] # ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))] 값 연산 | |
if value > max_value: # s 에서 maximum value 을 신규 발견 | |
max_value = value # maximum value 갱신 | |
num_action = 1 # maximum value 를 가질 action 의 개수 1개로 초기화 | |
elif value == max_value: # s 에서 maximum value 추가 발견 | |
num_action += 1 # maximum value 를 가진 action 의 개수 1개로 추가 | |
for i, action in enumerate(ACTIONS): # π(s) 의 action 을 iterate, maximum value 를 가지는 action 을 새 정책(π'(s)) 으로 갱신. | |
next_state, reward = env.interaction(state, action) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 ) | |
value = reward + discount * state_values[next_state[0]][next_state[1]] # ∑_{s',r}[p(s',r|s,a) * (r + γV(s'))] 값 연산 | |
if value == max_value: # 해당 action 은 s 에서 maximum value 를 가진다. | |
new_action[i] = (1 / num_action) # 균등 분배 적용. | |
policy[y][x] = new_action # π(s) <- π'(s) | |
if not np.array_equal(new_action, old_action): # I(π(s)!=π'(s)) | |
policy_stable = False # π(s) != π'(s) 하다면 policy_stable 값 False로 갱신. | |
print('policy stable {}:'.format(policy_stable)) | |
draw_policy_image(iter, np.round(policy, decimals=2), env=env) | |
return policy, policy_stable # (신규 정책, 정책 변경 여부) 반환. | |
def policy_iteration(self): | |
iter = 1 | |
# print(self.policy) | |
while (True): | |
# self.values, iteration = self.policy_evaluation(iter, env=self.env, policy=self.policy) | |
self.values, iteration = self.policy_evaluation_matrix_inversion(iter, env=self.env, policy=self.policy) | |
self.policy, policy_stable = self.policy_improvement(iter, env=self.env, state_values=self.values, | |
old_policy=self.policy, discount=1.0) | |
iter += 1 | |
# print(self.policy) | |
# print(self.values, self.policy) | |
if policy_stable == True: | |
break | |
np.savez('./result/dp.npz', V=self.values, PI=self.policy) | |
return self.values, self.policy | |
def get_action(self, state): | |
i,j = state | |
return np.random.choice(len(ACTIONS), 1, p=self.policy[i,j,:].tolist()).item() | |
def get_state(self): | |
return self.state |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment