Last active
May 22, 2020 15:38
-
-
Save Gerenuk/eaf69f3080486b7047fb406b048715bf to your computer and use it in GitHub Desktop.
Miner trying to flee from Pirates
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import contextlib as __stickytape_contextlib


@__stickytape_contextlib.contextmanager
def __stickytape_temporary_dir():
    """Yield the path of a fresh temporary directory and delete it on exit.

    The directory is created with ``tempfile.mkdtemp`` and removed with
    ``shutil.rmtree`` inside a ``finally`` clause, so it is cleaned up even
    when the body of the ``with`` statement raises.

    Yields:
        The absolute path of the newly created directory.
    """
    import shutil
    import tempfile

    dir_path = tempfile.mkdtemp()
    try:
        yield dir_path
    finally:
        shutil.rmtree(dir_path)
with __stickytape_temporary_dir() as __stickytape_working_dir: | |
def __stickytape_write_module(path, contents):
    """Write one bundled module into the temporary working directory.

    Every directory named in ``path`` is created if it does not exist yet,
    and each newly created directory receives an ``__init__.py`` containing a
    single newline. The working directory is read from the enclosing
    ``__stickytape_working_dir`` name.

    Args:
        path: Module location relative to the working directory, using "/"
            as the separator (for example ``"strategies/gatherer.py"``).
        contents: Full source text to write to that file.
    """
    import os
    import os.path

    def make_package(path):
        """Create each directory level of ``path`` with an ``__init__.py``."""
        partial_path = __stickytape_working_dir
        for part in path.split("/"):
            partial_path = os.path.join(partial_path, part)
            if not os.path.exists(partial_path):
                os.mkdir(partial_path)
                # Use a context manager so the handle is flushed and closed
                # right away instead of whenever it gets garbage-collected.
                init_path = os.path.join(partial_path, "__init__.py")
                with open(init_path, "w") as init_file:
                    init_file.write("\n")

    make_package(os.path.dirname(path))

    full_path = os.path.join(__stickytape_working_dir, path)
    with open(full_path, "w") as module_file:
        module_file.write(contents)
import sys as __stickytape_sys | |
__stickytape_sys.path.insert(0, __stickytape_working_dir) | |
__stickytape_write_module('agent.py', 'from dataclasses import dataclass\nfrom itertools import chain\nfrom typing import Dict, List, Optional\n\nfrom assign_solver import solve_assign\nfrom config import Config\nfrom halite_amount_solver import solve_halite_amount\nfrom observation import Id, Observation\nfrom plans import Plan\nfrom actions import Action\nfrom strategy import Strategy\n\n\nclass StrategyAssigner: # pylint: disable=too-few-public-methods\n def updated_strategies(\n self, strategies: Dict[Id, Strategy], config: Config\n ) -> Dict[Id, Strategy]:\n raise NotImplementedError()\n\n\n@dataclass\nclass Record:\n obs: Observation\n strategy_names: Dict[Id, str]\n plans: List[Plan]\n best_plans: List[Plan]\n actions: List[Action]\n best_actions: List[Action]\n halite_actions: Dict[Id, str]\n\n\nclass Agent: # pylint: disable=too-few-public-methods\n def __init__(self, config: Config, strategy_assigner: StrategyAssigner):\n self.config = config\n self.strategy_assigner = strategy_assigner\n self.strategies: Dict[Id, Strategy] = {}\n self.records = []\n\n def __call__(self, raw_obs) -> Dict[str, str]:\n obs = Observation.from_obs(raw_obs)\n\n self.config.set_obs(obs)\n\n self.strategies = self.strategy_assigner.updated_strategies(\n self.strategies, self.config\n )\n\n print(\n f"\\n----- Step {self.config.obs.step}, Halite {self.config.obs.my_halite:.0f} -----"\n )\n\n print(f"Config: {self.config}")\n\n print(f"Strategies: {self.strategies}")\n\n assert (\n obs.my_ships.keys() | obs.my_shipyards.keys() == self.strategies.keys()\n ), (\n obs.my_ships,\n obs.my_shipyards,\n self.strategies,\n )\n\n bot_plans = list(\n chain.from_iterable(\n bot.make_plans(num=len(self.strategies))\n for bot in self.strategies.values()\n )\n )\n\n best_plans = solve_assign(bot_plans) # TODO type\n print("Best plans", best_plans)\n\n possible_actions = list(\n chain.from_iterable(plan.actions for plan in best_plans)\n )\n print("Possible actions", possible_actions)\n\n 
affordable_actions = solve_halite_amount(possible_actions)\n\n best_actions = solve_assign(affordable_actions)\n print("Best actions", best_actions)\n\n for action in best_actions:\n self.strategies[action.id].notify_action(action)\n\n halite_actions: Dict[str, str] = {}\n\n for action in best_actions:\n halite_command = action.halite_command\n\n assert (\n not halite_actions.keys() & halite_command.keys()\n ), f"Overwriting commands {halite_actions.keys() & halite_command.keys()}"\n\n halite_actions.update(halite_command)\n\n self.records.append(\n Record(\n obs=obs,\n strategy_names={\n id: str(strategy) for id, strategy in self.strategies.items()\n },\n plans=bot_plans,\n best_plans=best_plans,\n actions=possible_actions,\n best_actions=best_actions,\n halite_actions=halite_actions,\n )\n )\n\n return halite_actions\n') | |
__stickytape_write_module('assign_solver.py', 'import itertools\nfrom collections import defaultdict\nfrom typing import Any, List, Tuple\n\nimport numpy as np\nfrom scipy.optimize import linear_sum_assignment\n\nfrom helper import is_unique\n\n\nclass AssignSolverMixin:\n """\n You can use goal=None to signify a non-conflicting goal\n """\n\n @property\n def obj_goal_penalty(self) -> Tuple[Any, Any, float]:\n raise NotImplementedError()\n\n\ndef solve_assign(penalty_objs: List[AssignSolverMixin]):\n if not penalty_objs:\n return []\n\n obj_goal_penalties = [penalty_obj.obj_goal_penalty for penalty_obj in penalty_objs]\n\n # Rewrite non-conflict goals\n non_conflict_goals = (("nc", i) for i in itertools.count())\n\n obj_goal_penalties = [\n (obj, goal if goal is not None else next(non_conflict_goals), score)\n for obj, goal, score in obj_goal_penalties\n ]\n\n all_to_resolve_objs = defaultdict(list)\n\n for obj_goal_penalty, penalty_obj in zip(obj_goal_penalties, penalty_objs):\n all_to_resolve_objs[obj_goal_penalty[:2]].append(\n (obj_goal_penalty, penalty_obj)\n )\n\n best_to_resolve_objs = list(\n min(objs, key=lambda x: x[1].obj_goal_penalty[2])\n for objs in all_to_resolve_objs.values()\n )\n\n best_obj_goal_penalties, best_penalty_objs = zip(*best_to_resolve_objs)\n\n matrix, obj_goal_penalty_map = make_matrix(\n best_obj_goal_penalties, best_penalty_objs\n )\n\n try:\n x_idxs, y_idxs = linear_sum_assignment(matrix)\n except ValueError as exc:\n raise ValueError(\n f"Assign solver failed with {exc} for {penalty_objs}. "\n f"You may need to add actions which guarantee resolution, like when bots can stay on the spot."\n )\n\n try:\n result = [\n obj_goal_penalty_map[x_idx, y_idx] for x_idx, y_idx in zip(x_idxs, y_idxs)\n ]\n except KeyError as exc:\n raise ValueError(\n f"Assignment solution could not be resolved for {exc}. "\n "You may need to add a stay on the spot move to the bot. 
Objects were:\\n"\n + "\\n".join(map(str, penalty_objs))\n )\n\n assert is_unique(x_idxs), penalty_objs\n assert is_unique(y_idxs), penalty_objs\n\n return result\n\n\ndef make_matrix(obj_goal_penalties, penalty_objs: List[AssignSolverMixin]):\n assert is_unique(obj[:2] for obj in obj_goal_penalties)\n\n xs = list(set(x[0] for x in obj_goal_penalties))\n ys = list(set(x[1] for x in obj_goal_penalties))\n\n result = np.full(shape=(len(xs), len(ys)), fill_value=np.inf)\n\n obj_goal_penalty_map = {}\n\n for (x, y, penalty), obj in zip(obj_goal_penalties, penalty_objs):\n x_idx = xs.index(x)\n y_idx = ys.index(y)\n\n obj_goal_penalty_map[x_idx, y_idx] = obj\n\n result[x_idx, y_idx] = penalty\n\n return result, obj_goal_penalty_map\n') | |
__stickytape_write_module('helper.py', 'from collections import Counter\nfrom dataclasses import dataclass\nfrom operator import itemgetter\nfrom typing import Iterable, Optional, Dict, Tuple\n\nimport numpy as np\nfrom scipy.signal import convolve2d\n\nfrom geometry import Pos\nfrom halite_config import dist\n\n\n@dataclass\nclass ClosestDist:\n idx: int\n pos: Pos\n dist: int\n\n\ndef find_closest(pos: Pos, dest_poses: Iterable[Pos]) -> Optional[ClosestDist]:\n if not dest_poses:\n raise ValueError("Empty dest_poses provided")\n\n dists = [\n (i, dest_pos, dist(pos, dest_pos)) for i, dest_pos in enumerate(dest_poses)\n ]\n\n if not dists:\n return None\n\n closest = min(dists, key=itemgetter(2),)\n\n return ClosestDist(*closest)\n\n\ndef is_unique(elems: Iterable): # what if empty?\n cnts = Counter(elems)\n\n if not cnts:\n return True\n\n return cnts.most_common(1)[0][1] == 1\n\n\ndef shape_from_poses(poses):\n len_x = max(pox.x for pox in poses) + 1\n len_y = max(pos.y for pos in poses) + 1\n\n return (len_x, len_y)\n\n\ndef pos_dict_to_array(pos_dict: Dict[Pos, float]) -> np.ndarray:\n shape = shape_from_poses(pos_dict.keys())\n\n result = np.full(shape, fill_value=np.nan)\n\n for pos, val in pos_dict.items():\n result[pos] = val\n\n return result\n\n\ndef poses_to_dist_array(poses: Iterable[Pos], shape: Tuple[int, int]):\n result = np.full(shape, fill_value=np.nan)\n\n for i in range(shape[0]):\n for j in range(shape[1]):\n pos = Pos(i, j)\n result[i, j] = find_closest(pos, poses).dist\n\n return result\n\n\ndef make_filter(size: int):\n middle = size // 2\n dists_from_middle = np.array(\n [[abs(i - middle) + abs(j - middle) for i in range(size)] for j in range(size)]\n )\n result = np.exp(-0.5 * dists_from_middle)\n\n return result\n\n\ndef filter_array(array: np.ndarray, filter: np.ndarray):\n return convolve2d(array, filter, boundary="wrap", mode="same")\n\n\ndef score_new_shipyard(\n map_halite: np.ndarray,\n shipyard_dist: Optional[np.ndarray] = None,\n 
enemy_dist: Optional[np.ndarray] = None,\n):\n if shipyard_dist is not None:\n discounted_map_halite = map_halite * (1 - np.exp(-0.5 * shipyard_dist ** 2))\n else:\n discounted_map_halite = map_halite\n\n if enemy_dist is not None:\n discounted_enemy_map_halite = map_halite * (1 - np.exp(-0.5 * enemy_dist ** 2))\n discounted_map_halite -= discounted_enemy_map_halite\n\n filter_ = make_filter(size=11)\n\n filtered_map_halite = filter_array(discounted_map_halite, filter_)\n\n return filtered_map_halite\n\n\ndef max_pos_val(array: np.ndarray):\n linear_idx_map = array.argmax()\n\n x, y = np.unravel_index(linear_idx_map, array.shape)\n val = array[x, y]\n\n return Pos(x, y), val\n\n\ndef array_to_pos_dist(array: np.ndarray) -> Dict[Pos, float]:\n len_x, len_y = array.shape\n\n result = {}\n\n for i in range(len_x):\n for j in range(len_y):\n result[Pos(i, j)] = array[i, j]\n\n return result\n') | |
# Bundled source of geometry.py: the Pos type plus torus geometry helpers.
# Changes to the payload relative to the original bundle:
#   * Geometry._to_xy: the range check is parenthesised so that BOTH
#     coordinates are validated.
#   * Geometry.choose_side: the second half of the error message is now an
#     f-string, so {pos} and the candidate positions are interpolated.
#   * Module indentation is restored so the written file can be imported.
__stickytape_write_module('geometry.py', '''from typing import NamedTuple, Tuple, List, Set, FrozenSet, Dict
import itertools
import collections


class Pos(NamedTuple):
    x: int
    y: int

    def __repr__(self):
        return f"[{self.x}:{self.y}]"


DiffType = Tuple[int, int]


def sliding_window(iterable, size):
    iterable = iter(iterable)
    window = collections.deque(itertools.islice(iterable, size - 1), maxlen=size)
    for item in iterable:
        window.append(item)
        yield tuple(window)


diff_order = [(1, 0), (0, 1), (-1, 0), (0, -1)]
select_side_map: Dict[bool, Dict[FrozenSet[DiffType], DiffType]] = {
    True: {},
    False: {},
}  # initialize map to pick left/right diff

for side_flag, elem_idx in [(True, 0), (False, -1)]:
    for length in [2, 3]:
        select_side_map[side_flag].update(
            {
                frozenset(order): order[elem_idx]
                for order in itertools.islice(
                    sliding_window(itertools.cycle(diff_order), length), 4
                )
            }
        )


class Geometry:
    def __init__(self, size_x: int, size_y: int):
        self.size_x = size_x
        self.size_y = size_y

        self.poses = {Pos(x, y) for x in range(size_x) for y in range(size_y)}

    def int_to_pos(self, int_pos: int) -> Pos:
        x = int_pos % self.size_x
        y = int_pos // self.size_x
        assert 0 <= y < self.size_y
        return Pos(x, y)

    def _to_xy(self, pos: Pos) -> DiffType:
        assert isinstance(pos, Pos), f"Invalid position {pos}"
        x = pos.x
        y = pos.y
        # Parenthesised on purpose: "not" binds tighter than "and", so the
        # unparenthesised form never rejected an out-of-range y.
        if not (0 <= x < self.size_x and 0 <= y < self.size_y):
            raise ValueError(
                f"Position {pos} is illegal for geometry of size {self.size_x} x {self.size_y}"
            )

        return (x, y)

    def _to_pos(self, x: int, y: int) -> Pos:
        x %= self.size_x
        y %= self.size_y
        return Pos(x, y)

    def _diff_to(self, pos1: Pos, pos2: Pos) -> DiffType:
        """
        Return diff vector for shortest path (torus)
        External function currently are not supposed to deal with position differences
        """
        x1, y1 = self._to_xy(pos1)
        x2, y2 = self._to_xy(pos2)

        dxy_shifted = [
            (x2 + shift_x - x1, y2 + shift_y - y1)
            for shift_x, shift_y in itertools.product(
                [-self.size_x, 0, self.size_x], [-self.size_y, 0, self.size_y]
            )
        ]

        dx, dy = min(dxy_shifted, key=lambda dxy: self._raw_dist(dxy[0], dxy[1]))

        return (dx, dy)

    @staticmethod
    def _raw_dist(dx: int, dy: int) -> int:
        return abs(dx) + abs(dy)

    def dist(self, pos1: Pos, pos2: Pos) -> int:
        dx, dy = self._diff_to(pos1, pos2)
        return self._raw_dist(dx, dy)

    def pos_towards(self, pos1: Pos, pos2: Pos) -> Set[Pos]:
        if pos1 == pos2:
            return {pos2}

        x1, y1 = self._to_xy(pos1)
        dx, dy = self._diff_to(pos1, pos2)

        result = []

        if dx > 0:
            result.append((x1 + 1, y1))

        if dx < 0:
            result.append((x1 - 1, y1))

        if dy > 0:
            result.append((x1, y1 + 1))

        if dy < 0:
            result.append((x1, y1 - 1))

        return set(itertools.starmap(self._to_pos, result))

    def get_prox(self, pos: Pos, *dists: int) -> Set[Pos]:
        x, y = self._to_xy(pos)

        result = []
        for dist in dists:
            if dist == 0:
                result.append((x, y))
                continue
            for d in range(dist):
                result.append((x + d, y + dist - d))
                result.append((x - d, y - dist + d))
                result.append((x - dist + d, y + d))
                result.append((x + dist - d, y - d))

        return set(itertools.starmap(self._to_pos, result))

    def choose_side(self, pos: Pos, next_poses: List[Pos], left=True) -> Pos:
        if len(next_poses) == 1:
            return next_poses[0]

        diffs = frozenset(self._diff_to(pos, next_pos) for next_pos in next_poses)

        side_diff = select_side_map[left].get(diffs)

        if side_diff is None:
            raise ValueError(
                f"Cannot determine { {True:'left', False:'right'}[left] } side "
                f"for positions {pos} to {', '.join(map(str, next_poses))}"
            )

        x, y = self._to_xy(pos)
        dx, dy = side_diff

        return self._to_pos(x + dx, y + dy)
''')
__stickytape_write_module('halite_config.py', 'from geometry import Geometry\n\n\nSIZE_X: int = 21\nSIZE_Y: int = 21\n\nSHIP_COST = 500\nSHIPYARD_COST = 500\n\n\ngeometry = Geometry(SIZE_X, SIZE_Y)\ndist = geometry.dist\nget_prox = geometry.get_prox\npos_towards = geometry.pos_towards\nint_to_pos = geometry.int_to_pos\n_diff_to = geometry._diff_to\n') | |
__stickytape_write_module('config.py', 'from dataclasses import dataclass, field\nfrom observation import Observation\n\n\n@dataclass\nclass Config:\n obs: Observation = field(default=None, repr=False)\n\n def set_obs(self, obs):\n self.obs = obs\n') | |
__stickytape_write_module('observation.py', 'from dataclasses import dataclass, field\nfrom typing import Dict, List, NamedTuple, Set, Tuple\n\nfrom geometry import Pos\nfrom halite_config import int_to_pos\n\n\nclass ObservationShip(NamedTuple):\n pos: Pos\n halite: float\n\n\nclass ObservationShipYard(NamedTuple):\n pos: Pos\n\n\nId = str\nEnemyId = Tuple[int, Id]\n\n\n@dataclass # pylint: disable=too-many-instance-attributes\nclass Observation:\n """\n Holds obs info with types Pos, ObservationShip, ObservationShipYard for clarity\n Precalculates some frequently needed information\n """\n\n step: int\n player_idx: int\n map_halite: Dict[Pos, float]\n\n player_halite: List[float]\n ships: List[Dict[Id, ObservationShip]]\n shipyards: List[Dict[Id, ObservationShipYard]]\n\n my_halite: float = field(init=False)\n my_ships: Dict[Id, ObservationShip] = field(init=False)\n my_shipyards: Dict[Id, ObservationShipYard] = field(init=False)\n ship_poses: Set[Pos] = field(init=False)\n shipyard_poses: Set[Pos] = field(init=False)\n\n enemy_ships: Dict[EnemyId, ObservationShip] = field(init=False)\n enemy_shipyards: Dict[EnemyId, ObservationShipYard] = field(init=False)\n enemy_ship_poses: Set[Pos] = field(init=False)\n enemy_shipyard_poses: Set[Pos] = field(init=False)\n\n num_ships: int = field(init=False)\n\n def __repr__(self):\n return (\n f"Observation(step={self.step}, {self.num_ships} ships, "\n f"{len(self.enemy_ships)} enemy ships)"\n )\n\n def __post_init__(self):\n self.my_halite = self.player_halite[self.player_idx]\n\n self.my_ships = self.ships[self.player_idx]\n\n self.enemy_ships = {\n (idx, id_): ship\n for idx, cur_ships in enumerate(self.ships)\n if idx != self.player_idx\n for id_, ship in cur_ships.items()\n }\n\n self.my_shipyards = self.shipyards[self.player_idx]\n\n self.enemy_shipyards = {\n (idx, id_): ship\n for idx, cur_shipyards in enumerate(self.shipyards)\n if idx != self.player_idx\n for id_, ship in cur_shipyards.items()\n }\n\n 
self.ship_poses = set(ship.pos for ship in self.my_ships.values())\n\n self.shipyard_poses = set(\n shipyard.pos for shipyard in self.my_shipyards.values()\n )\n\n self.enemy_ship_poses = set(ship.pos for ship in self.enemy_ships.values())\n\n self.enemy_shipyard_poses = set(\n shipyard.pos for shipyard in self.enemy_shipyards.values()\n )\n\n self.num_ships = len(self.my_ships)\n\n @classmethod\n def from_obs(cls, obs):\n player_halite = []\n shipyards = []\n ships = []\n\n for (cur_player_halite, cur_shipyards, cur_ships) in obs["players"]:\n player_halite.append(cur_player_halite)\n\n ships.append(\n {\n id_: ObservationShip(pos=int_to_pos(int_pos), halite=halite)\n for id_, (int_pos, halite) in cur_ships.items()\n }\n )\n\n shipyards.append(\n {\n id_: ObservationShipYard(pos=int_to_pos(int_pos))\n for id_, int_pos in cur_shipyards.items()\n }\n )\n\n return cls(\n step=obs["step"],\n player_idx=obs["player"],\n map_halite={int_to_pos(pos): val for pos, val in enumerate(obs["halite"])},\n player_halite=player_halite,\n ships=ships,\n shipyards=shipyards,\n )\n') | |
__stickytape_write_module('halite_amount_solver.py', 'def solve_halite_amount(objs):\n return objs\n') | |
__stickytape_write_module('plans.py', 'from dataclasses import dataclass, field\nfrom typing import Set\n\nfrom actions import ConvertAction, MoveAction, SpawnAction, StayAction, NoShipYardAction\nfrom assign_solver import AssignSolverMixin\nfrom geometry import Pos\nfrom halite_config import get_prox, pos_towards\n\n\n@dataclass\nclass Plan(AssignSolverMixin):\n """\n Bot plan with all information for resolution\n\n Every plan probably should also have a StayAction to guarantee resolution\n """\n\n id: str\n score: float\n\n @property\n def actions(self):\n raise NotImplementedError()\n\n\n@dataclass\nclass MovePlan(Plan):\n start_pos: Pos\n end_pos: Pos\n forbidden_pos: Set[Pos] = field(default_factory=set, repr=False)\n\n @property\n def obj_goal_penalty(self):\n return (self.id, self.end_pos, -self.score)\n\n @property\n def actions(self):\n danger_adj = (\n lambda score, pos: score\n if pos not in self.forbidden_pos\n else (\n self.forbidden_pos[pos]\n if self.forbidden_pos[pos] <= -10\n else score + self.forbidden_pos[pos]\n )\n )\n\n reachable_poses = get_prox(self.start_pos, 0, 1)\n\n if self.start_pos == self.end_pos:\n return [\n MoveAction(\n id=self.id,\n from_pos=self.start_pos,\n pos=pos,\n score=danger_adj(5 if pos == self.end_pos else 0.5, pos),\n )\n for pos in reachable_poses\n ]\n\n next_poses = pos_towards(self.start_pos, self.end_pos)\n\n adj_dir = (\n lambda pos1, pos2: abs(pos1.x - pos2.x) <= 1 and abs(pos1.y - pos2.y) <= 1\n )\n\n if len(next_poses) == 1:\n return [\n MoveAction(\n id=self.id,\n from_pos=self.start_pos,\n pos=pos,\n score=danger_adj(\n 1\n if pos in next_poses\n else (0.75 if adj_dir(pos, self.start_pos) else 0.5),\n pos,\n ),\n )\n for pos in reachable_poses\n ]\n\n return [\n MoveAction(\n id=self.id,\n from_pos=self.start_pos,\n pos=pos,\n score=danger_adj(1 if pos in next_poses else 0.5, pos),\n )\n for pos in reachable_poses\n ]\n\n\n@dataclass\nclass ReturnHalitePlan(MovePlan):\n """\n This plan\'s goal is 
non-conflicting with same destination plans\n """\n\n @property\n def obj_goal_penalty(self):\n return (self.id, None, -self.score)\n\n\n@dataclass\nclass ConvertPlan(Plan):\n pos: Pos\n\n @property\n def obj_goal_penalty(self):\n return (self.id, None, -self.score)\n\n @property\n def actions(self):\n return [\n ConvertAction(id=self.id, pos=self.pos, score=self.score),\n StayAction(id=self.id, pos=self.pos, score=0),\n ]\n\n\n@dataclass\nclass SpawnPlan(Plan):\n pos: Pos\n\n @property\n def obj_goal_penalty(self):\n return (self.id, None, -self.score)\n\n @property\n def actions(self):\n return [\n SpawnAction(id=self.id, pos=self.pos, score=self.score),\n NoShipYardAction(id=self.id, pos=self.pos, score=0),\n ]\n\n\n@dataclass\nclass ScatterPlan(Plan):\n pos: Pos\n\n @property\n def obj_goal_penalty(self):\n return (self.id, None, -self.score)\n\n @property\n def actions(self):\n return [\n MoveAction(id=self.id, from_pos=self.pos, pos=next_pos, score=1)\n for next_pos in get_prox(self.pos, 1)\n ] + [StayAction(id=self.id, pos=self.pos, score=0)]\n') | |
__stickytape_write_module('actions.py', 'from dataclasses import dataclass\n\nfrom assign_solver import AssignSolverMixin\nfrom geometry import Pos\nfrom halite_config import _diff_to\nfrom observation import Id\n\n\n@dataclass\nclass Action(AssignSolverMixin):\n id: Id\n score: float\n pos: Pos\n\n @property\n def obj_goal_penalty(self):\n return (self.id, self.pos, -self.score)\n\n @property\n def halite_command(self):\n raise NotImplementedError()\n\n\n@dataclass\nclass ConvertAction(Action):\n @property\n def obj_goal_penalty(self):\n return (self.id, ("sy", self.pos), -self.score)\n\n @property\n def halite_command(self):\n return {self.id: "CONVERT"}\n\n\n@dataclass\nclass SpawnAction(Action):\n @property\n def halite_command(self):\n return {self.id: "SPAWN"}\n\n\n@dataclass\nclass StayAction(Action):\n @property\n def halite_command(self):\n return {}\n\n\n@dataclass\nclass NoShipYardAction(Action):\n @property\n def obj_goal_penalty(self):\n return (self.id, ("sy", self.pos), -self.score)\n\n @property\n def halite_command(self):\n return {}\n\n\n@dataclass\nclass MoveAction(Action):\n from_pos: Pos\n\n def __repr__(self):\n return f"MoveAction({self.id}: {self.from_pos}->{self.pos}; {self.score})"\n\n @property\n def halite_command(self):\n if self.from_pos == self.pos:\n return {}\n\n dx, dy = _diff_to(self.from_pos, self.pos)\n result = {\n (1, 0): "EAST",\n (-1, 0): "WEST",\n (0, 1): "SOUTH",\n (0, -1): "NORTH",\n }.get((dx, dy))\n\n if result is None:\n raise ValueError(\n f"Cannot move in one step from {self.from_pos} to {self.pos}"\n )\n\n return {self.id: result}\n') | |
__stickytape_write_module('strategy.py', 'from typing import List\n\nfrom config import Config\nfrom actions import Action\nfrom plans import Plan\n\n\nclass Strategy:\n def __init__(self, *, config, id):\n self.config: Config = config\n self.id: str = id\n\n def make_plans(self, num=None) -> List[Plan]:\n raise NotImplementedError()\n\n def notify_action(self, action: Action) -> None:\n pass\n\n\nclass Ship(Strategy): # pylint: disable=abstract-method\n @property\n def pos(self):\n return self.config.obs.my_ships[self.id].pos\n\n @property\n def halite(self):\n return self.config.obs.my_ships[self.id].halite\n\n def __repr__(self):\n return f"{self.__class__.__name__}({self.id} @ {self.pos})"\n\n\nclass ShipYard(Strategy): # pylint: disable=abstract-method\n @property\n def pos(self):\n return self.config.obs.my_shipyards[self.id].pos\n\n def __repr__(self):\n return f"{self.__class__.__name__}({self.id} @ {self.pos})"\n') | |
__stickytape_write_module('strategies/my_config.py', 'from collections import defaultdict\nfrom dataclasses import dataclass, field\nfrom typing import Dict, NamedTuple, Set\n\nfrom config import Config\nfrom geometry import Pos\nfrom halite_config import geometry, get_prox\nfrom helper import find_closest\nfrom observation import Observation\n\n\nclass MineScore(NamedTuple):\n """\n Score of a mining position independent of ship\n Used only for precalculations\n """\n\n score: float # final value\n halite: float\n steps: int\n\n\n@dataclass\nclass MyConfig(Config):\n NUM_SHIPS: int = 4\n NUM_HUNTER: int = 0\n NUM_SHIPYARDS: int = 3\n MAX_NUM_TURNS_BLOCKED: int = 2 # to avoid deadlocks ships may scatter\n # LAST_TURN_SPAWNING: int = 360\n\n HALITE_LOSS_FACTOR: float = 0.99\n HALITE_MINE_FACTOR: float = 0.3\n TIME_FACTOR: float = 0.99\n\n mine_scores: Dict[Pos, MineScore] = field(default_factory=dict, repr=False)\n\n danger_pos_score: Dict[Pos, float] = field(default_factory=dict, repr=False)\n\n def set_obs(self, obs: Observation):\n super().set_obs(obs)\n\n self.danger_pos_score = defaultdict(float)\n\n for enemy_pos in obs.enemy_ship_poses:\n for score, prox in [(-100, 0), (-50, 1), (-10, 2), (-1, 3)]:\n for cur_pos in get_prox(enemy_pos, prox):\n self.danger_pos_score[cur_pos] += score\n\n for enemy_pos in obs.enemy_shipyard_poses:\n for score, prox in [(-50, 0), (-10, 1), (-1, 2)]:\n for cur_pos in get_prox(enemy_pos, prox):\n self.danger_pos_score[cur_pos] += score\n\n if not obs.my_shipyards:\n self.mine_scores = {}\n else:\n mine_scores = {}\n\n for pos in geometry.poses:\n closest = find_closest(pos, obs.shipyard_poses)\n pos_dist = closest.dist\n\n discount = self.HALITE_LOSS_FACTOR ** pos_dist\n\n halite = obs.map_halite[pos]\n\n mine_scores[pos] = MineScore(\n score=halite * discount, halite=halite, steps=pos_dist,\n )\n\n self.mine_scores = mine_scores\n') | |
__stickytape_write_module('strategies/__init__.py', '') | |
__stickytape_write_module('strategies/my_strategy_assigner.py', 'from collections import Counter\nfrom functools import partial\nfrom typing import Dict\n\nfrom agent import StrategyAssigner\nfrom helper import find_closest, max_pos_val\nfrom observation import Id\nfrom strategy import Strategy\nimport halite_config\n\nfrom .first_ship import FirstShip\nfrom .gatherer import Gatherer\nfrom .hunter import Hunter\nfrom .my_config import MyConfig\nfrom .my_helper import obs_to_new_shipyard_score\nfrom .new_shipyard import NewShipYard\nfrom .plainshipyard import PlainShipYard\n\n\nclass MyStrategyAssigner(StrategyAssigner):\n def updated_strategies(\n self, strategies: Dict[Id, Strategy], config: MyConfig\n ) -> Dict[Id, Strategy]:\n result = {}\n\n for id_ in config.obs.my_ships.keys():\n if id_ in strategies:\n result[id_] = strategies[id_]\n continue\n\n strat_class = self._new_ship_class(strategies, config)\n\n result[id_] = strat_class(config=config, id=id_)\n\n # Shipyards\n for id_ in config.obs.my_shipyards.keys():\n if id_ in strategies:\n result[id_] = strategies[id_]\n continue\n\n result[id_] = PlainShipYard(config=config, id=id_)\n\n result = self._try_new_shipyard(result, config)\n\n return result\n\n def _new_ship_class(self, strategies, config):\n if not strategies:\n result = FirstShip\n else:\n ship_type_cnts = Counter(\n strat.__class__.__name__ for strat in strategies.values()\n )\n\n if ship_type_cnts["Hunter"] < config.NUM_HUNTER:\n result = Hunter\n else:\n result = Gatherer\n\n return result\n\n def _change_ship_strategy(self, strategies, id, new_strategy):\n result = {}\n for cur_id, strategy in strategies.items():\n if cur_id != id:\n result[cur_id] = strategy\n continue\n\n result[cur_id] = new_strategy(config=strategy.config, id=strategy.id)\n\n return result\n\n def _try_new_shipyard(self, strategies, config):\n for id, strategy in list(strategies.items()):\n if not isinstance(strategy, NewShipYard):\n continue\n if strategy.steps_to_gather 
<= 0:\n strategies = self._change_ship_strategy(strategies, id, Gatherer)\n\n virtual_shipyard_poses = {\n strategy.new_shipyard_pos\n for strategy in strategies.values()\n if isinstance(strategy, NewShipYard)\n }\n\n if (\n len(config.obs.shipyard_poses) + len(virtual_shipyard_poses)\n >= config.NUM_SHIPYARDS\n or config.obs.my_halite < halite_config.SHIPYARD_COST\n ):\n return strategies\n\n ships = [x for x in strategies.values() if isinstance(x, Gatherer)]\n\n if not ships:\n return strategies\n\n shipyard_scores = obs_to_new_shipyard_score(\n config.obs, virtual_shipyard_poses=virtual_shipyard_poses\n )\n\n new_shipyard_pos, _val = max_pos_val(shipyard_scores)\n\n ship_poses = [x.pos for x in ships]\n\n closest_ship_info = find_closest(new_shipyard_pos, ship_poses)\n closest_ship = ships[closest_ship_info.idx]\n\n result = self._change_ship_strategy(\n strategies,\n closest_ship.id,\n partial(NewShipYard, new_shipyard_pos=new_shipyard_pos),\n )\n\n return result\n') | |
__stickytape_write_module('strategies/first_ship.py', 'from operator import itemgetter\n\nfrom halite_config import dist\nfrom helper import array_to_pos_dist\nfrom plans import ConvertPlan, MovePlan\nfrom strategy import Ship\n\nfrom .my_helper import obs_to_new_shipyard_score\n\n\nclass FirstShip(Ship):\n def __init__(self, *, config, id):\n super().__init__(config=config, id=id)\n self.first_shipyard_pos = None\n\n def make_plans(self, num=None):\n if self.first_shipyard_pos is None:\n self.first_shipyard_pos = self._find_first_shipyard_pos()\n\n if self.pos == self.first_shipyard_pos:\n return [ConvertPlan(id=self.id, pos=self.pos, score=1)]\n\n return [\n MovePlan(\n id=self.id, start_pos=self.pos, end_pos=self.first_shipyard_pos, score=1\n )\n ]\n\n def _find_first_shipyard_pos(self):\n new_shipyard_scores = obs_to_new_shipyard_score(self.config.obs)\n\n pos_new_shipyard_scores = array_to_pos_dist(new_shipyard_scores)\n\n close_new_shipyard_scores = {\n pos: score\n for pos, score in pos_new_shipyard_scores.items()\n if dist(pos, self.pos) < 5\n }\n\n chosen_new_shipyard_pos, _ = max(\n close_new_shipyard_scores.items(), key=itemgetter(1)\n )\n\n return chosen_new_shipyard_pos\n') | |
__stickytape_write_module('strategies/my_helper.py', 'from typing import Set\n\nfrom geometry import Pos\nfrom helper import (\n pos_dict_to_array,\n poses_to_dist_array,\n score_new_shipyard,\n shape_from_poses,\n)\nfrom observation import Observation\nfrom strategies.my_config import MyConfig\n\n\ndef obs_to_new_shipyard_score(\n obs: Observation, virtual_shipyard_poses: Set[Pos] = None\n):\n if virtual_shipyard_poses is None:\n virtual_shipyard_poses = set()\n\n map_halite = pos_dict_to_array(obs.map_halite)\n\n shape = shape_from_poses(obs.map_halite.keys())\n\n shipyard_poses = obs.shipyard_poses | virtual_shipyard_poses\n\n if shipyard_poses:\n shipyard_dist = poses_to_dist_array(shipyard_poses, shape)\n else:\n shipyard_dist = None\n\n enemy_poses = obs.enemy_shipyard_poses | obs.enemy_ship_poses\n if enemy_poses:\n enemy_shipyard_dist = poses_to_dist_array(enemy_poses, shape)\n else:\n enemy_shipyard_dist = None\n\n result = score_new_shipyard(map_halite, shipyard_dist, enemy_shipyard_dist)\n\n return result\n') | |
__stickytape_write_module('strategies/gatherer.py', 'from operator import attrgetter\nfrom typing import List, cast\n\nfrom halite_config import dist, geometry\nfrom plans import MovePlan, Plan, ReturnHalitePlan\nfrom strategy import Ship\n\nfrom .my_config import MyConfig\n\n\nclass Gatherer(Ship):\n def make_plans(self, num=None) -> List[Plan]:\n plans = []\n\n config = cast(MyConfig, self.config)\n\n if not config.mine_scores:\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=self.pos,\n score=1,\n forbidden_pos=config.danger_pos_score,\n )\n ]\n\n for end_pos in geometry.poses:\n ship_pos_dist = dist(self.pos, end_pos)\n\n mine_score = config.mine_scores[end_pos]\n\n total_steps = ship_pos_dist + mine_score.steps\n\n extra_halite = (\n config.HALITE_MINE_FACTOR * mine_score.halite\n if self.pos == end_pos\n else 0\n )\n\n ##### MAGIC EQUATION #####\n trip_score = (\n (self.halite + extra_halite) * config.HALITE_LOSS_FACTOR ** total_steps\n + config.HALITE_MINE_FACTOR * mine_score.score\n ) * (config.TIME_FACTOR if self.pos == end_pos else 1)\n\n if end_pos in config.obs.shipyard_poses:\n plan = ReturnHalitePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=end_pos,\n score=trip_score,\n forbidden_pos=config.danger_pos_score,\n )\n else:\n plan = MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=end_pos,\n score=trip_score,\n forbidden_pos=config.danger_pos_score,\n )\n\n plans.append(plan)\n\n plans.sort(key=attrgetter("score"), reverse=True)\n\n return plans[:num]\n') | |
# Bundled source of strategies/hunter.py: a ship strategy that plans moves
# towards enemy ships, nearest first.
# Changes to the payload relative to the original bundle:
#   * make_plans no longer evaluates "len(plans) < None" when called with the
#     default num=None (which raised TypeError); None now means "no limit".
#   * Module indentation is restored so the written file can be imported.
__stickytape_write_module('strategies/hunter.py', '''from operator import itemgetter

from halite_config import dist
from plans import MovePlan
from strategy import Ship


class Hunter(Ship):
    def make_plans(self, num=None):
        enemy_ship_poses = self.config.obs.enemy_ship_poses
        if not enemy_ship_poses:
            return []

        enemy_dists = [(pos, dist(pos, self.pos)) for pos in enemy_ship_poses]

        enemy_dists.sort(key=itemgetter(1))

        plans = [
            MovePlan(
                id=self.id,
                start_pos=self.pos,
                end_pos=enemy_pos,
                score=1 / (1 + enemy_dist),
            )
            for enemy_pos, enemy_dist in enemy_dists
        ][:num]

        # num=None means "no limit": comparing an int with None would raise.
        if num is None or len(plans) < num:
            plans.append(
                MovePlan(id=self.id, start_pos=self.pos, end_pos=self.pos, score=1)
            )

        return plans
''')
__stickytape_write_module('strategies/new_shipyard.py', 'from typing import cast\n\nimport halite_config\nfrom geometry import Pos\nfrom plans import ConvertPlan, MovePlan\nfrom strategy import Ship\nfrom halite_config import dist\n\nfrom .my_config import MyConfig\n\n\nclass NewShipYard(Ship):\n def __init__(self, *, config, id, new_shipyard_pos: Pos):\n super().__init__(config=config, id=id)\n self.new_shipyard_pos = new_shipyard_pos\n self.steps_to_gather = int(\n 1.2 * dist(self.pos, self.new_shipyard_pos)\n ) # hard-coded TODO\n\n def __repr__(self):\n return f"NewShipYard({self.id} @ {self.pos}; shipyard: {self.new_shipyard_pos})"\n\n def make_plans(self, num=None):\n self.steps_to_gather -= 1\n\n config = cast(MyConfig, self.config)\n\n if self.pos == self.new_shipyard_pos:\n if self.config.obs.my_halite >= halite_config.SHIPYARD_COST:\n return [\n ConvertPlan(id=self.id, pos=self.pos, score=1),\n ]\n else:\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=self.pos,\n score=0,\n forbidden_pos=config.danger_pos_score,\n ),\n ]\n\n return [\n MovePlan(\n id=self.id,\n start_pos=self.pos,\n end_pos=self.new_shipyard_pos,\n score=1,\n forbidden_pos=config.danger_pos_score,\n )\n ]\n') | |
# Bundled source of strategies/plainshipyard.py: a shipyard strategy that
# proposes spawning a ship while the fleet is below NUM_SHIPS.
# Changes to the payload relative to the original bundle:
#   * The affordability check is ">=" instead of ">", matching the
#     ">= SHIPYARD_COST" / "< SHIPYARD_COST" checks used elsewhere in the
#     bundle; a balance exactly equal to SHIP_COST can now spawn.
#   * Module indentation is restored so the written file can be imported.
__stickytape_write_module('strategies/plainshipyard.py', '''from strategy import ShipYard
from plans import SpawnPlan
import halite_config


class PlainShipYard(ShipYard):
    def make_plans(self, num=None):
        if (
            self.config.obs.num_ships < self.config.NUM_SHIPS
            and self.config.obs.my_halite >= halite_config.SHIP_COST
            and self.pos not in self.config.obs.ship_poses
        ):
            return [SpawnPlan(id=self.id, pos=self.pos, score=1)]

        return []
''')
from agent import Agent | |
from strategies.my_config import MyConfig | |
from strategies.my_strategy_assigner import MyStrategyAssigner | |
agent = Agent(MyConfig(), MyStrategyAssigner()) | |
def call_agent(obs):
    """Forward a raw observation to the module-level ``agent``.

    Args:
        obs: The raw observation, passed through unchanged.

    Returns:
        Whatever ``agent(obs)`` returns.
    """
    return agent(obs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment