Last active
March 11, 2019 07:43
-
-
Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
mhd with shenfun
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mpi4py import MPI | |
import numpy as np | |
from shenfun import * | |
nu = 0.000625 | |
eta = 0.01 | |
end_time = 0.1 | |
dt = 0.01 # no adaptive time stepping | |
comm = MPI.COMM_WORLD | |
N = (2**5, 2**5, 2**5) # 32^3 | |
L = [2*np.pi,4*np.pi,6*np.pi] # from TGMHD.py demo from spectralDNS | |
dim = 3 | |
# Define basis/tensor product spaces | |
V0 = Basis(N[0], 'F', dtype='D') # Complex-Complex | |
V1 = Basis(N[1], 'F', dtype='D') # Complex-Complex | |
V2 = Basis(N[2], 'F', dtype='d') # Real-Complex | |
T = TensorProductSpace(comm, (V0, V1, V2), **{'planner_effort': 'FFTW_MEASURE'}) # x,y,z now Fourier basis | |
TV = VectorTensorProductSpace(T) # Vector implies any function "u" will now have 3 components | |
VM = MixedTensorProductSpace([T]*2*dim) | |
#u = TrialFunction(T) # Galerkin method uses the 'weak/variational' forumation | |
#v = TestFunction(T) | |
# Mesh variables | |
X = T.local_mesh(True) | |
K = T.local_wavenumbers(scaled=True) | |
for i in range(dim): | |
X[i] = X[i].astype(float) | |
K[i] = K[i].astype(float) | |
K2 = np.zeros(T.local_shape(True), dtype=float) | |
for i in range(dim): | |
K2 += K[i]*K[i] | |
K_over_K2 = np.zeros(TV.local_shape(), dtype=float) # TV = vector | |
for i in range(dim): | |
K_over_K2[i] = K[i] / np.where(K2 == 0, 1, K2) | |
# Dealiased grid | |
kw = {'padding_factor': 1, | |
'dealias_direct': '2/3-rule'} | |
dtypes = ['D','D','d'] | |
Vp = [Basis(N[i], 'F', domain=(0, L[i]), | |
dtype=dtypes[i], **kw) for i in range(dim)] | |
Tp = TensorProductSpace(comm, Vp, dtype=float, | |
collapse_fourier=True, | |
**{'planner_effort': 'FFTW_MEASURE'}) | |
VTp = VectorTensorProductSpace(Tp) | |
VMp = MixedTensorProductSpace([Tp]*2*dim) | |
ub_dealias = Array(VMp) | |
UB = Array(VM) # Both V and B vectors: 6 components | |
P = Array(T) # Pressure scalar: 1 component | |
curl = Array(TV) # Vector: 3 components | |
UB_hat = Function(VM) # K-space V,B vectors | |
P_hat = Function(T) # K-space Pressure scalar | |
#dU = Function(VM) | |
#Source = Array(VM) | |
ZZ_hat = np.zeros((3, 3) + Tp.local_shape(True), dtype=complex) # Work array | |
# Create views into large data structures | |
U, B = UB[:3], UB[3:] | |
U_hat, B_hat = UB_hat[:3], UB_hat[3:] | |
# Primary variable | |
#u = UB_hat | |
def set_Elsasser(c, ZZ, K): | |
c[:3] = -1j*(K[0]*(ZZ[:, 0] + ZZ[0, :]) | |
+ K[1]*(ZZ[:, 1] + ZZ[1, :]) | |
+ K[2]*(ZZ[:, 2] + ZZ[2, :]))/2.0 | |
c[3:] = 1j*(K[0]*(ZZ[0, :] - ZZ[:, 0]) | |
+ K[1]*(ZZ[1, :] - ZZ[:, 1]) | |
+ K[2]*(ZZ[2, :] - ZZ[:, 2]))/2.0 | |
return c | |
def divergenceConvection(c, z0, z1, Tp, K, ZZ_hat): | |
"""Divergence convection using Elsasser variables | |
z0=U+B | |
z1=U-B | |
""" | |
for i in range(3): | |
for j in range(3): | |
ZZ_hat[i, j] = Tp.forward(z0[i]*z1[j], ZZ_hat[i, j]) | |
c = set_Elsasser(c, ZZ_hat, K) | |
return c | |
def NonlinearRHS(self, ub_hat, **params): # called getConvection in MHD.py | |
global ub_dealias, Tp, VMp, K, ZZ_hat | |
ub_dealias = VMp.backward(ub_hat, ub_dealias) | |
u_dealias = ub_dealias[:3] | |
b_dealias = ub_dealias[3:] | |
# Compute convective term and place in dU | |
Nlin = divergenceConvection(Nlin, u_dealias+b_dealias, u_dealias-b_dealias, | |
Tp, K, ZZ_hat) | |
return Nlin | |
def LinearRHS(self, **params): # called pressure diffusion in MHD.py | |
"""Add contributions from pressure and diffusion to the rhs""" | |
global TV, UB_hat, P_hat, K, K_over_K2 | |
u_hat = UB_hat[:3] | |
b_hat = UB_hat[3:] | |
# Compute pressure (To get actual pressure multiply by 1j) | |
P_hat = np.sum(Lin[:3]*K_over_K2, 0, out=P_hat) | |
# Add pressure gradient | |
for i in range(3): | |
Lin[i] = -P_hat*K[i] | |
# Add contribution from diffusion | |
Lin[:3] -= nu*K2*u_hat | |
Lin[3:] -= eta*K2*b_hat | |
return Lin | |
if __name__ == '__main__': | |
for integrator in (RK4, ETDRK4): | |
# Initialization | |
U[0] = np.sin(X[0])*np.cos(X[1])*np.cos(X[2]) | |
U[1] = -np.cos(X[0])*np.sin(X[1])*np.cos(X[2]) | |
U[2] = 0 | |
B[0] = np.sin(X[0])*np.sin(X[1])*np.cos(X[2]) | |
B[1] = np.cos(X[0])*np.cos(X[1])*np.cos(X[2]) | |
B[2] = 0 | |
UB[:3], UB[3:] = U, B | |
UB_hat = UB.forward(UB_hat) | |
# Solve | |
integ = integrator(VMp, Lin=LinearRHS, Nlin=NonlinearRHS) | |
integ.setup(dt) | |
UB_hat = integ.solve(UB, UB_hat, dt, (0, end_time)) | |
# Check accuracy | |
UB = UB_hat.backward(UB) | |
U,B = UB[:3], UB[3:] | |
k = comm.reduce(0.5*np.sum(U.astype(np.float64)*U.astype(np.float64))/np.prod(np.array(N))) | |
b = comm.reduce(0.5*np.sum(B.astype(np.float64)*B.astype(np.float64))/np.prod(np.array(N))) | |
if comm.Get_rank() == 0: | |
print(k,b) | |
assert np.round(k - 0.124565408177, 7) == 0 | |
assert np.round(b - 0.124637762143, 7) == 0 |
The pressure gradient is not linear in UB. It needs to go into the NonlinearRHS
Use def NonlinearRHS(self, UB, UB_hat, dU **params):
, where dU
is the rhs array.
This is actually a bit tricky since there are different coefficients for the first three items (U/nu) and the last three (B/eta). You can implement it as
def LinearRHS(self, **params): # called pressure diffusion in MHD.py
L = inner(div(grad(u)), v).scale # Really just -K2, but code is clearer
L = np.broadcast_to(L[np.newaxis, ...], (6,)+L.shape).copy()
L[:3] *= nu
L[3:] *= eta
return L # Now this L will be multiplied with UB_hat inside the integrator since it is linear in UB_hat
This gist should work
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You have to use
L=LinearRHS, N=NonlinearRHS
. See IntegratorBase class