Skip to content

Instantly share code, notes, and snippets.

@ryancollingwood
Forked from Nicholas-Swift/astar.py
Last active November 7, 2024 15:24
Show Gist options
  • Save ryancollingwood/32446307e976a11a1185a5394d6657bc to your computer and use it in GitHub Desktop.
Save ryancollingwood/32446307e976a11a1185a5394d6657bc to your computer and use it in GitHub Desktop.
# Credit for this: Nicholas Swift
# as found at https://medium.com/@nicholas.w.swift/easy-a-star-pathfinding-7e6689c7f7b2
from warnings import warn
import heapq
class Node:
    """
    A node class for A* Pathfinding.

    Attributes:
        parent: the Node this one was reached from, or None for a root node.
        position: the node's location; equality and hashing use only this.
        g: cost accumulated from the start node (initialised to 0).
        h: heuristic estimate to the goal (initialised to 0).
        f: total score used for heap ordering (initialised to 0).
    """

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position
        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing with AttributeError on `other.position`.
        if not isinstance(other, Node):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone would make the class unhashable; hash on the
        # same field equality uses so nodes can live in sets and dict keys.
        return hash(self.position)

    def __repr__(self):
        return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

    # defining less than for purposes of heap queue
    def __lt__(self, other):
        return self.f < other.f

    # defining greater than for purposes of heap queue
    def __gt__(self, other):
        return self.f > other.f
def return_path(current_node):
    """
    Rebuild the path that leads to ``current_node``.

    Follows the ``parent`` links from ``current_node`` until a node whose
    parent is None is reached, collecting each node's ``position``.

    :param current_node: last node of the path, or None
    :return: list of positions ordered from the root node to
        ``current_node``; an empty list when ``current_node`` is None
    """
    path = []
    current = current_node
    while current is not None:
        path.append(current.position)
        current = current.parent
    # Positions were collected goal-first; flip them to start-first order.
    path.reverse()
    return path
def astar(maze, start, end, allow_diagonal_movement=False):
    """
    Returns a list of tuples as a path from the given start to the given end
    in the given maze.

    Every move (orthogonal or diagonal) costs 1. The heuristic is the
    Manhattan distance for 4-way movement and the Chebyshev distance for
    8-way movement; neither overestimates the remaining cost, so the
    returned path has the minimum number of moves.

    :param maze: sequence of rows; a cell equal to 0 is walkable, any other
        value is blocked. Rows may have different lengths.
    :param start: (row, column) of the first cell; it is not checked for
        being inside the maze or walkable
    :param end: (row, column) of the destination cell
    :param allow_diagonal_movement: when True the four diagonal neighbours
        are searched as well
    :return: list of (row, column) tuples from start to end inclusive, or
        None (after a warning) when the destination cannot be reached
    """
    start = tuple(start)
    end = tuple(end)

    # what squares do we search
    adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0))
    if allow_diagonal_movement:
        adjacent_squares += ((-1, -1), (-1, 1), (1, -1), (1, 1))

    def heuristic(position):
        d_row = abs(position[0] - end[0])
        d_col = abs(position[1] - end[1])
        if allow_diagonal_movement:
            # A diagonal step closes both gaps at once for a cost of 1.
            return max(d_row, d_col)
        return d_row + d_col

    best_g = {start: 0}      # cheapest known cost from start to each cell
    parents = {start: None}  # predecessor of each cell on that cheapest route
    closed = set()           # cells that have already been expanded

    # Heap entries are (f, tie_breaker, position). The counter keeps entries
    # with equal f in insertion order without comparing positions.
    tie_breaker = 0
    open_list = [(heuristic(start), tie_breaker, start)]

    # Every cell is expanded at most once, so the loop terminates without an
    # iteration cap and never gives up on a reachable destination.
    while open_list:
        _, _, current = heapq.heappop(open_list)

        # A cheaper entry for this cell was already processed; this one is a
        # stale leftover (entries are never updated in place).
        if current in closed:
            continue

        # Found the goal
        if current == end:
            path = []
            while current is not None:
                path.append(current)
                current = parents[current]
            path.reverse()
            return path

        closed.add(current)
        child_g = best_g[current] + 1

        for d_row, d_col in adjacent_squares:
            row = current[0] + d_row
            col = current[1] + d_col

            # Make sure within range (checked against the row being indexed)
            if row < 0 or row >= len(maze):
                continue
            if col < 0 or col >= len(maze[row]):
                continue

            # Make sure walkable terrain
            if maze[row][col] != 0:
                continue

            child = (row, col)
            if child in closed:
                continue

            # Keep only strictly better routes to a cell already discovered
            known_g = best_g.get(child)
            if known_g is not None and child_g >= known_g:
                continue

            best_g[child] = child_g
            parents[child] = current
            tie_breaker += 1
            heapq.heappush(
                open_list, (child_g + heuristic(child), tie_breaker, child)
            )

    warn("Couldn't get a path to destination")
    return None
def example(print_maze=True):
    """
    Run astar on a built-in maze and print the outcome.

    The search goes from the top-left cell to the bottom-right cell of a
    16-row maze (each row is a 30-cell pattern repeated twice).

    :param print_maze: when True, draw the maze with walls as full blocks,
        free cells as spaces and the cells of the found path as dots
    :return: None; the path returned by astar is printed last
    """
    maze = [[0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,] * 2,
            [0,0,0,1,1,0,0,1,1,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,] * 2,
            [0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,] * 2,
            [0,0,0,1,0,1,1,1,1,0,1,1,0,0,1,1,1,0,0,0,1,1,1,1,1,1,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,0,1,0,0,0,0,1,1,0,1,0,0,0,0,0,0,1,1,1,0,] * 2,
            [0,0,0,1,0,1,1,0,1,1,0,1,1,1,0,0,0,0,0,1,0,0,1,1,1,1,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,0,0,0,1,1,1,1,1,1,1,0,0,0,0,1,0,1,0,1,1,] * 2,
            [0,0,0,1,0,1,0,1,1,0,1,1,1,1,0,0,1,1,1,1,1,1,1,0,1,0,1,0,0,0,] * 2,
            [0,0,0,1,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,1,1,1,0,] * 2,
            [0,0,0,1,0,1,1,1,1,0,1,0,0,1,1,1,0,1,1,1,1,0,1,1,1,0,1,0,0,0,] * 2,
            [0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,1,1,] * 2,
            [0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,] * 2,
            [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,] * 2,]

    start = (0, 0)
    end = (len(maze) - 1, len(maze[0]) - 1)

    path = astar(maze, start, end)

    if print_maze:
        # astar returns None when no route exists; still draw the bare maze
        # instead of failing while iterating over None.
        for step in path or ():
            maze[step[0]][step[1]] = 2

        for row in maze:
            line = []
            for col in row:
                if col == 1:
                    line.append("\u2588")
                elif col == 0:
                    line.append(" ")
                elif col == 2:
                    line.append(".")
            print("".join(line))

    print(path)
@ryancollingwood
Copy link
Author

Howdy, I haven't updated the implementation from when I used it in a simple 2d game, https://github.com/ryancollingwood/arcade-rabbit-herder/blob/master/pathfinding/astar.py

It works well enough for my simple purposes 🐇
I'll have a look again this weekend.

@ryancollingwood
Copy link
Author

I can't reproduce the issue @liamhan0905 raised. Do you have any more details?

@ryancollingwood
Copy link
Author

ryancollingwood commented Oct 23, 2019

@bearcub this is my implementation of the gist originally shared by @Nicholas-Swift

See it in use in my silly game: https://github.com/ryancollingwood/arcade-rabbit-herder/blob/master/pathfinding/astar.py

@alarictabaries
Copy link

alarictabaries commented Nov 4, 2019

Hello,

Thank you for the gist. I think there is an error in your code.

With the following maze :

maze = [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
start = (0, 9)
end = (9, 0)

I have this result : [(0, 9), (0, 8), (0, 7), (0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6), (5, 5), (5, 4), (5, 3), (6, 3), (7, 3), (8, 3), (9, 3), (9, 2), (9, 1), (9, 0)]. It walks on [1] boxes.

I think it's a weird behavior happening when the path takes upper (and/or) right directions.

Cf. https://gist.github.com/Nicholas-Swift/003e1932ef2804bebef2710527008f44#gistcomment-3069887

@mm-airmap
Copy link

You need to use a priority queue for the open_list.

@ryancollingwood
Copy link
Author

@mm-airmap out of interest I implemented a very simple heap queue (heapq) and it's fractionally quicker ;)

@johnmeye
Copy link

Hello,

Thank you for the gist. I think there is an error in your code.

With the following maze :

maze = [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
start = (0, 9)
end = (9, 0)

I have this result : [(0, 9), (0, 8), (0, 7), (0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6), (5, 5), (5, 4), (5, 3), (6, 3), (7, 3), (8, 3), (9, 3), (9, 2), (9, 1), (9, 0)]. It walks on [1] boxes.

I think it's a weird behavior happening when the path takes upper (and/or) right directions.

Cf. https://gist.github.com/Nicholas-Swift/003e1932ef2804bebef2710527008f44#gistcomment-3069887

Hi @alarictabaries

I ran your map and the code solution is correct, i get the following:
Log: Path In Memory
Log: [(0, 9), (0, 8), (0, 7), (0, 6), (1, 6), (2, 6), (3, 6), (4, 6), (5, 6), (5, 5), (5, 4), (5, 3), (6, 3), (7, 3), (8, 3), (9, 3), (9, 2), (9, 1), (9, 0)]
Which looks like this:

0 | 0 | 0 | 0 | 0 | 0 | v | < | < | Start
0 | 1 | 1 | 0 | 1 | 1 | v | 1 | 1 | 0
0 | 1 | 1 | 0 | 1 | 1 | v | 1 | 1 | 0
0 | 1 | 1 | 0 | 1 | 1 | v | 1 | 1 | 0
0 | 1 | 1 | 0 | 1 | 1 | v | 1 | 1 | 0
0 | 0 | 0 | v | < | < | < | 0 | 0 | 0
0 | 1 | 1 | v | 1 | 1 | 0 | 1 | 1 | 0
0 | 1 | 1 | v | 1 | 1 | 0 | 1 | 1 | 0
0 | 1 | 1 | v | 1 | 1 | 0 | 1 | 1 | 0
F | < | < | < | 0 | 0 | 0 | 0 | 0 | 0

You should remember it's not (X, Y) but rather (Row, Column), which is like (Y, X) coordinates in the output.

@Pame-hash
Copy link

I'm having issues with this implementation. In particular, the code seems to have issues when a node in the open list is rediscovered, but with the same g value. The function gets stuck at line 126 when I try to run it for more complex problems.

@stabtazer
Copy link

stabtazer commented Oct 7, 2020

Hi, thanks a lot for your work!
I believe I have solved a couple of the mentioned issues.
First of all, the 'child is already in the open list' will not remove/update the old child, if the new g is <=; rather it will add the same child to the queue, resulting in duplicates. This is the reason why you thought you needed a max_iterations break. You can avoid using max_iterations and outer_iterations, by replacing this:

        # Child is already in the open list
        if len([open_node for open_node in open_list if child.position == open_node.position and child.g > open_node.g]) > 0:
            continue

with this (I believe it can be simplified):

        # Child is already in the open list
        index = None
        for i in range(0, len(open_list)):
            if child.position == open_list[i].position:
                index = i
                break
        
        if index:
            if child.g >= open_list[index].g:
                continue
            else:
                open_list[index] = open_list[-1]
                open_list.pop()
                if index < len(open_list):
                    heapq._siftup(open_list, index)
                    heapq._siftdown(open_list, 0, index)

The other thing is that, by not using the Euclidean distance as the heuristic, you will possibly end up with a longer route than needed.
However, this will increase the computational power needed, and without solving the above-mentioned problem, this will max out the CPU (at least on my old laptop).
When using the code above, along with the Euclidean distance for child.h AND for child.g(!), the algorithm should work as intended:

        # Create the f, g, and h values
        child.g = current_node.g + (((child.position[0] - child.parent.position[0]) ** 2) + ((child.position[1] - child.parent.position[1]) ** 2))**0.5
        child.h = (((child.position[0] - end_node.position[0]) ** 2) + ((child.position[1] - end_node.position[1]) ** 2))**0.5
        child.f = child.g + child.h

Finally, the initialization of the start and end nodes' f, h and g values is redundant, as the Node class itself does this. You could, however, set start_node.f and start_node.h to the Euclidean distance to end_node, but this doesn't really seem to make a difference.

@bismanugraha
Copy link

Hello, i'm using this for my final lecture in college. Thanks for the help!

@ryancollingwood
Copy link
Author

@stabtazer thank you for your comments, I'll have a look see at them proper before the grind of new year kicks in 😎

@bismanugraha all the best for wrapping up your college lectures 🎉

@frankhale
Copy link

@stabtazer thank you for your comments, I'll have a look see at them proper before the grind of new year kicks in 😎

@ryancollingwood,

Did you have a chance to look at what @stabtazer mentioned? Just curious what your thoughts are on it.

@Yichen-Lei
Copy link

Yichen-Lei commented Dec 10, 2021

@frankhale @stabtazer @ryancollingwood
I simplified your code as below:

           \# Child is already in the open list
            if child in open_list: 
                idx = open_list.index(child) 
                if child.g < open_list[idx].g:
                    # update the node in the open list
                    open_list[idx].g = child.g
                    open_list[idx].f = child.f
                    open_list[idx].h = child.h
            else:
                \# Add the child to the open list
                heapq.heappush(open_list, child)

With the `in` operator, we can quickly check whether the child is already in the open list. Also, I think we only need to update the g, h and f of the node that has the same position as the child, rather than operating on the heap.
About the calculation of the Euclidean distance, I agree with taking the square root for h; otherwise, h will dominate the wayfinding and the result will be a straight line (on a weight map).

@ameerkatofficial
Copy link

Are there any ways to do this with multiple nodes with edges? I'm a bit daunted at the prospect of doing so. I'm not looking for a straight up answer, just a hint, some help.

@normcushing
Copy link

normcushing commented Jan 4, 2024

Big thanks to Nicholas Swift, @ryancollingwood , @stabtazer , @Yichen-Lei, and others. I put together the best version based on the feedback and comments provided by everybody, which included implementing the sq root for the value of h. The algorithm works fine on a variety of mazes from 2 x 2, 1 x 3, up to 500 x 500 (although the large maze took a long time to solve).

Things for your consideration:

  1. I implemented a change when increasing the value of g when making a diagonal move. I changed the current cost value of 1 to be 1.4 which is the distance moved from Node center to Node center along the diagonal. Using the 1.4 value differentiates the difference between moving orthogonal to the grid or diagonally.

Example: The cost of an orthogonal move from (0, 0) to (0, 1) is 1. The cost of getting from (0, 0) to (1, 1) with orthogonal moves only is 2 (two moves of cost 1). With the current implementation, while using the allow_diagonal_movement flag, the cost to move from (0, 0) to (1, 1) is also 1.

With my change when the allow_diagonal_movement flag is in use the cost to move from (0, 0) to (1, 1) is 1.4; thus its bigger than 1 and smaller than 2.

  2. There is a bias in finding the path because the algorithm does not randomize the initial or next search direction. I solved this issue by randomly selecting the next direction to search from the adjacent_squares list.

  3. Here are my changes to address items 1 and 2:

Additions to initialize the data structures needed to support the diagonal move cost and the direction bias.

    #OLD CODE
    # what squares do we search
    #adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0),)
    #if allow_diagonal_movement:
    #   adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0), (-1, -1), (-1, 1), (1, -1), (1, 1),)
    #NEW CODE
    if allow_diagonal_movement:
        adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0), (-1, -1), (-1, 1), (1, -1), (1, 1))
        direction_cost = (1.0, 1.0, 1.0, 1.0, 1.4, 1.4, 1.4, 1.4)
        adjacent_square_pick_index = [0, 1, 2, 3, 4, 5, 6, 7]
    else:
        adjacent_squares = ((0, -1), (0, 1), (-1, 0), (1, 0))
        direction_cost= (1.0, 1.0, 1.0, 1.0)
        adjacent_square_pick_index = [0, 1, 2, 3]

    #EXISTING CODE
    # Loop until you find the end
    while len(open_list) > 0:

        #NEW CODE
        #Randomize the order of the adjacent_squares_pick_index to avoid a decision making bias
        random.shuffle(adjacent_square_pick_index)

Change to the for loop when selecting the initial and next direction to search, and to assign the direction move cost.

        # OLD CODE
        #for new_position in adjacent_squares: # Adjacent squares
        # NEW CODE
        for pick_index in adjacent_squares_pick_index:
            new_position = adjacent_squares[pick_index]
            direction_cost_factor = direction_cost[pick_index]

Change to the increase in g when assigning the move cost.

            # OLD CODE
            #child.g = current_node.g + 1
            # NEW CODE
            child.g = current_node.g + direction_cost_factor 

I hope this is useful.

@normcushing
Copy link

Are there any ways to do this with multiple nodes with edges? I'm a bit daunted at the prospect of doing so. I'm not looking for a straight up answer, just a hint, some help.

@ameerkatofficial, can you provide a more descriptive problem statement to help me better understand the problem you are looking to solve. I would benefit from better understanding what is meant by "multiple nodes with edges".

@cirillom
Copy link

@frankhale @stabtazer @ryancollingwood I simplify your code as below:

           \# Child is already in the open list
            if child in open_list: 
                idx = open_list.index(child) 
                if child.g < open_list[idx].g:
                    # update the node in the open list
                    open_list[idx].g = child.g
                    open_list[idx].f = child.f
                    open_list[idx].h = child.h
            else:
                \# Add the child to the open list
                heapq.heappush(open_list, child)

Through in function, we can fastly check whether child has been in open list. Also, I think we only need to update the g,h,f of the node which has the same position as child rather than operating the heap. About the calculation of Euclidean distance, I agree to have a sqrt on h, otherwise, h will play the most role in wayfinding and the result will be a straight line (on weight map).

@Yichen-Lei that's a great solution, however you forgot to update the parent, this leads to suboptimal paths. You should just update the entire open_list[idx]

           # Child is already in the open list
            if child in open_list: 
                idx = open_list.index(child) 
                if child.g < open_list[idx].g:
                    # update the node in the open list
                    open_list[idx] = child
            else:
                # Add the child to the open list
                heapq.heappush(open_list, child)

@cirillom
Copy link

After messing around with this code for a uni project, I perfected the code and created a version that works for both 2d and 3d

from warnings import warn
import heapq
from itertools import product

class Node:
    """
    A search node for A* pathfinding in a maze of any number of dimensions.

    Attributes:
        parent: the Node this one was reached from, or None for a root node.
        position: sequence of indices, one per maze dimension; equality and
            hashing use only this.
        g: cost accumulated from the start node (initialised to 0).
        h: heuristic estimate to the goal (initialised to 0).
        f: total score used for heap ordering (initialised to 0).
    """

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position

        self.g = 0
        self.h = 0
        self.f = 0

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing with AttributeError on `other.position`.
        if not isinstance(other, Node):
            return NotImplemented
        return self.position == other.position

    def __hash__(self):
        # Defining __eq__ alone would make the class unhashable; hash on the
        # same field equality uses.
        return hash(self.position)

    def __repr__(self):
        return f"{self.position} - g: {self.g} h: {self.h} f: {self.f}"

    # defining less than for purposes of heap queue
    def __lt__(self, other):
        return self.f < other.f

    # defining greater than for purposes of heap queue
    def __gt__(self, other):
        return self.f > other.f

    def return_path(self):
        """Return the positions from the root ancestor down to this node."""
        path = []
        current = self
        while current is not None:
            path.append(current.position)
            current = current.parent
        path.reverse()
        return path

    def euclidean_distance(self, other):
        """Return the straight-line distance to ``other``, rounded to 3 decimals."""
        squared = sum(
            (mine - theirs) ** 2
            for mine, theirs in zip(self.position, other.position)
        )
        return round(squared ** 0.5, 3)

    def is_outside(self, maze):
        """Return True when any index of ``position`` is out of range for ``maze``."""
        level = maze
        for index in self.position:
            if index < 0 or index >= len(level):
                return True
            # Descend one dimension before checking the next index.
            level = level[index]
        return False

    def is_wall(self, maze):
        """
        Return whether the maze cell at ``position`` equals 0.

        NOTE(review): a cell value of 0 is reported as a wall here; confirm
        this matches the maze encoding the callers use.
        """
        cell = maze
        for index in self.position:
            cell = cell[index]
        return cell == 0

def astar(maze, start, end):
    """
    Search for a path from ``start`` to ``end`` in an N-dimensional maze.

    All neighbours that differ by -1, 0 or +1 on every axis (diagonals
    included) are considered; a move costs the Euclidean length of its
    offset and the heuristic is the Euclidean distance to ``end``.

    :param maze: array-like exposing ``shape`` and ``size`` and supporting
        nested indexing (presumably a numpy array; confirm with callers).
        Cells are classified through ``Node.is_outside`` / ``Node.is_wall``.
    :param start: position with one index per maze dimension
    :param end: position with one index per maze dimension
    :return: list of positions from start to end; the partial path to the
        last expanded node when the iteration cap (half the number of
        cells) is exceeded; None when the open list runs empty
    :raises ValueError: when start or end do not match the maze dimensions
    """
    dimension = len(maze.shape)
    if len(start) != dimension or len(end) != dimension:
        raise ValueError("Start and end must have the same number of dimensions as the maze")

    # Create start and end node
    start_node = Node(None, start)
    end_node = Node(None, end)

    closed_list = []
    open_list = [start_node]  # a single-element list is already a valid heap

    outer_iterations = 0
    # At least one iteration is always allowed, so `current_node` is bound
    # before the give-up branch below can ever run.
    max_iterations = max(maze.size // 2, 1)

    # Define the adjacent squares (including diagonals)
    adjacent_squares = [c for c in product((-1, 0, 1), repeat=dimension) if any(c)]

    # Loop until you find the end
    while open_list:
        outer_iterations += 1

        if outer_iterations > max_iterations:  # searched half the maze: give up
            warn("giving up on pathfinding. too many iterations")
            return current_node.return_path()

        # Get the current node
        current_node = heapq.heappop(open_list)
        closed_list.append(current_node)

        # The popped node has the smallest f in the heap, so no comparison
        # against open_list[0] is needed (it raised IndexError when the goal
        # was the last entry, e.g. when start == end).
        if current_node == end_node:
            return current_node.return_path()

        for new_position in adjacent_squares:
            # Get node position
            node_position = tuple(
                here + step
                for here, step in zip(current_node.position, new_position)
            )
            child = Node(current_node, node_position)

            if child.is_outside(maze):
                continue

            if child.is_wall(maze):
                continue

            if child in closed_list:
                continue

            # Step cost is the Euclidean length of the offset just applied.
            cost = sum(step ** 2 for step in new_position) ** 0.5
            child.g = current_node.g + cost
            child.h = child.euclidean_distance(end_node)
            child.f = child.g + child.h

            # add or update the child in the open list
            try:
                i = open_list.index(child)
            except ValueError:
                heapq.heappush(open_list, child)
            else:
                if child.g < open_list[i].g:
                    open_list[i] = child
                    # Replacing an entry in place breaks the heap ordering;
                    # rebuild it so heappop keeps returning the minimum f.
                    heapq.heapify(open_list)

    warn("Couldn't get a path to destination")
    return None

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment