Created
December 1, 2025 19:25
-
-
Save globalpolicy/423c7788e4768aca5627bf6f8776da9a to your computer and use it in GitHub Desktop.
Finite Difference schemes solution to the Advection-Diffusion Equation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from matplotlib.animation import FuncAnimation | |
| import math | |
| L=1000 | |
| number_of_nodes=500 | |
| number_of_intervals=number_of_nodes-1 | |
| delX=L/number_of_intervals | |
| delT=1 | |
| number_of_time_steps=500 | |
| V=5 # constant advection speed | |
| D=1 # diffusivity | |
| courant_number=V*delT/delX | |
| peclet_number=V*delX/D | |
| diffusion_number=D*delT/delX**2 | |
| total_time=delT*number_of_time_steps | |
| semi_infinite_domain_length=total_time*V+4*math.sqrt(D*total_time) | |
| def init_plot(num_nodes, y, x1, x2, marker, label, color, linestyle, title): | |
| num_intervals=num_nodes-1 | |
| # Define x and y values | |
| x = [x1+i*(x2-x1)/(num_intervals) for i in range(num_nodes)] | |
| fig, ax = plt.subplots() | |
| # Plot the points | |
| line,=ax.plot(x, y, marker=marker, label=label, color=color, linestyle=linestyle) | |
| # Add labels and a title for clarity | |
| plt.xlabel('x') | |
| plt.ylabel('C') | |
| plt.title(title) | |
| plt.legend() | |
| plt.grid(True) | |
| return fig, ax, line | |
| def animate_fd(solutions, num_nodes, x1, x2, marker='o', label='C(x,t)', color='b', linestyle='-', title='Advection-Diffusion PDE Finite Difference Solutions'): | |
| fig, ax, line = init_plot(num_nodes, solutions[0], x1, x2, marker, label, color, linestyle, title) | |
| # set global y axis limits | |
| ymin = -1 #min(map(min, solutions)) | |
| ymax = 1 #max(map(max, solutions)) | |
| ax.set_ylim(ymin, ymax) | |
| def update(frame): | |
| y = solutions[frame] | |
| line.set_ydata(y) | |
| ax.set_title(f"t = {frame*delT:.3f}") | |
| return line, | |
| anim = FuncAnimation( | |
| fig, | |
| update, | |
| frames=len(solutions), | |
| interval=200, # milliseconds between frames | |
| blit=False # needed for changing title | |
| ) | |
| plt.show() | |
| return anim | |
| solutions=[] # contains nodal values for each time step starting from initial condition (t=0). each row corresponds to a time step | |
| def time_march_crank_nicolson_central_diff(u, number_of_nodes, left_bc): | |
| ''' | |
| Solves a system of linear equations for all the nodal values at once for the next time step | |
| ''' | |
| A=[] # A matrix for AX=B. should contain rows as lists | |
| B=[] # B vector for AX=B | |
| for i in range(number_of_nodes): # loop to create the A matrix and B vector for the system AX=B | |
| if i==0: # first row i.e. left boundary condition | |
| A.append([1]+[0]*(number_of_nodes-1)) # first element 1 and the rest 0s | |
| B.append(left_bc()) # first element of B vector should be the left boundary condition value | |
| elif i==number_of_nodes-1: # last row i.e. right boundary condition | |
| A.append([0]*(number_of_nodes-1)+[1]) # last element should be 1 | |
| B.append(0) # last element of B vector should be 0 | |
| else: # interior nodes | |
| # LHS coefficients | |
| p=delT/(2*delX)*(D/delX + V/2) | |
| q=-1-delT/delX**2 * D | |
| r=delT/(2*delX)*(D/delX - V/2) | |
| # RHS terms | |
| u_i_n=u[i] | |
| d_n=D/delX**2 * (u[i+1]-2*u[i]+u[i-1]) | |
| v_n=V/(2*delX) * (u[i+1]-u[i-1]) | |
| T=-u_i_n-delT/2*(d_n-v_n) | |
| B.append(T) # internal elements of B vector should be T | |
| pre_diagonal_entries=[0]*(i-1) | |
| post_diagonal_entries=[0]*(number_of_nodes-3-len(pre_diagonal_entries)) | |
| tridiagonal_entries=[p,q,r] | |
| A.append(pre_diagonal_entries+tridiagonal_entries+post_diagonal_entries) | |
| u_new=np.linalg.solve(A,B) # nodal solutions for the next time step | |
| return u_new.tolist() | |
| def time_march_ftcs(u, number_of_nodes, left_bc): | |
| ''' | |
| Gives the nodal values for the next time step | |
| u = previous time step's nodal values | |
| f = source term nodal values | |
| ''' | |
| u_new=[] # newly calculated nodal values for the next time step | |
| for i in range(number_of_nodes): | |
| if i==0: # left boundary node | |
| u_new.append(left_bc()) | |
| elif i==number_of_nodes-1: # right boundary node | |
| u_new.append(0) | |
| else: | |
| new_nodal_value=u[i]+D*delT/delX**2 * (u[i+1]-2*u[i]+u[i-1])-0.5*V*delT/delX*(u[i+1]-u[i-1]) | |
| u_new.append(new_nodal_value) | |
| return u_new | |
| def time_march_crank_nicolson_upwind(u, number_of_nodes, left_bc): | |
| ''' | |
| Solves a system of linear equations for all the nodal values at once for the next time step | |
| ''' | |
| A=[] # A matrix for AX=B. should contain rows as lists | |
| B=[] # B vector for AX=B | |
| for i in range(number_of_nodes): # loop to create the A matrix and B vector for the system AX=B | |
| if i==0: # first row i.e. left boundary condition | |
| A.append([1]+[0]*(number_of_nodes-1)) # first element 1 and the rest 0s | |
| B.append(left_bc()) # first element of B vector should be the left boundary condition value | |
| elif i==number_of_nodes-1: # last row i.e. right boundary condition | |
| A.append([0]*(number_of_nodes-1)+[1]) # last element should be 1 | |
| B.append(0) # last element of B vector should be 0 | |
| else: # interior nodes | |
| # LHS coefficients | |
| p=0.5*(diffusion_number+courant_number) | |
| q=-diffusion_number-.5*courant_number-1 | |
| r=.5*diffusion_number | |
| # RHS terms | |
| u_i_n=u[i] | |
| d_n=D/delX**2 * (u[i+1]-2*u[i]+u[i-1]) | |
| v_n=V/delX * (u[i]-u[i-1]) | |
| T=-u_i_n-delT/2*(d_n-v_n) | |
| B.append(T) # internal elements of B vector should be T | |
| pre_diagonal_entries=[0]*(i-1) | |
| post_diagonal_entries=[0]*(number_of_nodes-3-len(pre_diagonal_entries)) | |
| tridiagonal_entries=[p,q,r] | |
| A.append(pre_diagonal_entries+tridiagonal_entries+post_diagonal_entries) | |
| u_new=np.linalg.solve(A,B) # nodal solutions for the next time step | |
| return u_new.tolist() | |
| def make_instantaneous_point_source_initial_us(number_of_nodes, spike_location_relative, spike_value): | |
| # fill the initial solution (for t=0). should have as many elements as the number of nodes (including left b/c and right b/c) | |
| u=[] | |
| for i in range(number_of_nodes): | |
| if i==int(spike_location_relative* number_of_nodes): | |
| u.append(spike_value) # spike in the middle | |
| else: | |
| u.append(0) # initial condition : u should be 0 for all other nodes at t=0 | |
| return u | |
| def make_zero_initial_condition(number_of_nodes): | |
| # fill the initial solution (for t=0). should have as many elements as the number of nodes (including left b/c and right b/c) | |
| u=[] | |
| for i in range(number_of_nodes): | |
| u.append(0) # initial condition : u should be 0 for all nodes at t=0 | |
| return u | |
| # u=make_instantaneous_point_source_initial_us(number_of_nodes, 0.1, 5) | |
| u=make_zero_initial_condition(number_of_nodes) | |
| # fill the initial condition nodal values to the solution matrix | |
| solutions.append(u) | |
| # loop for as many time steps as specified, advancing by delT after each iteration | |
| for n in range(number_of_time_steps): | |
| u_new=time_march_crank_nicolson_central_diff(u,number_of_nodes, lambda: 0.5) | |
| # u_new=time_march_crank_nicolson_upwind(u,number_of_nodes, lambda: 0.5) | |
| # u_new=time_march_ftcs(u,number_of_nodes, lambda: 0.5) | |
| u=u_new | |
| solutions.append(u) | |
| # animate the solutions from the solution matrix | |
| # animate_fd(solutions, number_of_nodes, 0, L) | |
| # exit() | |
| # plot the nodal concentrations at a particular t value | |
| time_step_to_plot=int(number_of_time_steps*.35) | |
| solution=solutions[time_step_to_plot] | |
| # now analytical solution | |
| def u_analytical_continuous_source_origin(x, t, u0, v, D): | |
| """ | |
| Analytical concentration 'u' for a 1D continuous source at the origin. | |
| """ | |
| if t <= 0: | |
| return 0.0 | |
| sqrtDt = 2 * math.sqrt(D * t) | |
| term1 = math.erfc((x - v*t) / sqrtDt) | |
| term2 = math.exp(np.minimum(v*x / D, 700)) * math.erfc((x + v*t) / sqrtDt) | |
| return 0.5 * u0 * (term1 + term2) | |
| node_xs= np.linspace(0,L,num=number_of_nodes) # x = 0, delX, 2*delX, ..., L | |
| t=time_step_to_plot*delT | |
| nodal_us=[u_analytical_continuous_source_origin(x,t,0.5,V,D) for x in node_xs] | |
| # calculate RMSE between the FDA solution and analytical nodal_us | |
| rmse = np.sqrt(np.mean((np.array(nodal_us) - np.array(solution))**2)) | |
| fig,ax,line=init_plot(number_of_nodes, solution, 0, L, marker=None, label= 'FDA',color= 'blue', linestyle='--', title=f'ADE FD Solutions|Cr={courant_number:.2f}|Pe={peclet_number:.2f}|r={diffusion_number:.2f}\nRMSE:{rmse:.3f}') | |
| ax.plot(node_xs, nodal_us, marker=None, label= 'Analytical',color= 'red', linestyle='-') | |
| plt.legend() | |
| plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment