Last active
April 12, 2025 18:44
-
-
Save alexlib/f1e9dd20d4afc5e4497947d0c6844191 to your computer and use it in GitHub Desktop.
Flowtracks trajectories to Eulerian average field using RBF interpolator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from scipy.interpolate import RBFInterpolator | |
import matplotlib.pyplot as plt | |
import matplotlib.colors as mcolors | |
def process_frames_average(particles, N, corner_m, fps, max_frames=None): | |
""" | |
Process multiple frames to create average velocity maps. | |
Args: | |
particles: Particle data object | |
N: Grid resolution | |
corner_m: Corner coordinates [x, y] in meters | |
fps: Frames per second | |
max_frames: Maximum number of frames to process (None for all frames) | |
Returns: | |
tuple: (u_mean, v_mean, X_phys, Y_phys, maskout) | |
""" | |
# Create grid in physical units using bounding box | |
bbox = particles.bounding_box() | |
x_phys = np.linspace(bbox[0][0], bbox[1][0], N) | |
y_phys = np.linspace(bbox[0][1], bbox[1][1], N) | |
X_phys, Y_phys = np.meshgrid(x_phys, y_phys) | |
# Create mask in physical units | |
maskout = np.logical_and(X_phys > corner_m[0], Y_phys < corner_m[1]) | |
X_phys = np.ma.masked_where(maskout, X_phys) | |
Y_phys = np.ma.masked_where(maskout, Y_phys) | |
# Prepare grid for interpolation | |
xgrid = np.stack([X_phys, Y_phys]) | |
xflat = xgrid.reshape(2, -1).T | |
# Add zero-velocity points along the walls | |
n_wall_points = 50 | |
y_wall = np.linspace(bbox[0][1], corner_m[1], n_wall_points) | |
x_wall = np.full_like(y_wall, corner_m[0]) | |
zeros_wall = np.zeros_like(y_wall) | |
# Horizontal wall | |
x_wall_h = np.linspace(corner_m[0], bbox[1][0], n_wall_points) | |
y_wall_h = np.full_like(x_wall_h, corner_m[1]) | |
# Lists to store interpolated fields | |
u_fields = [] | |
v_fields = [] | |
# Process frames | |
frame_count = 0 | |
for frame in particles.iter_frames(): | |
# Get positions and velocities | |
pos = frame.pos()[:,:2] | |
x_t, y_t = pos[:,0], pos[:,1] | |
u_t = frame.velocity()[:,0] | |
v_t = frame.velocity()[:,1] | |
# Mask data points | |
pos_mask = np.logical_and(x_t > corner_m[0], y_t < corner_m[1]) | |
x_t = np.ma.masked_where(pos_mask, x_t) | |
y_t = np.ma.masked_where(pos_mask, y_t) | |
u_t = np.ma.masked_where(pos_mask, u_t) | |
v_t = np.ma.masked_where(pos_mask, v_t) | |
# Combine original data with wall points | |
x_t = np.concatenate([x_t.compressed(), x_wall, x_wall_h]) | |
y_t = np.concatenate([y_t.compressed(), y_wall, y_wall_h]) | |
u_t = np.concatenate([u_t.compressed(), zeros_wall, zeros_wall]) | |
v_t = np.concatenate([v_t.compressed(), zeros_wall, zeros_wall]) | |
# Interpolate | |
uflat = RBFInterpolator(np.stack([x_t, y_t]).T, u_t, | |
kernel='thin_plate_spline', smoothing=0.00005)(xflat) | |
vflat = RBFInterpolator(np.stack([x_t, y_t]).T, v_t, | |
kernel='thin_plate_spline', smoothing=0.00005)(xflat) | |
# Convert to m/s and reshape | |
u_interp = uflat.reshape(N, N) * fps | |
v_interp = vflat.reshape(N, N) * fps | |
# Mask and append | |
u_interp = np.ma.masked_where(maskout, u_interp) | |
v_interp = np.ma.masked_where(maskout, v_interp) | |
u_fields.append(u_interp) | |
v_fields.append(v_interp) | |
frame_count += 1 | |
if max_frames is not None and frame_count >= max_frames: | |
break | |
if frame_count % 10 == 0: # Progress update every 10 frames | |
print(f"Processed {frame_count} frames") | |
# Compute means | |
u_mean = np.ma.mean(np.ma.stack(u_fields), axis=0) | |
v_mean = np.ma.mean(np.ma.stack(v_fields), axis=0) | |
return u_mean, v_mean, x_phys, y_phys, maskout | |
def save_results(filename, u_mean, v_mean, X, Y, maskout): | |
""" Save the results to a .npz file. """ | |
np.savez(filename, | |
u=u_mean, | |
v=v_mean, | |
x=X, | |
y=Y, | |
maskout=maskout) | |
def plot_velocity_field(u_mean, v_mean, x_phys, y_phys, im, bbox): | |
""" Plot the average velocity field with an image background. """ | |
colors = [ | |
(0.0, '#6E7AA9'), # soft muted blue | |
(0.2, '#8DA0CB'), # lighter soft blue | |
(0.35, '#C5D5E9'), # very light blue | |
(0.45, '#EEF2F7'), # nearly white blue | |
(0.5, '#FFFFFF'), # white | |
(0.55, '#F7F0EE'), # nearly white pink | |
(0.65, '#E9D5D3'), # very light pink | |
(0.8, '#CBA89D'), # lighter soft red | |
(1.0, '#A97E6E') # soft muted red | |
] | |
custom_cmap = mcolors.LinearSegmentedColormap.from_list("custom_diverging", colors) | |
fig, ax = plt.subplots() | |
ax.imshow(im, cmap='gray', alpha=0.8, | |
extent=[bbox[0][0], bbox[1][0], bbox[0][1], bbox[1][1]]) | |
velocity_magnitude = np.sqrt(u_mean**2 + v_mean**2) | |
cl = ax.pcolormesh(x_phys, y_phys, velocity_magnitude, | |
shading='gouraud', alpha=0.8, cmap = custom_cmap) | |
#cmap='coolwarm') | |
ax.streamplot(x_phys, y_phys, u_mean, v_mean, color='black') | |
plt.colorbar(cl, label='Velocity magnitude (m/s)', orientation='horizontal') | |
ax.set_aspect('equal') | |
plt.xlabel('x (m)') | |
plt.ylabel('y (m)') | |
plt.title('Average Velocity Field') | |
plt.show() | |
plt.savefig('alex_test51_map.png',dpi=200) | |
if __name__ == '__main__': | |
# Usage example: | |
from flowtracks.scene import Scene | |
h5_name = r'../corner_51.h5' | |
npz_name = r'../test.npz' | |
particles = Scene(h5_name) | |
bbox = particles.bounding_box() | |
corner_m = np.array([0.00213, 0.0072]) | |
im = plt.imread(r'../15-03-42.828-100001.tif') | |
u_mean, v_mean, x_phys, y_phys, maskout = process_frames_average(particles, N=140, corner_m=corner_m, fps=200, max_frames=10) | |
save_results(npz_name,u_mean, v_mean, x_phys, y_phys, maskout) | |
plot_velocity_field(u_mean, v_mean, x_phys, y_phys, im, bbox) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment