Skip to content

Instantly share code, notes, and snippets.

@meghbhalerao
Created December 17, 2024 05:23
Show Gist options
  • Save meghbhalerao/36a02e69a1ca79955ba9b796303b0faa to your computer and use it in GitHub Desktop.
Save meghbhalerao/36a02e69a1ca79955ba9b796303b0faa to your computer and use it in GitHub Desktop.
import gymnasium as gym
import numpy as np
from gymnasium.utils.play import play
from minigrid.core.actions import Actions
from minigrid.wrappers import RGBImgPartialObsWrapper, ImgObsWrapper
from stable_baselines3.common.monitor import Monitor

from utils.utils import move_to_pos, get_pos_from_int, turn_and_explore # get_pos_from_int, turn_and_explore
# Rule-based solver for MiniGrid-BlockedUnlockPickup: move the ball that blocks
# the way, fetch the key, unlock the door, then pick up the box in the next room.
second_task = gym.make("MiniGrid-BlockedUnlockPickup-v0", render_mode = 'human')

# Manual-play harness, kept for debugging (uncomment to drive the agent by keyboard).
# play(second_task,
#      keys_to_action={
#          "w": np.int64(2),
#          "a": np.int64(0),
#          "d": np.int64(1),
#          "s": np.int64(3),
#          "z": np.int64(4),
#          "x": np.int64(5),
#          "c": np.int64(6),
#      }, noop = np.int64(6), fps = 1
# )
# second_task = ImgObsWrapper(second_task) # Get rid of the 'mission' field
# second_task = Monitor(second_task)

# Object codes found in channel 0 of obs['image'] (see minigrid.core.constants.OBJECT_TO_IDX).
OBJ_EMPTY = 1
OBJ_DOOR = 4
OBJ_KEY = 5
OBJ_BALL = 6
OBJ_BOX = 7

# NOTE(review): the attributes below are read through the wrappers added by
# gym.make; if the installed Gymnasium no longer forwards attributes, read them
# from second_task.unwrapped instead - confirm against the installed version.
print("Width of grid in units of number of tiles is:", second_task.grid.width)
print("Height of grid in units of number of tiles is:", second_task.grid.height)
print("Size of individual room in units of number of tiles is", second_task.room_size)
print("Number of rows of rooms is:", second_task.num_rows)
print("Number of columns of rooms is:", second_task.num_cols)

agent_view_size = second_task.agent_view_size
# The agent sits in the middle of the last row of its own view grid.
agent_pos = (agent_view_size // 2, agent_view_size - 1)
print(f"Initial Position of Agent is {agent_pos}")


def _drop_carried_object_aside(env, obs, view_size):
    """Drop the carried object into a free cell beside the agent.

    The two cells next to the agent in its view grid are inspected in a
    heading-dependent order (headings 0 and 2 try the left cell first,
    headings 1 and 3 the right cell first).  For the first cell holding the
    "empty" code the agent turns towards it, drops, and turns back, so its
    heading is unchanged afterwards.  At most one drop sequence is issued.

    Returns the most recent observation; it is the input observation when the
    heading is not one of 0-3 or neither neighbouring cell is empty.
    """
    last_row = view_size - 1
    middle = view_size // 2
    # side -> (view cell to inspect, turn towards it, turn back afterwards)
    side_moves = {
        "left": ((middle - 1, last_row), Actions.left, Actions.right),
        "right": ((middle + 1, last_row), Actions.right, Actions.left),
    }

    heading = obs['direction']
    if heading in (0, 2):
        preferred_order = ("left", "right")
    elif heading in (1, 3):
        preferred_order = ("right", "left")
    else:
        return obs

    # Both candidate cells are judged on the observation taken before any turn.
    object_layer = obs['image'][:, :, 0]
    for side in preferred_order:
        cell, turn_towards, turn_back = side_moves[side]
        if object_layer[cell[0], cell[1]] == OBJ_EMPTY:
            for action in (turn_towards, Actions.drop, turn_back):
                obs, _, _, _, _ = env.step(action)
            break
    return obs


rew_list = []
seed_by_episode = [42, 34, 50, 1, 9, 7, 43, 56, 90, 11]
for s in seed_by_episode:
    obs, _ = second_task.reset(seed = s)
    print(obs.keys())
    try:
        # ------------- ball stuff -------------------
        # Rotate until the ball is visible, walk up to it and pick it up.
        obs = turn_and_explore(second_task, obs, OBJ_BALL)
        # Coordinates are indices into the agent's own view grid, not world coordinates.
        ball_coords = get_pos_from_int(obs['image'][:, :, 0], OBJ_BALL)
        print(f"coordinates of ball relative to agent is {ball_coords}")
        agent_dir = obs['direction']
        obs = move_to_pos(second_task, ball_coords, agent_dir, obs)
        obs, _, _, _, _ = second_task.step(Actions.pickup)

        # ----------- key stuff -------------------------
        # First step of the rule: go to the key (still carrying the ball).
        obs = turn_and_explore(second_task, obs, OBJ_KEY)
        key_coords = get_pos_from_int(obs['image'][:, :, 0], OBJ_KEY)
        print(f"coordinates of key relative to agent is {key_coords}")
        agent_dir = obs['direction']
        obs = move_to_pos(second_task, key_coords, agent_dir, obs)

        # The agent can hold one object only: put the ball down beside us, then take the key.
        obs = _drop_carried_object_aside(second_task, obs, agent_view_size)
        obs, _, _, _, _ = second_task.step(Actions.pickup)

        # Sanity check that the key really was picked up (explicit so it survives `python -O`).
        carried = second_task.carrying
        if carried is None or carried.type != 'key':
            raise RuntimeError("expected to be carrying the key after the pickup step")

        # ----------- door stuff ------------------------
        obs = turn_and_explore(second_task, obs, OBJ_DOOR)
        door_coords = get_pos_from_int(obs['image'][:, :, 0], OBJ_DOOR)
        agent_dir = obs['direction']
        obs = move_to_pos(second_task, door_coords, agent_dir, obs)

        # Turn to face the door: from heading 1 turn left, from heading 3 turn right
        # (both end on heading 0); any other heading is left untouched.
        agent_dir_at_door = obs['direction']
        if agent_dir_at_door == 1:
            obs, _, _, _, _ = second_task.step(Actions.left)
        elif agent_dir_at_door == 3:
            obs, _, _, _, _ = second_task.step(Actions.right)

        # Toggle is the action used to open and close a door.
        obs, _, _, _, _ = second_task.step(Actions.toggle)

        # The key must be dropped before the box can be picked up.  Turning 180
        # degrees and dropping it behind us keeps the path through the door clear.
        for _ in range(2):
            second_task.step(Actions.right)
        second_task.step(Actions.drop)
        # Two more quarter turns restore the original orientation.
        for _ in range(2):
            second_task.step(Actions.right)

        # Walk through the doorway - only valid while the door is one tile thick.
        for _ in range(2):
            obs, _, _, _, _ = second_task.step(Actions.forward)

        # ----------- box stuff -------------------------
        # Right after entering, the box is expected to be in view because the
        # agent's view is larger than a single room.
        box_coords = get_pos_from_int(obs['image'][:, :, 0], OBJ_BOX)
        agent_dir = obs['direction']
        move_to_pos(second_task, box_coords, agent_dir, obs)
        # pick up the box
        obs, reward, _, _, _ = second_task.step(Actions.pickup)
        print("Final reward", reward)
    except Exception as exc:
        # Any failed step of the rule counts as a failed episode, but Ctrl-C and
        # SystemExit are no longer swallowed, and the reason is reported.
        print("Task Failed!")
        print(f"Failure reason: {exc!r}")
        reward = 0
    rew_list.append(reward)
print(np.mean(rew_list), np.std(rew_list))
from minigrid.core.actions import Actions
from minigrid.core.constants import DIR_TO_VEC
import numpy as np
import sys
def _quarter_turns(current_dir, target_dir):
    """Return the turn actions that rotate the agent from one heading to another.

    Headings use MiniGrid's encoding, where turning right adds one (mod 4) and
    turning left subtracts one.  A half turn is issued as two left turns.
    """
    turns_by_offset = {
        0: (),
        1: (Actions.right,),
        2: (Actions.left, Actions.left),
        3: (Actions.left,),
    }
    return turns_by_offset[(int(target_dir) - int(current_dir)) % 4]


def move_to_pos(env, rel_pos, agent_dir, obs):
    """Drive the agent from its current cell towards a cell seen in its view.

    Args:
        env: environment exposing ``agent_view_size`` and a five-value ``step``.
        rel_pos: ``(i, j)`` index of the target inside the agent's view grid,
            as returned by ``get_pos_from_int``.
        agent_dir: heading of the agent (0-3) at the time ``obs`` was taken.
        obs: latest observation; returned unchanged if no action is needed.

    Returns:
        The observation produced by the last executed action.

    The view-relative offset is first rotated into a heading-independent
    frame, then the agent covers the first component and afterwards the
    second, turning to face each axis before walking.  The number of forward
    actions equals the full offset; whether the last one is blocked by an
    object on the target cell is up to the environment.
    """
    agent_view_size = env.agent_view_size
    # The agent occupies the middle of the last row of its own view grid.
    agent_pos = (agent_view_size // 2, agent_view_size - 1)
    x_diff = agent_pos[0] - rel_pos[0]
    y_diff = agent_pos[1] - rel_pos[1]
    print("Agent coodinates is", agent_pos)
    print("Pos of to move to is", rel_pos)
    print('Move Vector is ', x_diff, y_diff)
    print(f"Agent Dir is {DIR_TO_VEC[agent_dir]}")

    # Rotate the view-relative offset so it no longer depends on the heading
    # (heading 1 is the reference orientation and needs no rotation).
    if agent_dir == 0:
        x_diff, y_diff = y_diff, -x_diff
    elif agent_dir == 2:
        x_diff, y_diff = -y_diff, x_diff
    elif agent_dir == 3:
        x_diff, y_diff = -x_diff, -y_diff
    print(f'New move Vector is {x_diff, y_diff}')

    # First axis: face heading 0 for a positive offset, heading 2 for a negative one.
    if x_diff != 0:
        x_heading = 0 if x_diff > 0 else 2
        for action in _quarter_turns(agent_dir, x_heading):
            obs, _, _, _, _ = env.step(action)
    for _ in range(abs(x_diff)):
        obs, _, _, _, _ = env.step(Actions.forward)

    # Re-read the heading from the observation: it reflects any turns made above.
    agent_dir_after_x_movement = obs['direction']
    print(f"Agent direction after x movement is {agent_dir_after_x_movement}")

    # Second axis: face heading 1 for a positive offset, heading 3 for a negative one.
    if y_diff != 0:
        y_heading = 1 if y_diff > 0 else 3
        for action in _quarter_turns(agent_dir_after_x_movement, y_heading):
            obs, _, _, _, _ = env.step(action)
    for _ in range(abs(y_diff)):
        obs, _, _, _, _ = env.step(Actions.forward)
    return obs
def turn_and_explore(env, obs, obj_id):
    """Rotate in place until ``obj_id`` shows up in the agent's view.

    Channel 0 of ``obs['image']`` is searched for ``obj_id``.  While it is
    absent the agent issues action ``0`` (turn left) and looks again, for at
    most four turns.  The observation that was current when the search ended
    is returned, whether or not the object was found.
    """
    remaining_turns = 4
    while remaining_turns > 0:
        object_layer = obs['image'][:, :, 0]
        if np.any(object_layer == obj_id):
            return obs
        # Not visible from this heading: turn left and inspect the new view.
        obs, _reward, _terminated, _truncated, _info = env.step(0)
        remaining_turns -= 1
    return obs
def get_pos_from_int(obs_array, obj_code):
    """Return the index of the single cell of ``obs_array`` equal to ``obj_code``.

    Args:
        obs_array: NumPy array with at least two dimensions, e.g. channel 0
            of an observation image.
        obj_code: integer object code to look for (Python or NumPy integer).

    Returns:
        ``(i, j)`` - the indices along the first two axes, as Python ints.

    Raises:
        TypeError: if the arguments do not have the expected types.
        ValueError: if the code occurs zero times or more than once, since
            no unique position can be reported in either case.
    """
    if not isinstance(obs_array, np.ndarray) or not isinstance(obj_code, (int, np.integer)):
        raise TypeError("mismatched data types in function arguments ")
    # One row of indices per matching cell.
    hits = np.argwhere(obs_array == obj_code)
    if len(hits) != 1:
        raise ValueError(
            f"expected exactly one cell with code {obj_code}, found {len(hits)}"
        )
    # Index individual elements so no size-1 array is coerced to a scalar.
    return (int(hits[0][0]), int(hits[0][1]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment