Skip to content

Instantly share code, notes, and snippets.

@alexlib
Last active April 12, 2025 18:44
Show Gist options
  • Save alexlib/f1e9dd20d4afc5e4497947d0c6844191 to your computer and use it in GitHub Desktop.
Save alexlib/f1e9dd20d4afc5e4497947d0c6844191 to your computer and use it in GitHub Desktop.
Flowtracks trajectories to Eulerian average field using RBF interpolator
import numpy as np
from scipy.interpolate import RBFInterpolator
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
def process_frames_average(particles, N, corner_m, fps, max_frames=None):
"""
Process multiple frames to create average velocity maps.
Args:
particles: Particle data object
N: Grid resolution
corner_m: Corner coordinates [x, y] in meters
fps: Frames per second
max_frames: Maximum number of frames to process (None for all frames)
Returns:
tuple: (u_mean, v_mean, X_phys, Y_phys, maskout)
"""
# Create grid in physical units using bounding box
bbox = particles.bounding_box()
x_phys = np.linspace(bbox[0][0], bbox[1][0], N)
y_phys = np.linspace(bbox[0][1], bbox[1][1], N)
X_phys, Y_phys = np.meshgrid(x_phys, y_phys)
# Create mask in physical units
maskout = np.logical_and(X_phys > corner_m[0], Y_phys < corner_m[1])
X_phys = np.ma.masked_where(maskout, X_phys)
Y_phys = np.ma.masked_where(maskout, Y_phys)
# Prepare grid for interpolation
xgrid = np.stack([X_phys, Y_phys])
xflat = xgrid.reshape(2, -1).T
# Add zero-velocity points along the walls
n_wall_points = 50
y_wall = np.linspace(bbox[0][1], corner_m[1], n_wall_points)
x_wall = np.full_like(y_wall, corner_m[0])
zeros_wall = np.zeros_like(y_wall)
# Horizontal wall
x_wall_h = np.linspace(corner_m[0], bbox[1][0], n_wall_points)
y_wall_h = np.full_like(x_wall_h, corner_m[1])
# Lists to store interpolated fields
u_fields = []
v_fields = []
# Process frames
frame_count = 0
for frame in particles.iter_frames():
# Get positions and velocities
pos = frame.pos()[:,:2]
x_t, y_t = pos[:,0], pos[:,1]
u_t = frame.velocity()[:,0]
v_t = frame.velocity()[:,1]
# Mask data points
pos_mask = np.logical_and(x_t > corner_m[0], y_t < corner_m[1])
x_t = np.ma.masked_where(pos_mask, x_t)
y_t = np.ma.masked_where(pos_mask, y_t)
u_t = np.ma.masked_where(pos_mask, u_t)
v_t = np.ma.masked_where(pos_mask, v_t)
# Combine original data with wall points
x_t = np.concatenate([x_t.compressed(), x_wall, x_wall_h])
y_t = np.concatenate([y_t.compressed(), y_wall, y_wall_h])
u_t = np.concatenate([u_t.compressed(), zeros_wall, zeros_wall])
v_t = np.concatenate([v_t.compressed(), zeros_wall, zeros_wall])
# Interpolate
uflat = RBFInterpolator(np.stack([x_t, y_t]).T, u_t,
kernel='thin_plate_spline', smoothing=0.00005)(xflat)
vflat = RBFInterpolator(np.stack([x_t, y_t]).T, v_t,
kernel='thin_plate_spline', smoothing=0.00005)(xflat)
# Convert to m/s and reshape
u_interp = uflat.reshape(N, N) * fps
v_interp = vflat.reshape(N, N) * fps
# Mask and append
u_interp = np.ma.masked_where(maskout, u_interp)
v_interp = np.ma.masked_where(maskout, v_interp)
u_fields.append(u_interp)
v_fields.append(v_interp)
frame_count += 1
if max_frames is not None and frame_count >= max_frames:
break
if frame_count % 10 == 0: # Progress update every 10 frames
print(f"Processed {frame_count} frames")
# Compute means
u_mean = np.ma.mean(np.ma.stack(u_fields), axis=0)
v_mean = np.ma.mean(np.ma.stack(v_fields), axis=0)
return u_mean, v_mean, x_phys, y_phys, maskout
def save_results(filename, u_mean, v_mean, X, Y, maskout):
""" Save the results to a .npz file. """
np.savez(filename,
u=u_mean,
v=v_mean,
x=X,
y=Y,
maskout=maskout)
def plot_velocity_field(u_mean, v_mean, x_phys, y_phys, im, bbox):
""" Plot the average velocity field with an image background. """
colors = [
(0.0, '#6E7AA9'), # soft muted blue
(0.2, '#8DA0CB'), # lighter soft blue
(0.35, '#C5D5E9'), # very light blue
(0.45, '#EEF2F7'), # nearly white blue
(0.5, '#FFFFFF'), # white
(0.55, '#F7F0EE'), # nearly white pink
(0.65, '#E9D5D3'), # very light pink
(0.8, '#CBA89D'), # lighter soft red
(1.0, '#A97E6E') # soft muted red
]
custom_cmap = mcolors.LinearSegmentedColormap.from_list("custom_diverging", colors)
fig, ax = plt.subplots()
ax.imshow(im, cmap='gray', alpha=0.8,
extent=[bbox[0][0], bbox[1][0], bbox[0][1], bbox[1][1]])
velocity_magnitude = np.sqrt(u_mean**2 + v_mean**2)
cl = ax.pcolormesh(x_phys, y_phys, velocity_magnitude,
shading='gouraud', alpha=0.8, cmap = custom_cmap)
#cmap='coolwarm')
ax.streamplot(x_phys, y_phys, u_mean, v_mean, color='black')
plt.colorbar(cl, label='Velocity magnitude (m/s)', orientation='horizontal')
ax.set_aspect('equal')
plt.xlabel('x (m)')
plt.ylabel('y (m)')
plt.title('Average Velocity Field')
plt.show()
plt.savefig('alex_test51_map.png',dpi=200)
if __name__ == '__main__':
# Usage example:
from flowtracks.scene import Scene
h5_name = r'../corner_51.h5'
npz_name = r'../test.npz'
particles = Scene(h5_name)
bbox = particles.bounding_box()
corner_m = np.array([0.00213, 0.0072])
im = plt.imread(r'../15-03-42.828-100001.tif')
u_mean, v_mean, x_phys, y_phys, maskout = process_frames_average(particles, N=140, corner_m=corner_m, fps=200, max_frames=10)
save_results(npz_name,u_mean, v_mean, x_phys, y_phys, maskout)
plot_velocity_field(u_mean, v_mean, x_phys, y_phys, im, bbox)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment