Skip to content

Instantly share code, notes, and snippets.

@globalpolicy
Created December 1, 2025 19:25
Show Gist options
  • Select an option

  • Save globalpolicy/423c7788e4768aca5627bf6f8776da9a to your computer and use it in GitHub Desktop.

Select an option

Save globalpolicy/423c7788e4768aca5627bf6f8776da9a to your computer and use it in GitHub Desktop.
Finite Difference schemes solution to the Advection-Diffusion Equation
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import math
L=1000
number_of_nodes=500
number_of_intervals=number_of_nodes-1
delX=L/number_of_intervals
delT=1
number_of_time_steps=500
V=5 # constant advection speed
D=1 # diffusivity
courant_number=V*delT/delX
peclet_number=V*delX/D
diffusion_number=D*delT/delX**2
total_time=delT*number_of_time_steps
semi_infinite_domain_length=total_time*V+4*math.sqrt(D*total_time)
def init_plot(num_nodes, y, x1, x2, marker, label, color, linestyle, title):
num_intervals=num_nodes-1
# Define x and y values
x = [x1+i*(x2-x1)/(num_intervals) for i in range(num_nodes)]
fig, ax = plt.subplots()
# Plot the points
line,=ax.plot(x, y, marker=marker, label=label, color=color, linestyle=linestyle)
# Add labels and a title for clarity
plt.xlabel('x')
plt.ylabel('C')
plt.title(title)
plt.legend()
plt.grid(True)
return fig, ax, line
def animate_fd(solutions, num_nodes, x1, x2, marker='o', label='C(x,t)', color='b', linestyle='-', title='Advection-Diffusion PDE Finite Difference Solutions'):
fig, ax, line = init_plot(num_nodes, solutions[0], x1, x2, marker, label, color, linestyle, title)
# set global y axis limits
ymin = -1 #min(map(min, solutions))
ymax = 1 #max(map(max, solutions))
ax.set_ylim(ymin, ymax)
def update(frame):
y = solutions[frame]
line.set_ydata(y)
ax.set_title(f"t = {frame*delT:.3f}")
return line,
anim = FuncAnimation(
fig,
update,
frames=len(solutions),
interval=200, # milliseconds between frames
blit=False # needed for changing title
)
plt.show()
return anim
solutions=[] # contains nodal values for each time step starting from initial condition (t=0). each row corresponds to a time step
def time_march_crank_nicolson_central_diff(u, number_of_nodes, left_bc):
'''
Solves a system of linear equations for all the nodal values at once for the next time step
'''
A=[] # A matrix for AX=B. should contain rows as lists
B=[] # B vector for AX=B
for i in range(number_of_nodes): # loop to create the A matrix and B vector for the system AX=B
if i==0: # first row i.e. left boundary condition
A.append([1]+[0]*(number_of_nodes-1)) # first element 1 and the rest 0s
B.append(left_bc()) # first element of B vector should be the left boundary condition value
elif i==number_of_nodes-1: # last row i.e. right boundary condition
A.append([0]*(number_of_nodes-1)+[1]) # last element should be 1
B.append(0) # last element of B vector should be 0
else: # interior nodes
# LHS coefficients
p=delT/(2*delX)*(D/delX + V/2)
q=-1-delT/delX**2 * D
r=delT/(2*delX)*(D/delX - V/2)
# RHS terms
u_i_n=u[i]
d_n=D/delX**2 * (u[i+1]-2*u[i]+u[i-1])
v_n=V/(2*delX) * (u[i+1]-u[i-1])
T=-u_i_n-delT/2*(d_n-v_n)
B.append(T) # internal elements of B vector should be T
pre_diagonal_entries=[0]*(i-1)
post_diagonal_entries=[0]*(number_of_nodes-3-len(pre_diagonal_entries))
tridiagonal_entries=[p,q,r]
A.append(pre_diagonal_entries+tridiagonal_entries+post_diagonal_entries)
u_new=np.linalg.solve(A,B) # nodal solutions for the next time step
return u_new.tolist()
def time_march_ftcs(u, number_of_nodes, left_bc):
'''
Gives the nodal values for the next time step
u = previous time step's nodal values
f = source term nodal values
'''
u_new=[] # newly calculated nodal values for the next time step
for i in range(number_of_nodes):
if i==0: # left boundary node
u_new.append(left_bc())
elif i==number_of_nodes-1: # right boundary node
u_new.append(0)
else:
new_nodal_value=u[i]+D*delT/delX**2 * (u[i+1]-2*u[i]+u[i-1])-0.5*V*delT/delX*(u[i+1]-u[i-1])
u_new.append(new_nodal_value)
return u_new
def time_march_crank_nicolson_upwind(u, number_of_nodes, left_bc):
'''
Solves a system of linear equations for all the nodal values at once for the next time step
'''
A=[] # A matrix for AX=B. should contain rows as lists
B=[] # B vector for AX=B
for i in range(number_of_nodes): # loop to create the A matrix and B vector for the system AX=B
if i==0: # first row i.e. left boundary condition
A.append([1]+[0]*(number_of_nodes-1)) # first element 1 and the rest 0s
B.append(left_bc()) # first element of B vector should be the left boundary condition value
elif i==number_of_nodes-1: # last row i.e. right boundary condition
A.append([0]*(number_of_nodes-1)+[1]) # last element should be 1
B.append(0) # last element of B vector should be 0
else: # interior nodes
# LHS coefficients
p=0.5*(diffusion_number+courant_number)
q=-diffusion_number-.5*courant_number-1
r=.5*diffusion_number
# RHS terms
u_i_n=u[i]
d_n=D/delX**2 * (u[i+1]-2*u[i]+u[i-1])
v_n=V/delX * (u[i]-u[i-1])
T=-u_i_n-delT/2*(d_n-v_n)
B.append(T) # internal elements of B vector should be T
pre_diagonal_entries=[0]*(i-1)
post_diagonal_entries=[0]*(number_of_nodes-3-len(pre_diagonal_entries))
tridiagonal_entries=[p,q,r]
A.append(pre_diagonal_entries+tridiagonal_entries+post_diagonal_entries)
u_new=np.linalg.solve(A,B) # nodal solutions for the next time step
return u_new.tolist()
def make_instantaneous_point_source_initial_us(number_of_nodes, spike_location_relative, spike_value):
# fill the initial solution (for t=0). should have as many elements as the number of nodes (including left b/c and right b/c)
u=[]
for i in range(number_of_nodes):
if i==int(spike_location_relative* number_of_nodes):
u.append(spike_value) # spike in the middle
else:
u.append(0) # initial condition : u should be 0 for all other nodes at t=0
return u
def make_zero_initial_condition(number_of_nodes):
# fill the initial solution (for t=0). should have as many elements as the number of nodes (including left b/c and right b/c)
u=[]
for i in range(number_of_nodes):
u.append(0) # initial condition : u should be 0 for all nodes at t=0
return u
# u=make_instantaneous_point_source_initial_us(number_of_nodes, 0.1, 5)
u=make_zero_initial_condition(number_of_nodes)
# fill the initial condition nodal values to the solution matrix
solutions.append(u)
# loop for as many time steps as specified, advancing by delT after each iteration
for n in range(number_of_time_steps):
u_new=time_march_crank_nicolson_central_diff(u,number_of_nodes, lambda: 0.5)
# u_new=time_march_crank_nicolson_upwind(u,number_of_nodes, lambda: 0.5)
# u_new=time_march_ftcs(u,number_of_nodes, lambda: 0.5)
u=u_new
solutions.append(u)
# animate the solutions from the solution matrix
# animate_fd(solutions, number_of_nodes, 0, L)
# exit()
# plot the nodal concentrations at a particular t value
time_step_to_plot=int(number_of_time_steps*.35)
solution=solutions[time_step_to_plot]
# now analytical solution
def u_analytical_continuous_source_origin(x, t, u0, v, D):
"""
Analytical concentration 'u' for a 1D continuous source at the origin.
"""
if t <= 0:
return 0.0
sqrtDt = 2 * math.sqrt(D * t)
term1 = math.erfc((x - v*t) / sqrtDt)
term2 = math.exp(np.minimum(v*x / D, 700)) * math.erfc((x + v*t) / sqrtDt)
return 0.5 * u0 * (term1 + term2)
node_xs= np.linspace(0,L,num=number_of_nodes) # x = 0, delX, 2*delX, ..., L
t=time_step_to_plot*delT
nodal_us=[u_analytical_continuous_source_origin(x,t,0.5,V,D) for x in node_xs]
# calculate RMSE between the FDA solution and analytical nodal_us
rmse = np.sqrt(np.mean((np.array(nodal_us) - np.array(solution))**2))
fig,ax,line=init_plot(number_of_nodes, solution, 0, L, marker=None, label= 'FDA',color= 'blue', linestyle='--', title=f'ADE FD Solutions|Cr={courant_number:.2f}|Pe={peclet_number:.2f}|r={diffusion_number:.2f}\nRMSE:{rmse:.3f}')
ax.plot(node_xs, nodal_us, marker=None, label= 'Analytical',color= 'red', linestyle='-')
plt.legend()
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment