Skip to content

Instantly share code, notes, and snippets.

@aavogt
Last active April 3, 2025 14:00
Show Gist options
  • Select an option

  • Save aavogt/26530b34560e3aef23dc329da519b4f8 to your computer and use it in GitHub Desktop.

Select an option

Save aavogt/26530b34560e3aef23dc329da519b4f8 to your computer and use it in GitHub Desktop.
paraview.simple purge gas mixing
# install cfdof
# make the geometry where the gases mix such that inlets with z>0 are Ar, z<0 are Air
# adjust rx,ry,rz below
# the normal speed-weighted average composition of a rectangle specified below is calculated
#
# TODO:
# - [ ] diffusion: reconstructPar -latestTime, then use the output velocity field with another openfoam solver for the transport equation
# - [ ] composition from boundary names, instead of the ad-hoc plane split based on the location of the farthest upstream point in the streamline
# - [ ] surrogate model (tgp?) for geometry/flow/model parameter sweeps
from typing import cast
from paraview.simple import *
# from rich import inspect
import numpy as np
import pandas as pd
# Load your OpenFOAM data
reader = OpenFOAMReader(FileName=r"pv.foam")
reader.CaseType = 'Decomposed Case'
if hasattr(reader, 'Decomposepolyhedra'):
reader.Decomposepolyhedra = 0
reader.CellArrays = ['U'] # 'U' is typically the velocity field name in OpenFOAM
reader.UpdatePipeline()
# rectangle where the purge gas is needed / where the average composition is reported
rx = 0.08
ry1 = 0.0016
ry2 = 0.03
rz1 = -1e-3
rz2 = -0.01
rectangle = Plane(
Origin=[rx, ry1, rz1],
Point1=[rx, ry2, rz1],
Point2=[rx, ry1, rz2],
XResolution=30,
YResolution=30
)
rectangle.UpdatePipeline()
# for now it's just [1,0,0], (normalized later)
rnormal = np.cross(np.array(rectangle.Point1) - np.array(rectangle.Origin), np.array(rectangle.Point2) - np.array(rectangle.Origin))
# vtk doesn't seem to define python iterators
def cells(xs):
return (xs.GetCell(i) for i in range(xs.GetNumberOfCells()))
def pointids(cells):
return (cells.GetPointId(i) for i in range(cells.GetNumberOfPoints()))
streamTracer = StreamTracerWithCustomSource(
Input=reader,
SeedSource=rectangle,
Vectors = ['POINTS', 'U'],
MaximumStreamlineLength= 100.0,
IntegrationDirection = "BACKWARD")
streamTracer.UpdatePipeline()
streamlines = servermanager.Fetch(streamTracer)
# %%
streamline_points = [
[*map(streamlines.GetPoint, pointids(cell))]
for cell in cells(streamlines)
]
velocity_data = streamlines.GetPointData().GetArray('U')
def middle_speed(cell, proj = None):
"""
Get the mean speed through a cell
Parameters:
cell: element
proj: optional vector defining the direction to project onto
TODO: is mean the right average ie. the cell may have a trapezoidal profile,
leading to the 4 vertices on the small top face getting too much weight compared with the larger bottom face,
where a much larger area is determined by the same number of vertices?
Similarly with a parabolic pipe velocity profile, the average of the wall
(0) and center line (2Q/A) gives the right average speed (v = (2Q/A + 0) /2)
such that v*A = Q
"""
if proj is None:
f = lambda _: np.array(np.linalg.norm(velocity_data.GetTuple3(_)))
else:
proj = proj / np.linalg.norm(proj)
f = lambda _: np.array([np.dot(proj,velocity_data.GetTuple3(_))])
return np.mean([* map(f, pointids(cell))])
start_speeds = [ np.linalg.norm(middle_speed(cell, rnormal)) for cell in cells(streamlines)]
def fraction_above(xs : list[list[float]], x0 : list[float], n : list[float], ws : list[float] | None=None)-> float:
"""
Count the number of points above the plane defined by the point x0 and normal vector n.
Parameters:
xs: List of points to be checked.
x0: A point on the plane.
n: Normal vector of the plane.
ws: optional weights for each point.
Returns:
weighted fraction above the plane
"""
if ws is None:
ws = [1.] * len(xs)
count = sum(w * (np.dot(np.array(x) - x0, n) > 0)
for x,w in zip(xs, ws))
return count / sum(ws)
nar= fraction_above([x[-1] for x in streamline_points],
[0,0,0], [0,0,1], # above the xy plane
cast(list[float], start_speeds)
)
print(f"\nVelocity-weighted composition of the rx,ry1,rz1-rx,ry2,rz2 rectangle\nAr: {nar:.4f} Air: {1-nar:.4f}")
flattened = ([i, *point] for i, sublist in enumerate(streamline_points) for point in sublist)
df = pd.DataFrame(flattened,
columns=['i', 'x', 'y', 'z'])
df.to_csv('streams.csv', index=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment