-
-
Save ryancollingwood/32446307e976a11a1185a5394d6657bc to your computer and use it in GitHub Desktop.
| # Credit for this: Nicholas Swift | |
| # as found at https://medium.com/@nicholas.w.swift/easy-a-star-pathfinding-7e6689c7f7b2 | |
| from warnings import warn | |
| import heapq | |
class Node:
    """
    A node class for A* Pathfinding

    Attributes:
        parent: the Node this one was reached from, or None for a root.
        position: the grid coordinate this node stands for.
        g: cost accumulated from the start node.
        h: heuristic estimate of the remaining cost.
        f: total score (g + h); the heap queue orders nodes by it.
    """

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position

        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        # Two nodes are the same node when they sit on the same position,
        # regardless of cost or parent. Anything without a position is
        # simply "not equal" rather than an AttributeError.
        if not hasattr(other, "position"):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone disables hashing; hash on the same key that
        # __eq__ compares so nodes can be stored in sets and dict keys.
        return hash(self.position)

    def __repr__(self):
        return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

    # defining less than for purposes of heap queue
    def __lt__(self, other):
        return self.f < other.f

    # defining greater than for purposes of heap queue
    def __gt__(self, other):
        return self.f > other.f
def return_path(current_node):
    """
    Follow the parent links from current_node back to the root node and
    return the visited positions ordered from the root to current_node.
    """
    trail = []
    node = current_node
    while node is not None:
        trail.append(node.position)
        node = node.parent
    # Positions were collected goal-first; flip them so the root comes first.
    trail.reverse()
    return trail
def astar(maze, start, end, allow_diagonal_movement=False):
    """
    Returns a list of tuples as a path from the given start to the given end in the given maze

    :param maze: grid indexed as maze[row][col]; a cell equal to 0 is walkable,
        any other value is treated as blocked
    :param start: (row, col) position the search starts from
    :param end: (row, col) position the search tries to reach
    :param allow_diagonal_movement: when True the four diagonal neighbours are
        searched in addition to the four orthogonal ones
    :return: the list of positions from start to end. If the iteration budget
        (half the number of cells) is used up first, a warning is issued and
        the path to the last expanded node is returned instead; it does not
        contain the destination. If the open list runs dry, a warning is
        issued and None is returned.

    Every step, diagonal or not, costs 1. The heuristic is the squared
    straight-line distance, which can overestimate the remaining cost, so the
    returned path is not guaranteed to be the shortest one.
    """
    # Positions are used as set members / dict keys below, so normalise them.
    start = tuple(start)
    end = tuple(end)

    # Create start and end node
    start_node = Node(None, start)
    end_node = Node(None, end)

    # Open list is a heap ordered by f. Instead of scanning it for duplicates
    # we remember the best g pushed per position and drop stale entries when
    # they are popped.
    open_list = []
    heapq.heappush(open_list, start_node)
    best_g = {start: 0}

    # Positions that have already been expanded
    closed_positions = set()

    # Adding a stop condition
    expansions = 0
    max_iterations = (len(maze[0]) * len(maze) // 2)

    # what squares do we search
    adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0),)
    if allow_diagonal_movement:
        adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0), (-1, -1), (-1, 1), (1, -1), (1, 1),)

    # Bound up front so the give-up branch always has a node to report, even
    # when the budget is zero (a one-cell maze).
    current_node = start_node

    # Loop until you find the end
    while open_list:
        if expansions >= max_iterations:
            # if we hit this point return the path such as it is
            # it will not contain the destination
            warn("giving up on pathfinding too many iterations")
            return return_path(current_node)

        # Get the current node
        candidate = heapq.heappop(open_list)
        if candidate.position in closed_positions:
            # Stale heap entry: this position was already expanded.
            continue
        current_node = candidate
        expansions += 1
        closed_positions.add(current_node.position)

        # Found the goal
        if current_node == end_node:
            return return_path(current_node)

        row, col = current_node.position
        for row_step, col_step in adjacent_squares:  # Adjacent squares
            new_row = row + row_step
            new_col = col + col_step

            # Make sure within range (checked against the row actually indexed)
            if not 0 <= new_row < len(maze):
                continue
            if not 0 <= new_col < len(maze[new_row]):
                continue

            # Make sure walkable terrain
            if maze[new_row][new_col] != 0:
                continue

            position = (new_row, new_col)

            # Child is on the closed list
            if position in closed_positions:
                continue

            # Skip the child unless it reaches the position more cheaply than
            # anything queued for that position so far
            new_g = current_node.g + 1
            if new_g >= best_g.get(position, float("inf")):
                continue
            best_g[position] = new_g

            # Create the f, g, and h values
            child = Node(current_node, position)
            child.g = new_g
            child.h = ((new_row - end_node.position[0]) ** 2) + ((new_col - end_node.position[1]) ** 2)
            child.f = child.g + child.h

            # Add the child to the open list
            heapq.heappush(open_list, child)

    warn("Couldn't get a path to destination")
    return None
def example(print_maze=True):
    """
    Run astar on a built-in 16 x 60 demo maze from the top-left corner to the
    bottom-right corner and print the resulting path.

    :param print_maze: when True, also draw the maze to stdout using a block
        character for walls, a space for free cells and "." for path cells
    :return: None
    """
    maze = [[0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,1,1,0,0,1,1,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,] * 2,
            [0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,] * 2,
            [0,0,0,1,0,1,1,1,1,0,1,1,0,0,1,1,1,0,0,0,1,1,1,1,1,1,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,0,1,0,0,0,0,1,1,0,1,0,0,0,0,0,0,1,1,1,0,] * 2,
            [0,0,0,1,0,1,1,0,1,1,0,1,1,1,0,0,0,0,0,1,0,0,1,1,1,1,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,0,0,0,1,1,1,1,1,1,1,0,0,0,0,1,0,1,0,1,1,] * 2,
            [0,0,0,1,0,1,0,1,1,0,1,1,1,1,0,0,1,1,1,1,1,1,1,0,1,0,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,1,1,1,0,] * 2,
            [0,0,0,1,0,1,1,1,1,0,1,0,0,1,1,1,0,1,1,1,1,0,1,1,1,0,1,0,0,0,] * 2,
            [0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,1,1,] * 2,
            [0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,] * 2,
            [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,] * 2,]

    start = (0, 0)
    end = (len(maze)-1, len(maze[0])-1)

    path = astar(maze, start, end)

    if print_maze:
        # astar returns None when no path exists; still draw the bare maze
        # in that case instead of failing on the iteration.
        for step in path or []:
            maze[step[0]][step[1]] = 2

        # Cell value -> character; any other value is drawn as nothing.
        glyphs = {1: "\u2588", 0: " ", 2: "."}
        for row in maze:
            print("".join(glyphs.get(col, "") for col in row))

    print(path)
After messing around with this code for a uni project, I perfected the code and created a version that works for both 2d and 3d
from warnings import warn
import heapq
from itertools import product
class Node:
    """
    A search node for A* pathfinding on a maze with any number of dimensions.

    Attributes:
        parent: the Node this one was reached from, or None for a root.
        position: the coordinate of this node, one entry per maze dimension.
        g: cost accumulated from the start node.
        h: heuristic estimate of the remaining cost.
        f: total score (g + h); the heap queue orders nodes by it.
    """

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position

        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        # Nodes are identified by position only; objects without a position
        # compare unequal instead of raising AttributeError.
        if not hasattr(other, "position"):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone disables hashing; hash on the same key that
        # __eq__ compares so nodes can be stored in sets and dict keys.
        return hash(self.position)

    def __repr__(self):
        return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

    # defining less than for purposes of heap queue
    def __lt__(self, other):
        return self.f < other.f

    # defining greater than for purposes of heap queue
    def __gt__(self, other):
        return self.f > other.f

    def return_path(self):
        """Return the positions from the root ancestor down to this node."""
        path = []
        current = self
        while current is not None:
            path.append(current.position)
            current = current.parent
        path.reverse()
        return path

    def euclidean_distance(self, other):
        """Return the straight-line distance to other, rounded to 3 decimals."""
        squared = sum((a - b) ** 2 for a, b in zip(self.position, other.position))
        return round(squared ** 0.5, 3)

    def is_outside(self, maze):
        """Return True when any coordinate of this node falls outside maze."""
        level = maze
        for coordinate in self.position:
            if coordinate < 0 or coordinate >= len(level):
                return True
            # Descend one dimension so the next coordinate is checked against
            # the sub-structure it actually indexes.
            level = level[coordinate]
        return False

    def is_wall(self, maze):
        """
        Return True when the maze cell at this node's position equals 0.

        The position must already be inside the maze (see is_outside).
        NOTE(review): this treats 0 as blocked -- confirm that matches the
        convention of the mazes callers pass in.
        """
        cell = maze
        for coordinate in self.position:
            cell = cell[coordinate]
        return cell == 0
def astar(maze, start, end):
    """
    Search for a path from start to end in an N-dimensional maze with A*.

    Neighbours are every cell whose coordinates differ by at most one in each
    dimension (diagonals included); a step costs its Euclidean length and the
    heuristic is the Euclidean distance to end.

    :param maze: array-like exposing ``shape`` and ``size`` and indexable one
        dimension at a time; which cells are blocked is decided by
        ``Node.is_wall``
    :param start: coordinate to start from, one entry per maze dimension
    :param end: coordinate to reach, one entry per maze dimension
    :return: list of positions from start to end. If the iteration budget
        (half the number of cells) is used up first, a warning is issued and
        the path to the last expanded node is returned instead. If the open
        list runs dry, a warning is issued and None is returned.
    :raises ValueError: if start or end do not have one coordinate per maze
        dimension
    """
    dimension = len(maze.shape)
    if len(start) != dimension or len(end) != dimension:
        raise ValueError("Start and end must have the same number of dimensions as the maze")

    # Child positions are built as tuples; normalise the endpoints so they
    # compare equal to them and can be used as set / dict keys.
    start = tuple(start)
    end = tuple(end)

    # Create start and end node
    start_node = Node(None, start)
    end_node = Node(None, end)

    # Priority queue ordered by f. A cheaper route to an already queued
    # position is pushed as a fresh entry (overwriting a heap slot in place
    # would break the heap invariant); the outdated entry is skipped when it
    # is eventually popped.
    open_list = []
    heapq.heappush(open_list, start_node)
    best_g = {start: 0}
    closed_positions = set()

    expansions = 0
    max_iterations = (maze.size // 2)

    # Define the adjacent squares (including diagonals)
    adjacent_squares = [c for c in product((-1, 0, 1), repeat=dimension) if any(c)]

    # Bound up front so the give-up branch always has a node to report.
    current_node = start_node

    # Loop until you find the end
    while open_list:
        if expansions >= max_iterations:  # if we cannot find by searching half the maze, we give up
            warn("giving up on pathfinding. too many iterations")
            return current_node.return_path()

        # Get the current node
        candidate = heapq.heappop(open_list)
        if candidate.position in closed_positions:
            # Outdated duplicate of a position that was already expanded.
            continue
        current_node = candidate
        expansions += 1
        closed_positions.add(current_node.position)

        # The popped node has the lowest f in the queue, so reaching the goal
        # here means we are done (no need to peek at a possibly empty queue).
        if current_node == end_node:
            return current_node.return_path()

        for offset in adjacent_squares:
            # Get node position
            node_position = tuple(c + d for c, d in zip(current_node.position, offset))
            child = Node(current_node, node_position)

            if child.is_outside(maze):
                continue
            if child.is_wall(maze):
                continue
            if node_position in closed_positions:
                continue

            # Cost of the move is the length of the offset vector
            step_cost = sum(d ** 2 for d in offset) ** 0.5
            child.g = current_node.g + step_cost

            # Only queue the child when it improves on the best known route
            if child.g >= best_g.get(node_position, float("inf")):
                continue
            best_g[node_position] = child.g

            child.h = child.euclidean_distance(end_node)
            child.f = child.g + child.h
            heapq.heappush(open_list, child)

    warn("Couldn't get a path to destination")
    return None
Heapifying an empty list is a no-op; if len(X)>0 can be simplified to if X.
You definitely should replace the closed_list with a set. You'd need to add a __hash__ method to Node:
def __hash__(self):
return hash(self.position)
The "add or update the child to the open list" part scans the open list twice. You might want to implement a NodeArray data structure that augments the array by remembering where its members are.
Finally, a major bug: replacing a heap member with a lower-weight one is likely to violate the heap invariants. After you replace the item at position n, you need to compare it with the one at (n-1)//2 and swap them if it's smaller (repeat until n is zero). heapq._siftdown does this for you. (In the generic case you'd also need to check that the items at 2n+1 and 2n+2 are larger (repeat likewise), but that can't happen here.)
@Yichen-Lei that's a great solution, however you forgot to update the parent, this leads to suboptimal paths. You should just update the entire
open_list[idx]