Created
January 19, 2022 19:06
-
-
Save d33tah/ec637f1655b91ba59d406a407ba4c8c9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import subprocess | |
import copy | |
from enum import Enum, auto | |
import typing as T | |
import random | |
T_POS = T.List[int] | |
X = 0 | |
Y = 1 | |
def clear_screen(): | |
subprocess.call(["clear"]) | |
class Task: | |
def __init__(self, matrix, frompos: T_POS, topos: T_POS): | |
self.frompos = frompos | |
self.topos = topos | |
matrix[self.frompos[X]][self.frompos[Y]] = self | |
class Action(Enum): | |
MOVE_UP = auto() | |
MOVE_DOWN = auto() | |
MOVE_LEFT = auto() | |
MOVE_RIGHT = auto() | |
DO_NOTHING = auto() | |
class Agent: | |
def __init__(self, matrix, pos: T_POS): | |
self.pos = pos | |
matrix[self.pos[X]][self.pos[Y]] = self | |
self.on_started(matrix) | |
def on_started(self): | |
raise NotImplementedError() | |
def tick(self, matrix) -> Action: | |
raise NotImplementedError() | |
class SmartAgent(Agent): | |
def selfassign_task(self, matrix): | |
self.assigned_task = None | |
observed_tasks = [] | |
observed_agents = [] | |
for x in matrix: | |
for obj in x: | |
if isinstance(obj, Task): | |
observed_tasks.append(obj) | |
elif isinstance(obj, Agent): | |
observed_agents.append(obj) | |
for agent in observed_agents: | |
if agent.assigned_task is None: | |
continue | |
observed_tasks.remove(agent.assigned_task) | |
if not observed_tasks: | |
raise RuntimeError("Nothing to do!") | |
observed_tasks.sort( | |
key=lambda t: abs(self.pos[X] - t.topos[X]) | |
+ abs(self.pos[Y] - t.topos[Y]), reverse=True | |
) | |
self.assigned_task = observed_tasks.pop() # FIXME: not optimal | |
def on_started(self, matrix): | |
self.selfassign_task(matrix) | |
def tick(self, matrix) -> Action: | |
if self.assigned_task.topos[X] > self.pos[X]: | |
return Action.MOVE_RIGHT | |
elif self.assigned_task.topos[X] < self.pos[X]: | |
return Action.MOVE_LEFT | |
if self.assigned_task.topos[Y] < self.pos[Y]: | |
return Action.MOVE_UP | |
elif self.assigned_task.topos[Y] > self.pos[Y]: | |
return Action.MOVE_DOWN | |
class Game: | |
def __init__(self, N: int): | |
self.N = N | |
self.matrix = [[None for i in range(N)] for i in range(N)] | |
self.score = 0 | |
agents_and_tasks = 5 | |
self.tasks = [ | |
Task( | |
self.matrix, self.random_empty_spot(), self.random_empty_spot() | |
) | |
for i in range(agents_and_tasks) | |
] | |
self.agents = [ | |
SmartAgent(self.matrix, self.random_empty_spot()) | |
for i in range(agents_and_tasks) | |
] | |
self.keep_going = True | |
def random_empty_spot(self) -> T_POS: | |
x = random.randint(0, self.N - 1) | |
y = random.randint(0, self.N - 1) | |
while self.matrix[x][y] is not None: | |
x = random.randint(0, self.N - 1) | |
y = random.randint(0, self.N - 1) | |
return [x, y] | |
def print_matrix(self): | |
clear_screen() | |
matrix = copy.deepcopy(self.matrix) | |
for task in self.tasks: | |
matrix[task.frompos[X]][task.frompos[Y]] = "F" | |
matrix[task.topos[X]][task.topos[Y]] = "T" | |
for agent in self.agents: | |
matrix[agent.pos[X]][agent.pos[Y]] = "A" | |
matrix = list(map(list, zip(*matrix))) | |
for line in matrix: | |
line = [x if x else " " for x in line] | |
print("".join(line)) | |
def apply_move(self, pos: T_POS, action: Action): | |
if action == Action.MOVE_UP: | |
pos[Y] -= 1 | |
if pos[Y] < 0: | |
raise RuntimeError("Invalid move") | |
if action == Action.MOVE_DOWN: | |
pos[Y] += 1 | |
if pos[Y] > self.N - 1: | |
raise RuntimeError("Invalid move") | |
if action == Action.MOVE_LEFT: | |
pos[X] -= 1 | |
if pos[X] < 0: | |
raise RuntimeError("Invalid move") | |
if action == Action.MOVE_RIGHT: | |
pos[X] += 1 | |
if pos[X] > self.N - 1: | |
raise RuntimeError("Invalid move") | |
return pos | |
def apply_action(self, agent: Agent, action: Action): | |
new_pos = copy.copy(agent.pos) | |
self.apply_move(new_pos, action) | |
move_valid = False | |
obj_at_new_pos = self.matrix[new_pos[X]][new_pos[Y]] | |
if obj_at_new_pos is None: | |
move_valid = True | |
elif isinstance(obj_at_new_pos, Task): | |
move_valid = True | |
if move_valid: | |
self.matrix[agent.pos[X]][agent.pos[Y]] = None | |
self.apply_move(agent.pos, action) | |
self.matrix[agent.pos[X]][agent.pos[Y]] = agent | |
def run(self): | |
while self.keep_going: | |
self.print_matrix() | |
for agent in self.agents: | |
action = agent.tick(copy.deepcopy(self.matrix)) | |
self.apply_action(agent, action) | |
time.sleep(0.1) | |
if __name__ == "__main__": | |
N = 50 | |
while True: | |
Game(N).run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment