Last active
March 11, 2019 07:43
-
-
Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
mhd with shenfun
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mpi4py import MPI | |
import numpy as np | |
from shenfun import * | |
nu = 0.000625 | |
eta = 0.01 | |
end_time = 0.1 | |
dt = 0.01 # no adaptive time stepping | |
comm = MPI.COMM_WORLD | |
N = (2**5, 2**5, 2**5) # 32^3 | |
L = [2*np.pi,4*np.pi,6*np.pi] # from TGMHD.py demo from spectralDNS | |
dim = 3 | |
# Define basis/tensor product spaces | |
V0 = Basis(N[0], 'F', dtype='D') # Complex-Complex | |
V1 = Basis(N[1], 'F', dtype='D') # Complex-Complex | |
V2 = Basis(N[2], 'F', dtype='d') # Real-Complex | |
T = TensorProductSpace(comm, (V0, V1, V2), **{'planner_effort': 'FFTW_MEASURE'}) # x,y,z now Fourier basis | |
TV = VectorTensorProductSpace(T) # Vector implies any function "u" will now have 3 components | |
VM = MixedTensorProductSpace([T]*2*dim) | |
#u = TrialFunction(T) # Galerkin method uses the 'weak/variational' forumation | |
#v = TestFunction(T) | |
# Mesh variables | |
X = T.local_mesh(True) | |
K = T.local_wavenumbers(scaled=True) | |
for i in range(dim): | |
X[i] = X[i].astype(float) | |
K[i] = K[i].astype(float) | |
K2 = np.zeros(T.local_shape(True), dtype=float) | |
for i in range(dim): | |
K2 += K[i]*K[i] | |
K_over_K2 = np.zeros(TV.local_shape(), dtype=float) # TV = vector | |
for i in range(dim): | |
K_over_K2[i] = K[i] / np.where(K2 == 0, 1, K2) | |
# Dealiased grid | |
kw = {'padding_factor': 1, | |
'dealias_direct': '2/3-rule'} | |
dtypes = ['D','D','d'] | |
Vp = [Basis(N[i], 'F', domain=(0, L[i]), | |
dtype=dtypes[i], **kw) for i in range(dim)] | |
Tp = TensorProductSpace(comm, Vp, dtype=float, | |
collapse_fourier=True, | |
**{'planner_effort': 'FFTW_MEASURE'}) | |
VTp = VectorTensorProductSpace(Tp) | |
VMp = MixedTensorProductSpace([Tp]*2*dim) | |
ub_dealias = Array(VMp) | |
UB = Array(VM) # Both V and B vectors: 6 components | |
P = Array(T) # Pressure scalar: 1 component | |
curl = Array(TV) # Vector: 3 components | |
UB_hat = Function(VM) # K-space V,B vectors | |
P_hat = Function(T) # K-space Pressure scalar | |
#dU = Function(VM) | |
#Source = Array(VM) | |
ZZ_hat = np.zeros((3, 3) + Tp.local_shape(True), dtype=complex) # Work array | |
# Create views into large data structures | |
U, B = UB[:3], UB[3:] | |
U_hat, B_hat = UB_hat[:3], UB_hat[3:] | |
# Primary variable | |
#u = UB_hat | |
def set_Elsasser(c, ZZ, K): | |
c[:3] = -1j*(K[0]*(ZZ[:, 0] + ZZ[0, :]) | |
+ K[1]*(ZZ[:, 1] + ZZ[1, :]) | |
+ K[2]*(ZZ[:, 2] + ZZ[2, :]))/2.0 | |
c[3:] = 1j*(K[0]*(ZZ[0, :] - ZZ[:, 0]) | |
+ K[1]*(ZZ[1, :] - ZZ[:, 1]) | |
+ K[2]*(ZZ[2, :] - ZZ[:, 2]))/2.0 | |
return c | |
def divergenceConvection(c, z0, z1, Tp, K, ZZ_hat): | |
"""Divergence convection using Elsasser variables | |
z0=U+B | |
z1=U-B | |
""" | |
for i in range(3): | |
for j in range(3): | |
ZZ_hat[i, j] = Tp.forward(z0[i]*z1[j], ZZ_hat[i, j]) | |
c = set_Elsasser(c, ZZ_hat, K) | |
return c | |
def NonlinearRHS(self, ub_hat, **params): # called getConvection in MHD.py | |
global ub_dealias, Tp, VMp, K, ZZ_hat | |
ub_dealias = VMp.backward(ub_hat, ub_dealias) | |
u_dealias = ub_dealias[:3] | |
b_dealias = ub_dealias[3:] | |
# Compute convective term and place in dU | |
Nlin = divergenceConvection(Nlin, u_dealias+b_dealias, u_dealias-b_dealias, | |
Tp, K, ZZ_hat) | |
return Nlin | |
def LinearRHS(self, **params): # called pressure diffusion in MHD.py | |
"""Add contributions from pressure and diffusion to the rhs""" | |
global TV, UB_hat, P_hat, K, K_over_K2 | |
u_hat = UB_hat[:3] | |
b_hat = UB_hat[3:] | |
# Compute pressure (To get actual pressure multiply by 1j) | |
P_hat = np.sum(Lin[:3]*K_over_K2, 0, out=P_hat) | |
# Add pressure gradient | |
for i in range(3): | |
Lin[i] = -P_hat*K[i] | |
# Add contribution from diffusion | |
Lin[:3] -= nu*K2*u_hat | |
Lin[3:] -= eta*K2*b_hat | |
return Lin | |
if __name__ == '__main__': | |
for integrator in (RK4, ETDRK4): | |
# Initialization | |
U[0] = np.sin(X[0])*np.cos(X[1])*np.cos(X[2]) | |
U[1] = -np.cos(X[0])*np.sin(X[1])*np.cos(X[2]) | |
U[2] = 0 | |
B[0] = np.sin(X[0])*np.sin(X[1])*np.cos(X[2]) | |
B[1] = np.cos(X[0])*np.cos(X[1])*np.cos(X[2]) | |
B[2] = 0 | |
UB[:3], UB[3:] = U, B | |
UB_hat = UB.forward(UB_hat) | |
# Solve | |
integ = integrator(VMp, Lin=LinearRHS, Nlin=NonlinearRHS) | |
integ.setup(dt) | |
UB_hat = integ.solve(UB, UB_hat, dt, (0, end_time)) | |
# Check accuracy | |
UB = UB_hat.backward(UB) | |
U,B = UB[:3], UB[3:] | |
k = comm.reduce(0.5*np.sum(U.astype(np.float64)*U.astype(np.float64))/np.prod(np.array(N))) | |
b = comm.reduce(0.5*np.sum(B.astype(np.float64)*B.astype(np.float64))/np.prod(np.array(N))) | |
if comm.Get_rank() == 0: | |
print(k,b) | |
assert np.round(k - 0.124565408177, 7) == 0 | |
assert np.round(b - 0.124637762143, 7) == 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This gist should work