Skip to content

Instantly share code, notes, and snippets.

@znxkznxk1030
Last active April 17, 2025 23:26
Show Gist options
  • Save znxkznxk1030/c2f8a151138bd403530a80f288f845b2 to your computer and use it in GitHub Desktop.
Save znxkznxk1030/c2f8a151138bd403530a80f288f845b2 to your computer and use it in GitHub Desktop.
import random
import numpy as np
from visualize_train import draw_value_image, draw_policy_image
# left, right, up, down
ACTIONS = [np.array([0, -1]),
np.array([0, 1]),
np.array([-1, 0]),
np.array([1, 0])]
TRAINING_EPISODE_NUM = 800000
class AGENT:
def __init__(self, env, is_upload=False):
self.ACTIONS = ACTIONS
self.env = env
HEIGHT, WIDTH = env.size()
self.state = [0, 0]
if is_upload: # Test
mcc_results = np.load('./result/mcc.npz')
self.V_values = mcc_results['V']
self.Q_values = mcc_results['Q']
self.policy = mcc_results['PI']
print(self.policy)
else: # For training
self.V_values = np.zeros((HEIGHT, WIDTH))
self.Q_values = np.zeros((HEIGHT, WIDTH, len(self.ACTIONS)))
self.policy = np.zeros((HEIGHT, WIDTH, len(self.ACTIONS))) + 1. / len(self.ACTIONS)
def initialize_episode(self):
HEIGHT, WIDTH = self.env.size()
while True:
i = np.random.randint(HEIGHT)
j = np.random.randint(WIDTH)
state = [i, j]
if (state in self.env.goal) or (state in self.env.obstacles):
continue
break
# if (state not in self.env.goal) and (state not in self.env.obstacles):
# break
return state
def Monte_Carlo_Control(self, discount=1.0, alpha=0.01, max_seq_len=500,
epsilon=0.3, decay_period=20000, decay_rate=0.9):
"""
Monte Carlo Control 알고리즘을 사용하여 Q값과 정책 업데이트를 수행하는 함수.
각 에피소드마다 에이전트가 환경에서 상호작용하여 경험을 쌓고, 이를 바탕으로 Q값과 정책을 업데이트.
Parameters
-------
discount : float
γ - 현재의 보상을 미래의 보상보다 더 비중을 두기 위한 할인율.
alpha : float
α - 학습률을 조절하는 하이퍼 파라미터. 각 에피소드에서 얻은 정보를 얼마나 반영할지 결정.
max_seq_len : int
에피소드에서 고려되는 최대 길이. 이 값을 초과하면 해당 에피소드에서는 Q값과 정책을 업데이트하지 않음.
epsilon : float
ε - 무작위적인 행동을 선택할 확률.
decay_period: int
epsilon의 감쇠기간을 설정하는 하이퍼 파라미터.
decay_rate: float
epsilon의 감쇠율을 설정하는 하이퍼 파라미터.
Returns
--------
Q_values: np.array
Q(S,A) - 업데이트 된 Q 가치 함수.
V_values: np.array
V(S) - 업데이트 된 V 가치 함수.
policy: np.array
π(S) - 업데이트 된 정책 함수.
"""
for episode in range(TRAINING_EPISODE_NUM): # TRAINING_EPISODE_NUM 만큼 episode 를 반복.
if episode % decay_period == 0: # episode 가 decay_period 번 진행될 때 마다,
epsilon *= decay_rate # ε을 decay_rate 만큼 감쇠한다.
state = self.initialize_episode() # 시작 점을 초기화. Exploring Starts 방식을 채택. 즉, 임의의 시작점에서 episode를 진행.
print(episode) # episode 진행 상태를 확인 하기 위해 print
done = False # 시퀀스 생성 종료 여부.
timeout = False # 시퀀스 길이 초과 여부. (시퀀스의 길이가 max_seq_len 초과 된 경우.)
seq_len = 0 # 시퀀스 길이
history = [] # 시퀀스
#### Implement Sequence generation ####
while not done: # 시퀀스 생성 종료 조건을 충족 하지 않는 한 반복 수행. ( 종료 조건: Goal 도달 | timeout )
while True: # 유효한 s'(다음 상태)로 가는 행동을 취할 때까지 반복. ( 유효하지 않은 상태 예시: Grid World 범위 밖이나 장해물 셀로 이동.)
action = self.get_action(state) # a <- π(s) : s(현재 상태)에서 정책에 따라 다음 행동 선택.
next_state, reward = self.env.interaction(state, ACTIONS[action]) # (state, action) 과 환경을 상호작용 시킨다. ( => s'(next_state)와 r(reward) 반환 )
if state is not next_state: # s'(next_state)가 유효한 상태 이라면,
history.append((state, action, next_state, reward)) # 시퀀스 에 (상태-행동-보상)을 추가 하고,
break # 반복 탈출.
state = next_state # s <- s'
seq_len += 1 # 시퀀스 길이 1 증가.
if seq_len >= max_seq_len: # 시퀀스의 길이가 최대 허용값(max_seq_len) 보다 크다면, Timeout
timeout = True # timeout <- True
done = self.env.is_terminal(state) or timeout # 종료 조건 갱신. ( Goal 도달 | timeout )
if timeout: # 시퀀스 생성에서 시간초과가 된 경우,
continue # Q value 및 π(정책) 업데이트는 생략한다.
#### Implement Q Value and policy update ####
cum_reward = 0 # r : 누적 보상, 0으로 초기화
# print(history)
while history: # 에피소드에서 기록한 (상태-행동-보상) 시퀀스를 역순으로 처리
state, action, next_state, reward = history.pop(-1) # 에피소드에서 가장 마지막 경험(상태, 행동, 다음 상태, 보상)을 꺼냄 (후입선출 방식)
# print(state, action, next_state, reward)
if self.env.is_out_of_boundary(next_state) or self.env.is_on_obstacle(next_state): # 다음 상태가 환경의 경계 밖이거나 장애물 위인 경우, 업데이트를 건너뜀
continue
y, x = state # 현재 상태를 x,y로 분해
cum_reward = reward + discount * cum_reward # r <- r' + γr ( r: 누적 보상, r': 현재 보상, γ: 할인율 )
self.Q_values[y][x][action] += alpha * (cum_reward - self.Q_values[y][x][action]) # Q(s,a) 업데이트. Q(s,a) <- Q(s,a) + α(r - Q(s,a))
best_action = 0 # a∗: s에서 최적의 action
non_zero = np.where(self.Q_values[y][x] != 0)[0]
if len(non_zero) > 0:
max_val = np.max(self.Q_values[y][x][non_zero])
candidates = [i for i in non_zero if self.Q_values[y][x][i] == max_val]
best_action = np.random.choice(candidates)
self.policy[y][x] = [epsilon / 4] * 4
self.policy[y][x][best_action] += (1.0 - epsilon)
self.V_values = np.max(self.Q_values, axis=2)
draw_value_image(1, np.round(self.V_values, decimals=2), env=self.env)
draw_policy_image(1, np.round(self.policy, decimals=2), env=self.env)
np.savez('./result/mcc.npz', Q=self.Q_values, V=self.V_values, PI=self.policy)
print(self.Q_values)
print(self.policy)
return self.Q_values, self.V_values, self.policy
def get_action(self, state):
i, j = state
return np.random.choice(len(ACTIONS), 1, p=self.policy[i, j, :].tolist()).item()
def get_state(self):
return self.state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment