Last active
April 3, 2025 14:00
-
-
Save aavogt/26530b34560e3aef23dc329da519b4f8 to your computer and use it in GitHub Desktop.
paraview.simple purge gas mixing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # install cfdof | |
| # make the geometry where the gases mix such that inlets with z>0 are Ar, z<0 are Air | |
| # adjust rx,ry,rz below | |
| # the normal speed-weighted average composition of a rectangle specified below is calculated | |
| # | |
| # TODO: | |
| # - [ ] diffusion: reconstructPar -latestTime, then use the output velocity field with another openfoam solver for the transport equation | |
| # - [ ] composition from boundary names, instead of the ad-hoc plane split based on the location of the farthest upstream point in the streamline | |
| # - [ ] surrogate model (tgp?) for geometry/flow/model parameter sweeps | |
| from typing import cast | |
| from paraview.simple import * | |
| # from rich import inspect | |
| import numpy as np | |
| import pandas as pd | |
| # Load your OpenFOAM data | |
| reader = OpenFOAMReader(FileName=r"pv.foam") | |
| reader.CaseType = 'Decomposed Case' | |
| if hasattr(reader, 'Decomposepolyhedra'): | |
| reader.Decomposepolyhedra = 0 | |
| reader.CellArrays = ['U'] # 'U' is typically the velocity field name in OpenFOAM | |
| reader.UpdatePipeline() | |
| # rectangle where the purge gas is needed / where the average composition is reported | |
| rx = 0.08 | |
| ry1 = 0.0016 | |
| ry2 = 0.03 | |
| rz1 = -1e-3 | |
| rz2 = -0.01 | |
| rectangle = Plane( | |
| Origin=[rx, ry1, rz1], | |
| Point1=[rx, ry2, rz1], | |
| Point2=[rx, ry1, rz2], | |
| XResolution=30, | |
| YResolution=30 | |
| ) | |
| rectangle.UpdatePipeline() | |
| # for now it's just [1,0,0], (normalized later) | |
| rnormal = np.cross(np.array(rectangle.Point1) - np.array(rectangle.Origin), np.array(rectangle.Point2) - np.array(rectangle.Origin)) | |
| # vtk doesn't seem to define python iterators | |
| def cells(xs): | |
| return (xs.GetCell(i) for i in range(xs.GetNumberOfCells())) | |
| def pointids(cells): | |
| return (cells.GetPointId(i) for i in range(cells.GetNumberOfPoints())) | |
| streamTracer = StreamTracerWithCustomSource( | |
| Input=reader, | |
| SeedSource=rectangle, | |
| Vectors = ['POINTS', 'U'], | |
| MaximumStreamlineLength= 100.0, | |
| IntegrationDirection = "BACKWARD") | |
| streamTracer.UpdatePipeline() | |
| streamlines = servermanager.Fetch(streamTracer) | |
| # %% | |
| streamline_points = [ | |
| [*map(streamlines.GetPoint, pointids(cell))] | |
| for cell in cells(streamlines) | |
| ] | |
| velocity_data = streamlines.GetPointData().GetArray('U') | |
| def middle_speed(cell, proj = None): | |
| """ | |
| Get the mean speed through a cell | |
| Parameters: | |
| cell: element | |
| proj: optional vector defining the direction to project onto | |
| TODO: is mean the right average ie. the cell may have a trapezoidal profile, | |
| leading to the 4 vertices on the small top face getting too much weight compared with the larger bottom face, | |
| where a much larger area is determined by the same number of vertices? | |
| Similarly with a parabolic pipe velocity profile, the average of the wall | |
| (0) and center line (2Q/A) gives the right average speed (v = (2Q/A + 0) /2) | |
| such that v*A = Q | |
| """ | |
| if proj is None: | |
| f = lambda _: np.array(np.linalg.norm(velocity_data.GetTuple3(_))) | |
| else: | |
| proj = proj / np.linalg.norm(proj) | |
| f = lambda _: np.array([np.dot(proj,velocity_data.GetTuple3(_))]) | |
| return np.mean([* map(f, pointids(cell))]) | |
| start_speeds = [ np.linalg.norm(middle_speed(cell, rnormal)) for cell in cells(streamlines)] | |
| def fraction_above(xs : list[list[float]], x0 : list[float], n : list[float], ws : list[float] | None=None)-> float: | |
| """ | |
| Count the number of points above the plane defined by the point x0 and normal vector n. | |
| Parameters: | |
| xs: List of points to be checked. | |
| x0: A point on the plane. | |
| n: Normal vector of the plane. | |
| ws: optional weights for each point. | |
| Returns: | |
| weighted fraction above the plane | |
| """ | |
| if ws is None: | |
| ws = [1.] * len(xs) | |
| count = sum(w * (np.dot(np.array(x) - x0, n) > 0) | |
| for x,w in zip(xs, ws)) | |
| return count / sum(ws) | |
| nar= fraction_above([x[-1] for x in streamline_points], | |
| [0,0,0], [0,0,1], # above the xy plane | |
| cast(list[float], start_speeds) | |
| ) | |
| print(f"\nVelocity-weighted composition of the rx,ry1,rz1-rx,ry2,rz2 rectangle\nAr: {nar:.4f} Air: {1-nar:.4f}") | |
| flattened = ([i, *point] for i, sublist in enumerate(streamline_points) for point in sublist) | |
| df = pd.DataFrame(flattened, | |
| columns=['i', 'x', 'y', 'z']) | |
| df.to_csv('streams.csv', index=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment