Created
August 16, 2021 10:42
-
-
Save TripleExclam/4614ac046c3d3db4b4c68d328f12ceb1 to your computer and use it in GitHub Desktop.
Exercise 3.1 and 3.2 COMP3702
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import heapq
import sys
import time
from collections import deque
class GridWorld:
    """9x9 grid navigation environment for COMP3702 exercises 3.1 and 3.2.

    States are (row, col) tuples; the agent starts bottom-left and must
    reach the top-right goal, moving Up/Down/Left/Right around obstacles.
    """

    ACTIONS = ['U', 'D', 'L', 'R']

    def __init__(self):
        self.n_rows = 9
        self.n_cols = 9
        self.init_state = (8, 0)   # start position (row, col)
        self.goal_state = (0, 8)   # target position (row, col)
        # 1 = obstacle, 0 = free cell
        self.board = [
            [0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 1, 1, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 1, 1, 1, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0]
        ]
        # 3.2: cost incurred when *entering* each cell (used by UCS / A*)
        self.costs = [
            [1, 1, 1, 5, 5, 5, 5, 1, 1],
            [1, 1, 1, 5, 5, 5, 5, 1, 1],
            [1, 1, 10, 10, 10, 10, 10, 1, 1],
            [1, 1, 1, 10, 10, 10, 10, 1, 1],
            [1, 1, 1, 1, 1, 10, 10, 1, 1],
            [1, 1, 1, 1, 1, 10, 10, 1, 1],
            [1, 1, 1, 1, 10, 10, 10, 1, 1],
            [1, 1, 1, 10, 10, 10, 10, 1, 1],
            [1, 1, 1, 1, 1, 1, 1, 1, 1]
        ]

    def step(self, state, action):
        """Attempt `action` from `state`.

        Returns (True, next_state) if the move succeeds, or (False, state)
        if it would leave the grid or enter an obstacle cell.

        Raises ValueError for an unrecognised action.  (The original used
        `assert False`, which silently vanishes under `python -O`.)
        """
        row, col = state
        action_map = {'U': (row - 1, col), 'D': (row + 1, col),
                      'L': (row, col - 1), 'R': (row, col + 1)}
        if action not in action_map:
            raise ValueError(f"Invalid Action: {action!r}")
        next_state = action_map[action]
        if (not 0 <= next_state[0] < self.n_rows) \
                or (not 0 <= next_state[1] < self.n_cols) \
                or self.board[next_state[0]][next_state[1]] == 1:
            # Collision: the agent stays where it was
            return False, state
        return True, next_state

    def is_goal(self, state):
        """Return True iff `state` is the goal cell."""
        return state == self.goal_state

    def get_state_cost(self, state):
        """Return the cost of entering `state` (exercise 3.2 weights)."""
        return self.costs[state[0]][state[1]]
class StateNode:
    """A node in the search tree: a state plus the path that reached it."""

    def __init__(self, env, state, actions, path_cost):
        self.env = env              # environment used to expand this node
        self.state = state          # (row, col) position
        self.actions = actions      # action sequence from the initial state
        self.path_cost = path_cost  # accumulated cost along that sequence

    def get_successors(self):
        """Expand this node, returning child nodes for every legal move."""
        children = []
        for action in self.env.ACTIONS:
            moved, result = self.env.step(self.state, action)
            if not moved:
                continue
            child = StateNode(self.env, result,
                              self.actions + [action],
                              self.path_cost + self.env.get_state_cost(result))
            children.append(child)
        return children

    def __lt__(self, other):
        # Arbitrary tie-break so heapq never tries to compare raw states
        # when two entries share the same priority.
        return True
def log(verbose, visited, container, end_node, n_expanded):
    """Print search statistics for a finished search when `verbose` is set."""
    if verbose:
        print(f'Visited Nodes: {len(visited)},\t\tExpanded Nodes: {n_expanded},\t\t'
              f'Nodes in Container: {len(container)}')
        print(f'Cost of Path (with Costly Moves): {end_node.path_cost}')
def bfs(env, verbose=True):
    """Breadth-first search from env.init_state to env.goal_state.

    Returns the list of actions of a shortest (fewest-moves) path, or
    None if the goal is unreachable.

    Uses collections.deque for the frontier: the original `list.pop(0)`
    shifts every remaining element, making each dequeue O(n); `popleft`
    is O(1) and visits nodes in exactly the same order.
    """
    container = deque([StateNode(env, env.init_state, [], 0)])
    visited = set()
    n_expanded = 0
    while container:
        # expand node (FIFO order)
        node = container.popleft()
        # test for goal
        if env.is_goal(node.state):
            log(verbose, visited, container, node, n_expanded)
            return node.actions
        # add unvisited successors; mark on enqueue so no state is queued twice
        for s in node.get_successors():
            if s.state not in visited:
                container.append(s)
                visited.add(s.state)
        n_expanded += 1
    return None
def depth_limited_dfs(env, max_depth, verbose=True):
    """Depth-first search cut off at `max_depth` actions.

    Returns the action list of a found path, or None if no path exists
    within the depth limit.
    """
    stack = [StateNode(env, env.init_state, [], 0)]
    # Revisiting is allowed when the new path is shallower than the one
    # recorded earlier — required so IDDFS stays optimal.
    best_depth = {}  # state -> shallowest depth at which it was reached
    n_expanded = 0
    while stack:
        # expand node (LIFO order)
        node = stack.pop()
        # test for goal
        if env.is_goal(node.state):
            log(verbose, best_depth, stack, node, n_expanded)
            return node.actions
        # push successors that are within the limit and improve on any
        # previously recorded depth
        for child in node.get_successors():
            depth = len(child.actions)
            if depth < max_depth and \
                    (child.state not in best_depth or depth < best_depth[child.state]):
                stack.append(child)
                best_depth[child.state] = depth
        n_expanded += 1
    return None
def iddfs(env, verbose=True):
    """Iterative-deepening DFS: retry depth-limited DFS with growing limits.

    Returns the first (and therefore shortest) path found, or None if no
    path exists within a depth of 999.
    """
    for limit in range(1, 1000):
        plan = depth_limited_dfs(env, limit, verbose)
        if plan is not None:
            return plan
    return None
def ucs(env, verbose=True):
    """Uniform-cost search: expands nodes in order of path cost.

    Returns the action list of a minimum-cost path (using the exercise
    3.2 cell costs), or None if the goal is unreachable.
    """
    start = StateNode(env, env.init_state, [], 0)
    frontier = [(0, start)]  # min-heap of (path_cost, node)
    best_cost = {env.init_state: 0}  # cheapest known path cost per state
    n_expanded = 0
    while frontier:
        n_expanded += 1
        _, node = heapq.heappop(frontier)
        # check if this state is the goal
        if env.is_goal(node.state):
            log(verbose, best_cost, frontier, node, n_expanded)
            return node.actions
        # enqueue successors that are new or cheaper than previously seen
        for child in node.get_successors():
            known = best_cost.get(child.state)
            if known is None or child.path_cost < known:
                best_cost[child.state] = child.path_cost
                heapq.heappush(frontier, (child.path_cost, child))
    return None
def manhattan_dist_heuristic(env, state):
    """Manhattan distance from `state` to env.goal_state (gridworld only).

    Admissible here because every move costs at least 1.

    Bug fix: the original compared the column term against goal_state[0]
    (the goal ROW) instead of goal_state[1] (the goal COLUMN), making the
    heuristic wrong for any non-diagonal goal.
    """
    return (abs(env.goal_state[0] - state[0])
            + abs(env.goal_state[1] - state[1]))
def a_star(env, heuristic, verbose=True):
    """A* search ordered by path_cost + heuristic(env, state).

    Returns a minimum-cost action list (for an admissible heuristic), or
    None if the goal is unreachable.
    """
    start = StateNode(env, env.init_state, [], 0)
    frontier = [(heuristic(env, env.init_state), start)]  # min-heap of (f, node)
    best_cost = {env.init_state: 0}  # cheapest known path cost per state
    n_expanded = 0
    while frontier:
        n_expanded += 1
        _, node = heapq.heappop(frontier)
        # check if this state is the goal
        if env.is_goal(node.state):
            log(verbose, best_cost, frontier, node, n_expanded)
            return node.actions
        # enqueue successors that are new or cheaper than previously seen
        for child in node.get_successors():
            known = best_cost.get(child.state)
            if known is None or child.path_cost < known:
                best_cost[child.state] = child.path_cost
                priority = child.path_cost + heuristic(env, child.state)
                heapq.heappush(frontier, (priority, child))
    return None
if __name__ == "__main__":
    n_trials = 100

    def run_trials(label, search_fn):
        """Run `search_fn` n_trials times and print its plan and mean runtime.

        `search_fn` is called as search_fn(verbose=...); only the first
        run logs search statistics.  Replaces four copy-pasted timing
        loops from the original script; output is unchanged.
        """
        print(f'{label}:')
        t0 = time.time()
        actions = None
        for i in range(n_trials):
            actions = search_fn(verbose=(i == 0))
        mean_time = (time.time() - t0) / n_trials
        print(f'Num Actions: {len(actions)},\nActions: {actions}')
        print(f'Time: {mean_time}')
        print('\n')

    print('== Exercise 3.1 ==')
    gridworld = GridWorld()
    run_trials('BFS', lambda verbose: bfs(gridworld, verbose=verbose))
    run_trials('IDDFS', lambda verbose: iddfs(gridworld, verbose=verbose))

    print('== Exercise 3.2 ==')
    run_trials('UCS', lambda verbose: ucs(gridworld, verbose=verbose))
    run_trials('A*', lambda verbose: a_star(gridworld, manhattan_dist_heuristic,
                                            verbose=verbose))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment