Created
December 17, 2024 05:23
-
-
Save meghbhalerao/36a02e69a1ca79955ba9b796303b0faa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper | |
from stable_baselines3.common.monitor import Monitor | |
from gymnasium.utils.play import play | |
from utils.utils import move_to_pos, get_pos_from_int, turn_and_explore # get_pos_from_int, turn_and_explore | |
from minigrid.core.actions import Actions | |
second_task = gym.make("MiniGrid-BlockedUnlockPickup-v0", render_mode = 'human') | |
# play(second_task, | |
# keys_to_action={ | |
# "w": np.int64(2), | |
# "a": np.int64(0), | |
# "d": np.int64(1), | |
# "s": np.int64(3), | |
# "z": np.int64(4), | |
# "x": np.int64(5), | |
# "c": np.int64(6), | |
# }, noop = np.int64(6), fps = 1 | |
# ) | |
# second_task = ImgObsWrapper(second_task) # Get rid of the 'mission' field | |
# second_task = Monitor(second_task) | |
print("Width of grid in units of number of tiles is:", second_task.grid.width) | |
print("Height of grid in units of number of tiles is:", second_task.grid.height) | |
print("Size of individual room in units of number of tiles is", second_task.room_size) | |
print("Number of rows of rooms is:", second_task.num_rows) | |
print("Number of columns of rooms is:", second_task.num_cols) | |
agent_view_size = second_task.agent_view_size | |
agent_pos = (agent_view_size // 2, agent_view_size - 1) | |
print(f"Initial Position of Agent is {agent_pos}") | |
rew_list = [] | |
seed_by_episode = [42, 34, 50, 1, 9, 7, 43, 56, 90, 11] | |
for s in seed_by_episode: | |
obs, _ = second_task.reset(seed = s) | |
print(obs.keys()) | |
try: | |
# ------------- ball stuff ------------------- | |
obs = turn_and_explore(second_task, obs, 6) | |
ball_coords = get_pos_from_int(obs['image'][:,:,0], 6) # 5 is the code for key - hard coding this now, later the codebase can be improved, key_coords by default gives us th relative coordinates between the agent and the object | |
print(f"coordinates of key relative to agent is {ball_coords}") | |
agent_dir = obs['direction'] | |
obs = move_to_pos(second_task, ball_coords, agent_dir, obs) | |
obs, _, _, _, _ = second_task.step(3) | |
# -----------key stuff ------------------------- | |
# first step of our rule, go to the key | |
obs = turn_and_explore(second_task, obs, 5) | |
key_coords = get_pos_from_int(obs['image'][:,:,0], 5) # 5 is the code for key - hard coding this now, later the codebase can be improved, key_coords by default gives us th relative coordinates between the agent and the object | |
print(f"coordinates of key relative to agent is {key_coords}") | |
agent_dir = obs['direction'] | |
obs = move_to_pos(second_task, key_coords, agent_dir, obs) | |
# now drop the ball in an appropriate direction | |
agent_right_side = (agent_view_size // 2 + 1 , agent_view_size - 1) | |
agent_left_side = (agent_view_size // 2 - 1, agent_view_size - 1) | |
if obs['direction'] == 2: # direction = up - prefer to put obj in left so less obstruction | |
if obs['image'][:,:,0][agent_left_side[0], agent_left_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
elif obs['image'][:,:,0][agent_right_side[0], agent_right_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
elif obs['direction'] == 1: # direction = down - prefer to put obj in left so less obstruction | |
if obs['image'][:,:,0][agent_right_side[0], agent_right_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
elif obs['image'][:,:,0][agent_left_side[0], agent_left_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
elif obs['direction'] == 0: # direction = down - prefer to put obj in left so less obstruction | |
if obs['image'][:,:,0][agent_left_side[0], agent_left_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
elif obs['image'][:,:,0][agent_right_side[0], agent_right_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
elif obs['direction'] == 3: | |
if obs['image'][:,:,0][agent_right_side[0], agent_right_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
if obs['image'][:,:,0][agent_left_side[0], agent_left_side[1]] == 1: | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
obs, _, _, _, _ = second_task.step(Actions.drop) | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
obs, _, _, _, _ = second_task.step(3) | |
# TODO - add a sanity check to see that the object has been actually picked up | |
assert second_task.carrying.type == 'key' | |
obs = turn_and_explore(second_task, obs, 4) | |
door_coords = get_pos_from_int(obs['image'][:,:,0], 4) # 4 is the code for the door object | |
agent_dir = obs['direction'] | |
obs = move_to_pos(second_task, door_coords, agent_dir, obs) | |
# here check if door is on the left or right - this can be made simpler actually - if dir is upwards then turn right, downwards means turn left | |
agent_dir_at_door = obs['direction'] | |
if agent_dir_at_door == 1: | |
obs, _, _, _, _ = second_task.step(Actions.left) | |
elif agent_dir_at_door == 3: | |
obs, _, _, _, _ = second_task.step(Actions.right) | |
else: | |
pass | |
obs, _, _, _, _ = second_task.step(5) # 5 is the action code for toggle which is used to open and close a door\\ | |
for _ in range(2): | |
second_task.step(Actions.right) | |
# now we need to drop the key before picking up the box - the key can be dropped any where - but it might be good if we can just drop the key before going into the next room, the ideal way would be simply to turn back, 180 degree and then drop the key since it is always a valid drop and does not obstruct our further path in any way | |
second_task.step(Actions.drop) | |
# turn again 2 times to come to original orientation - this can be either left or right does not matter | |
for _ in range(2): | |
second_task.step(Actions.right) | |
# move through the passage way through the door - this is applicable only when the door is of thickness = 1 - this seems to be the case for most of the envs | |
for _ in range(2): | |
obs, _, _, _, _ = second_task.step(Actions.forward) | |
obs_object = obs['image'][:,:,0] | |
# since we just entered through the door and stopped it is guarenteed atleast for this observation space that we will always see the box, since the agent view is more than the dimensions of an individual room | |
box_coords = get_pos_from_int(obs['image'][:,:,0], 7) | |
agent_dir = obs['direction'] | |
move_to_pos(second_task, box_coords, agent_dir, obs) | |
# pick up the box | |
obs, reward, _, _, _ = second_task.step(Actions.pickup) | |
print("Final reward", reward) | |
except: | |
print("Task Failed!") | |
reward = 0 | |
rew_list.append(reward) | |
print(np.mean(rew_list), np.std(rew_list)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from minigrid.core.actions import Actions | |
from minigrid.core.constants import DIR_TO_VEC | |
import numpy as np | |
import sys | |
def move_to_pos(env, rel_pos, agent_dir, obs): | |
agent_view_size = env.agent_view_size | |
agent_pos = (agent_view_size // 2, agent_view_size - 1) | |
x_diff = agent_pos[0] - rel_pos[0] | |
y_diff = agent_pos[1] - rel_pos[1] | |
print("Agent coodinates is", agent_pos) | |
print("Pos of to move to is", rel_pos) | |
print('Move Vector is ', x_diff, y_diff) | |
print(f"Agent Dir is {DIR_TO_VEC[agent_dir]}") | |
if agent_dir == 0: | |
x_diff, y_diff = y_diff, -x_diff | |
elif agent_dir == 1: | |
pass | |
elif agent_dir == 2: | |
x_diff, y_diff = -y_diff, x_diff | |
elif agent_dir == 3: | |
x_diff, y_diff = -x_diff, -y_diff | |
print(f'New move Vector is {x_diff, y_diff}') | |
# we will essentially have 8 conditions here, 2 for whatever direction the x difference and 4 each for what the orientation of the agent is, and inside each of the condition we have to specify which direction the agent has to rotate to align with either the positive or the negative x axis | |
if x_diff > 0: | |
if agent_dir == 0: | |
pass | |
elif agent_dir == 1: | |
obs, _, _, _, _ = env.step(Actions.left) | |
elif agent_dir == 2: | |
obs, _, _, _, _ = env.step(Actions.left) | |
obs, _, _, _, _ = env.step(Actions.left) | |
elif agent_dir == 3: | |
obs, _, _, _, _ = env.step(Actions.right) | |
elif x_diff < 0: | |
if agent_dir == 0: | |
obs, _, _, _, _ = env.step(Actions.left) | |
obs, _, _, _, _ = env.step(Actions.left) | |
elif agent_dir == 1: | |
obs, _, _, _, _ = env.step(Actions.right) | |
elif agent_dir == 2: | |
pass | |
elif agent_dir == 3: | |
obs, _, _, _, _ = env.step(Actions.left) | |
else: | |
pass | |
# else: | |
# if y_diff > 0: | |
# if agent_dir == 1: | |
# env.step(Actions.right) | |
# elif agent_dir == 3: | |
# env.step(Actions.left) | |
# elif agent_dir == 4: | |
# env.step(Actions.left) | |
# env.step(Actions.left) | |
# elif y_diff < 0: | |
# pass | |
# now we know the move vector, now we have to move the agent | |
# sys.exit() | |
xaxis_action_sequence = [Actions.forward for _ in range(abs(x_diff))] | |
for action in xaxis_action_sequence: | |
obs, _, _, _, _ = env.step(action) | |
agent_dir_after_x_movement = obs['direction'] | |
print("Agent direction after x movement is ", {agent_dir_after_x_movement}) | |
yaxis_action_sequence = [] | |
if y_diff > 0: | |
if agent_dir_after_x_movement == 2: | |
yaxis_action_sequence.append(Actions.left) | |
elif agent_dir_after_x_movement== 0: | |
yaxis_action_sequence.append(Actions.right) | |
elif y_diff < 0: | |
if agent_dir_after_x_movement == 2: | |
yaxis_action_sequence.append(Actions.right) | |
elif agent_dir_after_x_movement == 0: | |
yaxis_action_sequence.append(Actions.left) | |
for _ in range(abs(y_diff)): | |
yaxis_action_sequence.append(Actions.forward) | |
for action in yaxis_action_sequence: | |
obs, _, _, _, _ = env.step(action) | |
return obs | |
def turn_and_explore(env, obs, obj_id): | |
for i in range(4): | |
if np.isin(obj_id, obs['image'][:,:,0]): | |
# print(f"Found!! {obj_id}") | |
break | |
else: | |
obs, _, _, _, _ = env.step(0) # turn left and sort of explore to see if the key is present in the agent observation space | |
return obs | |
def get_pos_from_int(obs_array, obj_code): | |
assert isinstance(obs_array, np.ndarray) and isinstance(obj_code, int), "mismatched data types in function arguments " | |
pos = np.array(np.where(obs_array == obj_code)) | |
# print(obs_array) | |
return (int(pos[0]), int(pos[1])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment