Skip to content

Instantly share code, notes, and snippets.

@Debilski
Last active September 27, 2016 10:59
Show Gist options
  • Save Debilski/f66d10c5f16934d45c1ee37fe14371a0 to your computer and use it in GitHub Desktop.
Save Debilski/f66d10c5f16934d45c1ee37fe14371a0 to your computer and use it in GitHub Desktop.
AStarImplementations.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "import heapq\n\nimport pelita\nfrom pelita.datamodel import CTFUniverse\nfrom pelita.graph import AdjacencyList, manhattan_dist",
"execution_count": 1,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "class Node:\n \"\"\" Representation of a position in a path. \n \n Holds the number of moves it took to get to the node (value), \n the coordinates and the parent Node.\n \n \"\"\"\n \n def __init__(self, value=None, coordinates=None, parent=None):\n self.value = value\n self.coordinates = coordinates\n self.parent = parent\n \n def __str__(self):\n return str(self.value)\n \n def __lt__(self, other):\n return self.value < other.value\n \n def backtrack(self):\n \"\"\" Backtracks through the parent Nodes to find the path that let to \n this node.\n \n Returns\n -------\n path : list of tuple of (int, int)\n the path from the first partent Node to the current Node.\n \n \"\"\"\n \n parent_list = []\n if self.parent:\n parent_list = self.parent.backtrack()\n return parent_list + [self.coordinates]\n \n def check_in_path(self, pos):\n \"\"\" Checks if a given position is in the path.\n \n Parameters\n ----------\n pos : tuple of (int, int)\n the position to be checked\n\n Returns\n -------\n in_path : boolean\n True if the pos is in the path from the first parent to the\n current Node\n \n \"\"\"\n in_parent = False\n if self.parent:\n in_parent = self.parent.check_in_path(pos)\n in_path = (pos == self.coordinates or in_parent)\n return in_path\n\nclass AStarNode(AdjacencyList):\n def a_star(self, initial, target):\n \"\"\" A* search.\n\n A* (A Star) [1] from one position to another. The search will return the\n shortest path from the `initial` position to the `target` using the\n Manhattan distance as a heuristic.\n\n Parameters\n ----------\n initial : tuple of (int, int)\n the first position\n target : tuple of (int, int)\n the target position\n\n Returns\n -------\n path : lits of tuple of (int, int)\n the path from `initial` to the closest `target`\n\n Raises\n ------\n NoPathException\n if no path from `initial` to one of `targets`\n NoPositionException\n if either `initial` or `targets` does not exist\n\n [1] http://en.wikipedia.org/wiki/A*_search_algorithm\n\n \"\"\"\n # First check that the arguments were valid.\n self._check_pos_exist([initial, target])\n to_visit = []\n seen = []\n dist_so_far = {}\n dist_so_far[initial] = 0\n # Since it's A* we use a heap queue to ensure that we always\n # get the next node with to lowest manhattan distance to the\n # current node.\n initial_node = Node(0, initial)\n heapq.heappush(to_visit, initial_node)\n found = False\n while to_visit:\n current_node = heapq.heappop(to_visit)\n current = current_node.coordinates\n value = current_node.value\n if current in seen:\n continue\n elif current == target:\n found = True\n break\n else:\n seen.append(current)\n for pos in self[current]:\n new_dist = dist_so_far[current] + 1\n if pos not in dist_so_far or new_dist < dist_so_far[pos]:\n new_estimate = new_dist + manhattan_dist(target, pos)\n dist_so_far[pos] = new_dist\n heapq.heappush(to_visit, Node(new_estimate, pos, current_node))\n if not found:\n raise NoPathException(\"BFS: No path from %r to %r.\"\n % (initial, target))\n\n # Now back-track using seen to determine how we got here.\n # Initialise the path with current node, i.e. position of food.\n path = current_node.backtrack()[1:][::-1]\n return path",
"execution_count": 2,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "class AStarWikipedia(AdjacencyList):\n def a_star(self, initial, target):\n # The set of nodes already evaluated.\n closed_set = set()\n # the set of currently discovered nodes still to be evaluated.\n # initially, only the start node is known.\n open_set = {initial}\n # for each node, which node it can most efficiently be reached from.\n # if a node can be reached from many nodes, cameFrom will eventually contain the\n # most efficient previous step.\n came_from = dict()\n\n # for each node, the cost of getting from the start node to that node.\n import collections\n import math\n g_score = collections.defaultdict(default_factory=lambda k: math.inf)\n # the cost of going from start to start is zero.\n g_score[initial]= 0\n\n # for each node, the total cost of getting from the start node to the goal\n # by passing by that node. That value is partly known, partly heuristic.\n f_score = collections.defaultdict(default_factory=lambda k: math.inf)\n # for the first node, that value is completely heuristic.\n f_score[initial] = manhattan_dist(initial, target)\n\n while open_set:\n current = sorted((f_score[node], node) for node in open_set)[0][1]# the node in openSet having the lowest fScore[] value\n if current == target:\n return reconstruct_path_wiki(came_from, current)[:-1]\n\n open_set.remove(current)\n closed_set.add(current)\n try:\n neighbors = self[current]\n except KeyError:\n raise NoPathException(\"A*: No path from %r to %r.\" % (initial, target))\n\n for neighbor in neighbors:\n if neighbor in closed_set:\n continue # Ignore the neighbor which is already evaluated.\n # the distance from start to a neighbor\n tentative_g_score = g_score[current] + 1\n if neighbor not in open_set: # Discover a new node\n open_set.add(neighbor)\n elif tentative_g_score >= g_score[neighbor]:\n continue # This is not a better path.\n\n # this path is the best until now. Record it!\n came_from[neighbor] = current\n g_score[neighbor] = tentative_g_score\n f_score[neighbor] = g_score[neighbor] + manhattan_dist(neighbor, target)\n\n raise NoPathException(\"A*: No path from %r to %r.\" % (initial, target))\n\ndef reconstruct_path_wiki(came_from, current):\n total_path = [current]\n while current in came_from.keys():\n current = came_from[current]\n total_path.append(current)\n return total_path\n",
"execution_count": 3,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "class AStarRedblob(AdjacencyList):\n def a_star_search(self, start, goal):\n frontier = PriorityQueue()\n frontier.put(start, 0)\n came_from = {}\n cost_so_far = {}\n came_from[start] = None\n cost_so_far[start] = 0\n\n while not frontier.empty():\n current = frontier.get()\n\n if current == goal:\n break\n\n for next in self[current]:\n new_cost = cost_so_far[current] + 1\n if next not in cost_so_far or new_cost < cost_so_far[next]:\n cost_so_far[next] = new_cost\n priority = new_cost + manhattan_dist(goal, next)\n frontier.put(next, priority)\n came_from[next] = current\n\n return came_from, cost_so_far\n\n def a_star(self, start, goal):\n try:\n came_from, cost = self.a_star_search(start, goal)\n return reconstruct_path_rb(came_from, start, goal)[:-1]\n except KeyError:\n raise NoPathException(\"\")\n\nclass PriorityQueue:\n def __init__(self):\n self.elements = []\n\n def empty(self):\n return len(self.elements) == 0\n\n def put(self, item, priority):\n heapq.heappush(self.elements, (priority, item))\n\n def get(self):\n return heapq.heappop(self.elements)[1]\n\ndef reconstruct_path_rb(came_from, start, goal):\n current = goal\n path = [current]\n while current != start:\n current = came_from[current]\n path.append(current)\n return path\n",
"execution_count": 4,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "class AStarFixed(AdjacencyList):\n def a_star(self, initial, target):\n \"\"\" A* search.\n\n A* (A Star) [1] from one position to another. The search will return the\n shortest path from the `initial` position to the `target` using the\n Manhattan distance as a heuristic.\n\n Parameters\n ----------\n initial : tuple of (int, int)\n the first position\n target : tuple of (int, int)\n the target position\n\n Returns\n -------\n path : lits of tuple of (int, int)\n the path from `initial` to the closest `target`\n\n Raises\n ------\n NoPathException\n if no path from `initial` to one of `targets`\n\n [1] http://en.wikipedia.org/wiki/A*_search_algorithm\n\n \"\"\"\n # First check that the arguments were valid.\n self._check_pos_exist([initial, target])\n to_visit = []\n # Initialize the dicts that help us keep track\n # A set would make the lookup faster, but backtracking impossible.\n came_from = {}\n cost_so_far = {}\n came_from[initial] = None\n cost_so_far[initial] = 0\n # Since it's A* we use a heap queue to ensure that we always\n # get the next node with to lowest manhattan distance to the\n # current node.\n heapq.heappush(to_visit, (0, (initial)))\n found = False\n while to_visit:\n old_prio, current = heapq.heappop(to_visit)\n\n if current == target:\n found = True\n break\n\n for next in self[current]:\n new_cost = cost_so_far[current] + 1 # 1 is the cost to the neighbor\n if next not in cost_so_far or new_cost < cost_so_far[next]: # only choose unvisited and ’worthy’ nodes\n cost_so_far[next] = new_cost\n priority = new_cost + manhattan_dist(target, next)\n heapq.heappush(to_visit, (priority, next))\n came_from[next] = current\n\n if not found:\n raise NoPathException(\"BFS: No path from %r to %r.\" % (initial, target))\n\n # Now back-track using seen to determine how we got here.\n # Initialise the path with current node, i.e. position of food.\n current = target\n path = [current]\n while current != initial:\n current = came_from[current]\n path.append(current)\n # The last element is the current position, we don't need that in our\n # path, so don't include it.\n return path[:-1]\n",
"execution_count": 5,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "astar_implementations = [\n AStarNode,\n AStarWikipedia,\n AStarRedblob,\n AStarFixed\n]",
"execution_count": 6,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "basic test of the implementations"
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "layout = \"\"\"\n##########\n#0 1#\n##########\n\"\"\"\nuniverse = CTFUniverse.create(layout, 2)\nfor astar in astar_implementations:\n print(\"testing\", astar)\n al = astar(universe.free_positions())\n assert 7 == len(al.a_star(universe.bots[0].current_pos, universe.bots[1].current_pos))",
"execution_count": 7,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "testing <class '__main__.AStarNode'>\ntesting <class '__main__.AStarWikipedia'>\ntesting <class '__main__.AStarRedblob'>\ntesting <class '__main__.AStarFixed'>\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "def check_all_paths(al, start):\n visited = set()\n to_visit = []\n heapq.heappush(to_visit, (0, start))\n while to_visit:\n dist, current_node = heapq.heappop(to_visit)\n if current_node in visited:\n continue\n\n path = al.a_star(start, current_node)\n path_rev = al.a_star(current_node, start)\n# uni.bots[0].current_pos = start\n# uni.bots[1].current_pos = current_node\n# uni.food = path\n# print(uni.compact_str)\n#i print(start, current_node)\n assert dist == len(path)\n assert dist == len(path_rev)\n\n visited.add(current_node)\n neighbors = al[current_node]\n for n in neighbors:\n if n not in visited:\n heapq.heappush(to_visit, (dist + 1, n))",
"execution_count": 8,
"outputs": []
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarNode(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))",
"execution_count": 22,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "10 loops, best of 3: 189 ms per loop\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarWikipedia(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))",
"execution_count": 23,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "1 loop, best of 3: 211 ms per loop\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarRedblob(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))",
"execution_count": 24,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "10 loops, best of 3: 95.1 ms per loop\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": false
},
"cell_type": "code",
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarFixed(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))",
"execution_count": 25,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "10 loops, best of 3: 78.1 ms per loop\n"
}
]
},
{
"metadata": {
"trusted": true,
"collapsed": true
},
"cell_type": "code",
"source": "",
"execution_count": null,
"outputs": []
}
],
"metadata": {
"language_info": {
"mimetype": "text/x-python",
"pygments_lexer": "ipython3",
"file_extension": ".py",
"version": "3.5.2",
"nbconvert_exporter": "python",
"name": "python",
"codemirror_mode": {
"version": 3,
"name": "ipython"
}
},
"kernelspec": {
"name": "python3-anaconda",
"display_name": "Python 3 (anaconda)",
"language": "python"
},
"gist": {
"id": "",
"data": {
"description": "AStarImplementations.ipynb",
"public": true
}
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment