Last active
April 17, 2025 23:26
-
-
Save znxkznxk1030/c2f8a151138bd403530a80f288f845b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import numpy as np | |
from visualize_train import draw_value_image, draw_policy_image | |
# left, right, up, down | |
ACTIONS = [np.array([0, -1]), | |
np.array([0, 1]), | |
np.array([-1, 0]), | |
np.array([1, 0])] | |
TRAINING_EPISODE_NUM = 800000 | |
class AGENT: | |
def __init__(self, env, is_upload=False): | |
self.ACTIONS = ACTIONS | |
self.env = env | |
HEIGHT, WIDTH = env.size() | |
self.state = [0, 0] | |
if is_upload: # Test | |
mcc_results = np.load('./result/mcc.npz') | |
self.V_values = mcc_results['V'] | |
self.Q_values = mcc_results['Q'] | |
self.policy = mcc_results['PI'] | |
print(self.policy) | |
else: # For training | |
self.V_values = np.zeros((HEIGHT, WIDTH)) | |
self.Q_values = np.zeros((HEIGHT, WIDTH, len(self.ACTIONS))) | |
self.policy = np.zeros((HEIGHT, WIDTH, len(self.ACTIONS))) + 1. / len(self.ACTIONS) | |
def initialize_episode(self): | |
HEIGHT, WIDTH = self.env.size() | |
while True: | |
i = np.random.randint(HEIGHT) | |
j = np.random.randint(WIDTH) | |
state = [i, j] | |
if (state in self.env.goal) or (state in self.env.obstacles): | |
continue | |
break | |
# if (state not in self.env.goal) and (state not in self.env.obstacles): | |
# break | |
return state | |
def Monte_Carlo_Control(self, discount=1.0, alpha=0.01, max_seq_len=500, | |
epsilon=0.3, decay_period=20000, decay_rate=0.9): | |
""" | |
Monte Carlo Control 알고리즘을 사용하여 Q값과 정책 업데이트를 수행하는 함수. | |
각 에피소드마다 에이전트가 환경에서 상호작용하여 경험을 쌓고, 이를 바탕으로 Q값과 정책을 업데이트. | |
Parameters | |
------- | |
discount : float | |
γ - 현재의 보상을 미래의 보상보다 더 비중을 두기 위한 할인율. | |
alpha : float | |
α - 학습률을 조절하는 하이퍼 파라미터. 각 에피소드에서 얻은 정보를 얼마나 반영할지 결정. | |
max_seq_len : int | |
에피소드에서 고려되는 최대 길이. 이 값을 초과하면 해당 에피소드에서는 Q값과 정책을 업데이트하지 않음. | |
epsilon : float | |
ε - 무작위적인 행동을 선택할 확률. | |
decay_period: int | |
epsilon의 감쇠기간을 설정하는 하이퍼 파라미터. | |
decay_rate: float | |
epsilon의 감쇠율을 설정하는 하이퍼 파라미터. | |
Returns | |
-------- | |
Q_values: np.array | |
Q(S,A) - 업데이트 된 Q 가치 함수. | |
V_values: np.array | |
V(S) - 업데이트 된 V 가치 함수. | |
policy: np.array | |
π(S) - 업데이트 된 정책 함수. | |
""" | |
for episode in range(TRAINING_EPISODE_NUM): # TRAINING_EPISODE_NUM 만큼 episode 를 반복. | |
if episode % decay_period == 0: # episode 가 decay_period 번 진행될 때 마다, | |
epsilon *= decay_rate # ε을 decay_rate 만큼 감쇠한다. | |
state = self.initialize_episode() # 시작 점을 초기화. Exploring Starts 방식을 채택. 즉, 임의의 시작점에서 episode를 진행. | |
print(episode) # episode 진행 상태를 확인 하기 위해 print | |
done = False # 시퀀스 생성 종료 여부. | |
timeout = False # 시퀀스 길이 초과 여부. (시퀀스의 길이가 max_seq_len 초과 된 경우.) | |
seq_len = 0 # 시퀀스 길이 | |
history = [] # 시퀀스 | |
#### Implement Sequence generation #### | |
while not done: # 시퀀스 생성 종료 조건을 충족 하지 않는 한 반복 수행. ( 종료 조건: Goal 도달 | timeout ) | |
while True: # 유효한 s'(다음 상태)로 가는 행동을 취할 때까지 반복. ( 유효하지 않은 상태 예시: Grid World 범위 밖이나 장해물 셀로 이동.) | |
action = self.get_action(state) # a <- π(s) : s(현재 상태)에서 정책에 따라 다음 행동 선택. | |
next_state, reward = self.env.interaction(state, ACTIONS[action]) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 ) | |
if state is not next_state: # s'(next_state)가 유효한 상태 이라면, | |
history.append((state, action, next_state, reward)) # 시퀀스 에 (상태-행동-보상)을 추가 하고, | |
break # 반복 탈출. | |
state = next_state # s <- s' | |
seq_len += 1 # 시퀀스 길이 1 증가. | |
if seq_len >= max_seq_len: # 시퀀스의 길이가 최대 허용값(max_seq_len) 보다 크다면, Timeout | |
timeout = True # timeout <- True | |
done = self.env.is_terminal(state) or timeout # 종료 조건 갱신. ( Goal 도달 | timeout ) | |
if timeout: # 시퀀스 생성에서 시간초과가 된 경우, | |
continue # Q value 및 π(정책) 업데이트는 생략한다. | |
#### Implement Q Value and policy update #### | |
cum_reward = 0 # r : 누적 보상, 0으로 초기화 | |
# print(history) | |
while history: # 에피소드에서 기록한 (상태-행동-보상) 시퀀스를 역순으로 처리 | |
state, action, next_state, reward = history.pop(-1) # 에피소드에서 가장 마지막 경험(상태, 행동, 다음 상태, 보상)을 꺼냄 (후입선출 방식) | |
# print(state, action, next_state, reward) | |
if self.env.is_out_of_boundary(next_state) or self.env.is_on_obstacle(next_state): # 다음 상태가 환경의 경계 밖이거나 장애물 위인 경우, 업데이트를 건너뜀 | |
continue | |
y, x = state # 현재 상태를 x,y로 분해 | |
cum_reward = reward + discount * cum_reward # r <- r' + γr ( r: 누적 보상, r': 현재 보상, γ: 할인율 ) | |
self.Q_values[y][x][action] += alpha * (cum_reward - self.Q_values[y][x][action]) # Q(s,a) 업데이트. Q(s,a) <- Q(s,a) + α(r - Q(s,a)) | |
best_action = 0 # a∗: s에서 최적의 action | |
non_zero = np.where(self.Q_values[y][x] != 0)[0] | |
if len(non_zero) > 0: | |
max_val = np.max(self.Q_values[y][x][non_zero]) | |
candidates = [i for i in non_zero if self.Q_values[y][x][i] == max_val] | |
best_action = np.random.choice(candidates) | |
self.policy[y][x] = [epsilon / 4] * 4 | |
self.policy[y][x][best_action] += (1.0 - epsilon) | |
self.V_values = np.max(self.Q_values, axis=2) | |
draw_value_image(1, np.round(self.V_values, decimals=2), env=self.env) | |
draw_policy_image(1, np.round(self.policy, decimals=2), env=self.env) | |
np.savez('./result/mcc.npz', Q=self.Q_values, V=self.V_values, PI=self.policy) | |
print(self.Q_values) | |
print(self.policy) | |
return self.Q_values, self.V_values, self.policy | |
def get_action(self, state): | |
i, j = state | |
return np.random.choice(len(ACTIONS), 1, p=self.policy[i, j, :].tolist()).item() | |
def get_state(self): | |
return self.state |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment