Last active
September 27, 2016 10:59
-
-
Save Debilski/f66d10c5f16934d45c1ee37fe14371a0 to your computer and use it in GitHub Desktop.
AStarImplementations.ipynb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "import heapq\n\nimport pelita\nfrom pelita.datamodel import CTFUniverse\nfrom pelita.graph import AdjacencyList, manhattan_dist", | |
"execution_count": 1, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "class Node:\n \"\"\" Representation of a position in a path. \n \n Holds the number of moves it took to get to the node (value), \n the coordinates and the parent Node.\n \n \"\"\"\n \n def __init__(self, value=None, coordinates=None, parent=None):\n self.value = value\n self.coordinates = coordinates\n self.parent = parent\n \n def __str__(self):\n return str(self.value)\n \n def __lt__(self, other):\n return self.value < other.value\n \n def backtrack(self):\n \"\"\" Backtracks through the parent Nodes to find the path that let to \n this node.\n \n Returns\n -------\n path : list of tuple of (int, int)\n the path from the first partent Node to the current Node.\n \n \"\"\"\n \n parent_list = []\n if self.parent:\n parent_list = self.parent.backtrack()\n return parent_list + [self.coordinates]\n \n def check_in_path(self, pos):\n \"\"\" Checks if a given position is in the path.\n \n Parameters\n ----------\n pos : tuple of (int, int)\n the position to be checked\n\n Returns\n -------\n in_path : boolean\n True if the pos is in the path from the first parent to the\n current Node\n \n \"\"\"\n in_parent = False\n if self.parent:\n in_parent = self.parent.check_in_path(pos)\n in_path = (pos == self.coordinates or in_parent)\n return in_path\n\nclass AStarNode(AdjacencyList):\n def a_star(self, initial, target):\n \"\"\" A* search.\n\n A* (A Star) [1] from one position to another. The search will return the\n shortest path from the `initial` position to the `target` using the\n Manhattan distance as a heuristic.\n\n Parameters\n ----------\n initial : tuple of (int, int)\n the first position\n target : tuple of (int, int)\n the target position\n\n Returns\n -------\n path : lits of tuple of (int, int)\n the path from `initial` to the closest `target`\n\n Raises\n ------\n NoPathException\n if no path from `initial` to one of `targets`\n NoPositionException\n if either `initial` or `targets` does not exist\n\n [1] http://en.wikipedia.org/wiki/A*_search_algorithm\n\n \"\"\"\n # First check that the arguments were valid.\n self._check_pos_exist([initial, target])\n to_visit = []\n seen = []\n dist_so_far = {}\n dist_so_far[initial] = 0\n # Since it's A* we use a heap queue to ensure that we always\n # get the next node with to lowest manhattan distance to the\n # current node.\n initial_node = Node(0, initial)\n heapq.heappush(to_visit, initial_node)\n found = False\n while to_visit:\n current_node = heapq.heappop(to_visit)\n current = current_node.coordinates\n value = current_node.value\n if current in seen:\n continue\n elif current == target:\n found = True\n break\n else:\n seen.append(current)\n for pos in self[current]:\n new_dist = dist_so_far[current] + 1\n if pos not in dist_so_far or new_dist < dist_so_far[pos]:\n new_estimate = new_dist + manhattan_dist(target, pos)\n dist_so_far[pos] = new_dist\n heapq.heappush(to_visit, Node(new_estimate, pos, current_node))\n if not found:\n raise NoPathException(\"BFS: No path from %r to %r.\"\n % (initial, target))\n\n # Now back-track using seen to determine how we got here.\n # Initialise the path with current node, i.e. position of food.\n path = current_node.backtrack()[1:][::-1]\n return path", | |
"execution_count": 2, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "class AStarWikipedia(AdjacencyList):\n def a_star(self, initial, target):\n # The set of nodes already evaluated.\n closed_set = set()\n # the set of currently discovered nodes still to be evaluated.\n # initially, only the start node is known.\n open_set = {initial}\n # for each node, which node it can most efficiently be reached from.\n # if a node can be reached from many nodes, cameFrom will eventually contain the\n # most efficient previous step.\n came_from = dict()\n\n # for each node, the cost of getting from the start node to that node.\n import collections\n import math\n g_score = collections.defaultdict(default_factory=lambda k: math.inf)\n # the cost of going from start to start is zero.\n g_score[initial]= 0\n\n # for each node, the total cost of getting from the start node to the goal\n # by passing by that node. That value is partly known, partly heuristic.\n f_score = collections.defaultdict(default_factory=lambda k: math.inf)\n # for the first node, that value is completely heuristic.\n f_score[initial] = manhattan_dist(initial, target)\n\n while open_set:\n current = sorted((f_score[node], node) for node in open_set)[0][1]# the node in openSet having the lowest fScore[] value\n if current == target:\n return reconstruct_path_wiki(came_from, current)[:-1]\n\n open_set.remove(current)\n closed_set.add(current)\n try:\n neighbors = self[current]\n except KeyError:\n raise NoPathException(\"A*: No path from %r to %r.\" % (initial, target))\n\n for neighbor in neighbors:\n if neighbor in closed_set:\n continue # Ignore the neighbor which is already evaluated.\n # the distance from start to a neighbor\n tentative_g_score = g_score[current] + 1\n if neighbor not in open_set: # Discover a new node\n open_set.add(neighbor)\n elif tentative_g_score >= g_score[neighbor]:\n continue # This is not a better path.\n\n # this path is the best until now. Record it!\n came_from[neighbor] = current\n g_score[neighbor] = tentative_g_score\n f_score[neighbor] = g_score[neighbor] + manhattan_dist(neighbor, target)\n\n raise NoPathException(\"A*: No path from %r to %r.\" % (initial, target))\n\ndef reconstruct_path_wiki(came_from, current):\n total_path = [current]\n while current in came_from.keys():\n current = came_from[current]\n total_path.append(current)\n return total_path\n", | |
"execution_count": 3, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "class AStarRedblob(AdjacencyList):\n def a_star_search(self, start, goal):\n frontier = PriorityQueue()\n frontier.put(start, 0)\n came_from = {}\n cost_so_far = {}\n came_from[start] = None\n cost_so_far[start] = 0\n\n while not frontier.empty():\n current = frontier.get()\n\n if current == goal:\n break\n\n for next in self[current]:\n new_cost = cost_so_far[current] + 1\n if next not in cost_so_far or new_cost < cost_so_far[next]:\n cost_so_far[next] = new_cost\n priority = new_cost + manhattan_dist(goal, next)\n frontier.put(next, priority)\n came_from[next] = current\n\n return came_from, cost_so_far\n\n def a_star(self, start, goal):\n try:\n came_from, cost = self.a_star_search(start, goal)\n return reconstruct_path_rb(came_from, start, goal)[:-1]\n except KeyError:\n raise NoPathException(\"\")\n\nclass PriorityQueue:\n def __init__(self):\n self.elements = []\n\n def empty(self):\n return len(self.elements) == 0\n\n def put(self, item, priority):\n heapq.heappush(self.elements, (priority, item))\n\n def get(self):\n return heapq.heappop(self.elements)[1]\n\ndef reconstruct_path_rb(came_from, start, goal):\n current = goal\n path = [current]\n while current != start:\n current = came_from[current]\n path.append(current)\n return path\n", | |
"execution_count": 4, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "class AStarFixed(AdjacencyList):\n def a_star(self, initial, target):\n \"\"\" A* search.\n\n A* (A Star) [1] from one position to another. The search will return the\n shortest path from the `initial` position to the `target` using the\n Manhattan distance as a heuristic.\n\n Parameters\n ----------\n initial : tuple of (int, int)\n the first position\n target : tuple of (int, int)\n the target position\n\n Returns\n -------\n path : lits of tuple of (int, int)\n the path from `initial` to the closest `target`\n\n Raises\n ------\n NoPathException\n if no path from `initial` to one of `targets`\n\n [1] http://en.wikipedia.org/wiki/A*_search_algorithm\n\n \"\"\"\n # First check that the arguments were valid.\n self._check_pos_exist([initial, target])\n to_visit = []\n # Initialize the dicts that help us keep track\n # A set would make the lookup faster, but backtracking impossible.\n came_from = {}\n cost_so_far = {}\n came_from[initial] = None\n cost_so_far[initial] = 0\n # Since it's A* we use a heap queue to ensure that we always\n # get the next node with to lowest manhattan distance to the\n # current node.\n heapq.heappush(to_visit, (0, (initial)))\n found = False\n while to_visit:\n old_prio, current = heapq.heappop(to_visit)\n\n if current == target:\n found = True\n break\n\n for next in self[current]:\n new_cost = cost_so_far[current] + 1 # 1 is the cost to the neighbor\n if next not in cost_so_far or new_cost < cost_so_far[next]: # only choose unvisited and ’worthy’ nodes\n cost_so_far[next] = new_cost\n priority = new_cost + manhattan_dist(target, next)\n heapq.heappush(to_visit, (priority, next))\n came_from[next] = current\n\n if not found:\n raise NoPathException(\"BFS: No path from %r to %r.\" % (initial, target))\n\n # Now back-track using seen to determine how we got here.\n # Initialise the path with current node, i.e. position of food.\n current = target\n path = [current]\n while current != initial:\n current = came_from[current]\n path.append(current)\n # The last element is the current position, we don't need that in our\n # path, so don't include it.\n return path[:-1]\n", | |
"execution_count": 5, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "astar_implementations = [\n AStarNode,\n AStarWikipedia,\n AStarRedblob,\n AStarFixed\n]", | |
"execution_count": 6, | |
"outputs": [] | |
}, | |
{ | |
"metadata": {}, | |
"cell_type": "markdown", | |
"source": "basic test of the implementations" | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "layout = \"\"\"\n##########\n#0 1#\n##########\n\"\"\"\nuniverse = CTFUniverse.create(layout, 2)\nfor astar in astar_implementations:\n print(\"testing\", astar)\n al = astar(universe.free_positions())\n assert 7 == len(al.a_star(universe.bots[0].current_pos, universe.bots[1].current_pos))", | |
"execution_count": 7, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "testing <class '__main__.AStarNode'>\ntesting <class '__main__.AStarWikipedia'>\ntesting <class '__main__.AStarRedblob'>\ntesting <class '__main__.AStarFixed'>\n" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "def check_all_paths(al, start):\n visited = set()\n to_visit = []\n heapq.heappush(to_visit, (0, start))\n while to_visit:\n dist, current_node = heapq.heappop(to_visit)\n if current_node in visited:\n continue\n\n path = al.a_star(start, current_node)\n path_rev = al.a_star(current_node, start)\n# uni.bots[0].current_pos = start\n# uni.bots[1].current_pos = current_node\n# uni.food = path\n# print(uni.compact_str)\n#i print(start, current_node)\n assert dist == len(path)\n assert dist == len(path_rev)\n\n visited.add(current_node)\n neighbors = al[current_node]\n for n in neighbors:\n if n not in visited:\n heapq.heappush(to_visit, (dist + 1, n))", | |
"execution_count": 8, | |
"outputs": [] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarNode(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))", | |
"execution_count": 22, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "10 loops, best of 3: 189 ms per loop\n" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarWikipedia(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))", | |
"execution_count": 23, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "1 loop, best of 3: 211 ms per loop\n" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarRedblob(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))", | |
"execution_count": 24, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "10 loops, best of 3: 95.1 ms per loop\n" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": false | |
}, | |
"cell_type": "code", | |
"source": "layout = pelita.layout.get_layout_by_name('layout_normal_without_dead_ends_001')\nuniverse = CTFUniverse.create(layout, 4)\nal = AStarFixed(universe.free_positions())\n%timeit check_all_paths(al, (1, 1))", | |
"execution_count": 25, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "10 loops, best of 3: 78.1 ms per loop\n" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true, | |
"collapsed": true | |
}, | |
"cell_type": "code", | |
"source": "", | |
"execution_count": null, | |
"outputs": [] | |
} | |
], | |
"metadata": { | |
"language_info": { | |
"mimetype": "text/x-python", | |
"pygments_lexer": "ipython3", | |
"file_extension": ".py", | |
"version": "3.5.2", | |
"nbconvert_exporter": "python", | |
"name": "python", | |
"codemirror_mode": { | |
"version": 3, | |
"name": "ipython" | |
} | |
}, | |
"kernelspec": { | |
"name": "python3-anaconda", | |
"display_name": "Python 3 (anaconda)", | |
"language": "python" | |
}, | |
"gist": { | |
"id": "", | |
"data": { | |
"description": "AStarImplementations.ipynb", | |
"public": true | |
} | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment