Skip to content

Instantly share code, notes, and snippets.

@jaskiratsingh2000
Created November 17, 2025 20:38
Show Gist options
  • Select an option

  • Save jaskiratsingh2000/b6d10f2e0570b12c0b66c9a28e3b0481 to your computer and use it in GitHub Desktop.

Select an option

Save jaskiratsingh2000/b6d10f2e0570b12c0b66c9a28e3b0481 to your computer and use it in GitHub Desktop.
ORCA Code
import os
import time
import threading
import math
import numpy as np
import matplotlib.pyplot as plt
from numpy import e
from hyperparameters import (
spread, noise, radius, delt, pert, b, sensor_range, number_of_uavs, maxv,
attractive_gain, repulsive_gain, collision_distance, clipping_power, seed, priority_type
)
# -----------------------------------------------------------------------------
# Toggle: enable/disable LLM-based negotiation globally (kept; default OFF)
# -----------------------------------------------------------------------------
USE_LLM_NEGOTIATION = False
if USE_LLM_NEGOTIATION:
from llm_agent import negotiate_priority_with_llm
else:
# No-op stub for priorities if LLM is off.
def negotiate_priority_with_llm(state_i, state_j):
return state_i["current_priority"], state_j["current_priority"], []
# -----------------------------------------------------------------------------
# ORCA / RVO2 toggle
# -----------------------------------------------------------------------------
USE_ORCA = True # ← set to False to revert to your original CBF-QP projection path
# -----------------------------------------------------------------------------
# VFCBF Parameters
# -----------------------------------------------------------------------------
VFCBF_TAU = (collision_distance * 1.5)**2 / 2.0
VFCBF_BETA = 1.0
VFCBF_GAMMA_GAIN = 0.5
VFCBF_ACTIVATION_RANGE = sensor_range * 1.1
# -----------------------------------------------------------------------------
# Tracking & Simulation parameters
# -----------------------------------------------------------------------------
kv_track = 12.0
damping_coefficient = 0.1
EPS = 1e-6
# -----------------------------------------------------------------------------
# Goal proximity shaping and override to kill tangential “herding”
# -----------------------------------------------------------------------------
GOAL_SLOWDOWN_RADIUS = 2.5
REPULSION_FADE_START = 4.0
REPULSION_FADE_END = 1.0
VFCBF_RELAX_RADIUS = 2.0
GOAL_OVERRIDE_RADIUS = 2.0
HARD_SAFETY_ONLY_RADIUS = 1.25 * collision_distance
FOV_COS = 0.0
K_GOAL_OVR = 1.5
GOAL_OVERRIDE_ENTER = 2.0
GOAL_OVERRIDE_EXIT = 2.4
U_CMD_SMOOTHING = 0.35
def smoothstep01(x: float) -> float:
if x <= 0.0: return 0.0
if x >= 1.0: return 1.0
return x * x * (3.0 - 2.0 * x)
global_data_array = np.zeros((7, 13))
# Seed
np.random.seed(seed)
print("Random seed set to:", seed)
# -----------------------------------------------------------------------------
# No-Fly Zone (NFZ) Parameters
# -----------------------------------------------------------------------------
NFZ_WALL_X = 0.0
DOOR_HEIGHT = collision_distance * 8 * 1.2
DOOR_Y_MIN = -DOOR_HEIGHT / 2
DOOR_Y_MAX = DOOR_HEIGHT / 2
LANE_OFFSET_Y = DOOR_HEIGHT / 4.0
WAYPOINT_FOR_RIGHT_TRAFFIC = np.array([NFZ_WALL_X, -LANE_OFFSET_Y])
WAYPOINT_FOR_LEFT_TRAFFIC = np.array([NFZ_WALL_X, LANE_OFFSET_Y])
NFZ_CIRC_CENTER = np.array([radius * 0.5, radius * 0.5])
NFZ_CIRC_RADIUS = radius * 0.2
# =============================================================================
# Fairness Metric
# =============================================================================
def calculate_fairness_score(priorities, mission_times):
total_fairness_score = 0.0
print("\n--- Calculating Final Fairness Score ---")
for i in range(len(priorities)):
p_i = priorities[i]
t_i = mission_times[i]
if t_i > 0:
score_i = p_i / t_i
total_fairness_score += score_i
print(f"UAV {i}: Priority={p_i:<4.1f}, Time={t_i:<5.2f}s => Score contribution: {score_i:.2f}")
else:
print(f"UAV {i}: Mission time was zero, score contribution is 0.")
return total_fairness_score
def clear_previous_output_files():
for file_name in ['time.csv', 'priority.csv', 'avg.csv']:
if os.path.isfile(file_name):
os.remove(file_name)
clear_previous_output_files()
# -----------------------------------------------------------------------------
# Scenario init
# -----------------------------------------------------------------------------
def initialize_uav_positions_and_goals(number_of_uavs, radius):
start_positions = []
goal_positions = []
for i in range(1, number_of_uavs // 4 + 1):
position1 = (np.cos(spread * (i / (number_of_uavs // 4))),
np.sin(spread * (i / (number_of_uavs // 4))))
position2 = (-position1[0], position1[1])
position3 = (-position1[0], -position1[1])
position4 = (position1[0], -position1[1])
start_positions.extend([position1, position2, position3, position4])
goal_positions.extend([position2, position1, position4, position3])
start_positions = np.array(start_positions) * radius
goal_positions = np.array(goal_positions) * radius * 1.2
return start_positions, goal_positions
def plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius):
cmap = plt.get_cmap('Dark2')
colors = [cmap(i) for i in np.linspace(0, 1, number_of_uavs)]
fig, ax = plt.subplots(figsize=(20, 10))
ax.scatter(start_positions[:, 0], start_positions[:, 1], c=colors, s=200, marker='o')
ax.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250)
ax.set_xlim(-1.2 * radius, 1.2 * radius)
ax.set_ylim(-1.2 * radius, 1.2 * radius)
ax.set_xlabel('x', fontsize=24)
ax.set_ylabel('y', fontsize=24)
ax.tick_params(axis='both', labelsize=20)
for spine in ax.spines.values():
spine.set_linewidth(3)
spine.set_edgecolor('black')
ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black'))
ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black'))
# Wall with doorway gap:
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2)
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2)
# Circular NFZ:
ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2))
plt.savefig("Integrated_NFZ_initial_positions.png", dpi=300, bbox_inches='tight')
def initialize_uav_properties(number_of_uavs, radius):
velocity_v = np.zeros((number_of_uavs, 2))
start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius)
acceleration = np.zeros((number_of_uavs, 2))
completed_status = np.zeros(number_of_uavs)
return (velocity_v, start_positions, goal_positions,
acceleration, completed_status)
def set_uav_priorities(number_of_uavs, priority_type):
priority = 3 * np.ones(number_of_uavs)
priority += np.random.uniform(-0.1, 0.1, number_of_uavs)
with open("priority.csv", 'a') as f:
f.write(", ".join(map(str, priority)) + '\n')
return priority
def reached_goal(uav_index, start_positions, goal_positions):
return np.linalg.norm(start_positions[uav_index] - goal_positions[uav_index]) <= 1.2
def rotate_vector(v, angle):
c, s = np.cos(angle), np.sin(angle)
return np.array([c*v[0] - s*v[1], s*v[0] + c*v[1]])
def perturb_velocity(velocity_v):
for i, vel in enumerate(velocity_v):
x, y = vel
delta_theta = np.random.normal(0, np.pi / 2 ** 10.5)
theta = np.arctan2(y, x)
mag = np.hypot(x, y)
theta += delta_theta
velocity_v[i] = np.array([np.cos(theta) * mag, np.sin(theta) * mag])
# -----------------------------------------------------------------------------
# We still define should_roundabout() for completeness,
# but WE NO LONGER USE IT in nominal_velocity().
# -----------------------------------------------------------------------------
def should_roundabout(pi, vi, pj, vj,
d_trigger=None, t_h=2.0, inflate=1.2):
if d_trigger is None:
d_trigger = 2.0 * collision_distance
pij = pj - pi
vij = vi - vj
d = np.linalg.norm(pij)
if d > d_trigger:
return False
closing = np.dot(pij, vij) < 0.0
if not closing:
return False
denom = np.dot(vij, vij) + EPS
t_c = - np.dot(pij, vij) / denom
if (t_c <= 0.0) or (t_c > t_h):
return False
d_min = np.linalg.norm(pij + t_c * vij)
return d_min < inflate * collision_distance
# -----------------------------------------------------------------------------
# CBF-QP velocity filter
# -----------------------------------------------------------------------------
def cbf_filter_velocity(u_des, A_list, b_list):
if not A_list:
return u_des.copy()
A = np.vstack(A_list)
b = np.array(b_list)
if np.all(A @ u_des >= b - 1e-10):
return u_des.copy()
candidates = []
for k in range(len(b)):
a = A[k]
norm2 = a @ a
if norm2 < 1e-12: continue
lam = (b[k] - a @ u_des) / norm2
if lam > 0:
u_k = u_des + lam * a
if np.all(A @ u_k >= b - 1e-10): candidates.append(u_k)
m = len(b)
for i in range(m):
for j in range(i+1, m):
A_act = np.vstack([A[i], A[j]])
rhs = np.array([b[i], b[j]])
M = A_act @ A_act.T
try:
lam = np.linalg.solve(M, rhs - A_act @ u_des)
u_ij = u_des + A_act.T @ lam
if np.all(A @ u_ij >= b - 1e-10): candidates.append(u_ij)
except np.linalg.LinAlgError:
pass
if not candidates:
viol = A @ u_des - b
k = int(np.argmin(viol))
a = A[k]; norm2 = a @ a
lam = (b[k] - a @ u_des) / (norm2 + 1e-12)
u_k = u_des + max(0.0, lam) * a
candidates.append(u_k)
dists = [np.linalg.norm(u - u_des) for u in candidates]
return candidates[int(np.argmin(dists))]
# -----------------------------------------------------------------------------
# Evolutionary VFCBF constraints
# -----------------------------------------------------------------------------
def calculate_vfcbf_constraint_params(p_i, p_obs, tau, beta):
x_vec = p_i - p_obs
r_sq = x_vec @ x_vec
if r_sq < 1e-9: return None, None
r = np.sqrt(r_sq)
k = beta * np.exp(-r_sq / (4.0 * tau))
h_norm = k * r
mod_factor = k**2 * (1.0 - r_sq / (2.0 * tau))
a_vec = mod_factor * x_vec
return a_vec, h_norm
def build_evolutionary_vfcbf_constraints(i, positions, velocities, priorities, goals, override_flags):
A_list, b_list = [], []
pi = positions[i]
di_goal = np.linalg.norm(pi - goals[i])
goal_dir = goals[i] - pi
gnorm = np.linalg.norm(goal_dir) + EPS
ghat = goal_dir / gnorm
override_active = bool(override_flags[i])
relax_factor = 1.0
if di_goal < VFCBF_RELAX_RADIUS:
relax_factor = smoothstep01(di_goal / max(VFCBF_RELAX_RADIUS, EPS))
# agent-agent constraints
for j in range(len(positions)):
if i == j: continue
pj, vj = positions[j], velocities[j]
rij = pj - pi
dist = np.linalg.norm(rij)
if override_active:
if dist > HARD_SAFETY_ONLY_RADIUS:
continue
else:
if np.dot(rij, ghat) < FOV_COS:
continue
if dist > VFCBF_ACTIVATION_RANGE:
continue
current_tau = VFCBF_TAU
if priorities[i] < priorities[j]:
current_tau *= 0.75
a_vec, h_norm = calculate_vfcbf_constraint_params(pi, pj, current_tau, VFCBF_BETA)
if a_vec is not None:
gamma_term = (VFCBF_GAMMA_GAIN * relax_factor) * h_norm
b_val = -gamma_term + (a_vec @ vj)
A_list.append(a_vec)
b_list.append(b_val)
# circular NFZ
dist_to_circ_center = np.linalg.norm(pi - NFZ_CIRC_CENTER)
if dist_to_circ_center < NFZ_CIRC_RADIUS + VFCBF_ACTIVATION_RANGE / 2:
direction = (pi - NFZ_CIRC_CENTER) / (dist_to_circ_center + EPS)
p_obs_circ = NFZ_CIRC_CENTER + NFZ_CIRC_RADIUS * direction
a_vec, h_norm = calculate_vfcbf_constraint_params(pi, p_obs_circ, VFCBF_TAU / 2.0, VFCBF_BETA)
if a_vec is not None:
gamma_term = VFCBF_GAMMA_GAIN * h_norm
b_val = -gamma_term
A_list.append(a_vec)
b_list.append(b_val)
# wall NFZ
if not (DOOR_Y_MIN < pi[1] < DOOR_Y_MAX):
p_obs_wall = np.array([NFZ_WALL_X, np.clip(pi[1], -radius, radius)])
dist_to_wall = np.linalg.norm(pi - p_obs_wall)
if dist_to_wall < VFCBF_ACTIVATION_RANGE / 2:
a_vec, h_norm = calculate_vfcbf_constraint_params(pi, p_obs_wall, VFCBF_TAU / 2.0, VFCBF_BETA)
if a_vec is not None:
gamma_term = VFCBF_GAMMA_GAIN * h_norm
b_val = -gamma_term
A_list.append(a_vec)
b_list.append(b_val)
return A_list, b_list
# -----------------------------------------------------------------------------
# Nominal velocity (APF-based controller)
# MODIFIED: roundabout removed. We NEVER add the +90° rotated term anymore.
# -----------------------------------------------------------------------------
def nominal_velocity(i, positions, velocities, goals, passed_through, priority_arr):
pi = positions[i]
vi = velocities[i]
# lane waypoint logic (doorway guidance)
if not passed_through[i]:
subgoal = WAYPOINT_FOR_RIGHT_TRAFFIC if goals[i][0] > NFZ_WALL_X else WAYPOINT_FOR_LEFT_TRAFFIC
else:
subgoal = goals[i]
# attractive term
d = np.linalg.norm(subgoal - pi)
u_attr = attractive_gain * (2.0 * (1.0 - e**(-2.0 * (d**2))) * (subgoal - pi) / (d + EPS))
# distance to final goal
d_goal = np.linalg.norm(positions[i] - goals[i])
# If super close to goal, skip repulsion shaping here.
if d_goal < GOAL_OVERRIDE_RADIUS:
return u_attr
# fade repulsion near goal
if REPULSION_FADE_START <= REPULSION_FADE_END:
repulsion_scale = 1.0
else:
t = (d_goal - REPULSION_FADE_END) / max(REPULSION_FADE_START - REPULSION_FADE_END, EPS)
repulsion_scale = smoothstep01(t)
u_interaction = np.zeros(2)
FOLLOWING_DISTANCE = collision_distance * 3.0
FOLLOWING_GAIN_FACTOR = 0.75
dir_to_subgoal = (subgoal - pi) / (np.linalg.norm(subgoal - pi) + EPS)
if repulsion_scale > 0.0:
for j in range(len(positions)):
if j == i:
continue
pj, vj = positions[j], velocities[j]
p_ij_vec = pj - pi
dij = np.linalg.norm(p_ij_vec)
if dij >= sensor_range:
continue
is_in_front = np.dot(dir_to_subgoal, p_ij_vec / (dij + EPS)) > 0.707
if is_in_front:
# Damped following spring
spring_k = repulsive_gain * FOLLOWING_GAIN_FACTOR
unit_ij = p_ij_vec / (dij + EPS)
pos_term = spring_k * (dij - FOLLOWING_DISTANCE) * unit_ij
rel_speed_along = np.dot(vi - vj, unit_ij)
damping_gain = 0.8 * repulsive_gain * FOLLOWING_GAIN_FACTOR
damp_term = -damping_gain * rel_speed_along * unit_ij
u_interaction += repulsion_scale * (pos_term + damp_term)
else:
# JUST STRAIGHT REPULSION. NO ROUNDABOUT ROTATION.
rep = ((priority_arr[j] / priority_arr[i]) * repulsive_gain *
(e**(-b * (dij - collision_distance)**2)) *
((pi - pj) / (dij + EPS)))
u_interaction += repulsion_scale * 0.25 * rep
# tiny goal-facing boost near final goal
if d_goal < GOAL_SLOWDOWN_RADIUS:
boost_dir = (goals[i] - pi)
boost_norm = np.linalg.norm(boost_dir) + EPS
u_attr += 0.75 * boost_dir / boost_norm
return u_attr + u_interaction
# -----------------------------------------------------------------------------
# Doorway pass check
# -----------------------------------------------------------------------------
def update_passed_through(i, positions):
start_x = initial_start_positions[i, 0]
current_x = positions[i, 0]
if (start_x < NFZ_WALL_X and current_x > NFZ_WALL_X) or \
(start_x > NFZ_WALL_X and current_x < NFZ_WALL_X):
return True
return False
# -----------------------------------------------------------------------------
# Optional LLM negotiation bookkeeping
# -----------------------------------------------------------------------------
uav_contexts = ["Mission"] * number_of_uavs
def negotiation_worker(i, j, state_i, state_j):
if not USE_LLM_NEGOTIATION: return
new_i, new_j, transcripts = negotiate_priority_with_llm(state_i, state_j)
priority[i], priority[j] = new_i, new_j
# -----------------------------------------------------------------------------
# METRICS
# -----------------------------------------------------------------------------
def compute_pair_and_ammd(min_distances_matrix):
N = min_distances_matrix.shape[0]
upper = min_distances_matrix[np.triu_indices(N, k=1)]
upper = upper[np.isfinite(upper)]
pair_avg_min = float(np.mean(upper)) if upper.size else float('inf')
sym = np.minimum(min_distances_matrix, min_distances_matrix.T).copy()
np.fill_diagonal(sym, np.inf)
per_uav_min = np.min(sym, axis=1)
finite = per_uav_min[np.isfinite(per_uav_min)]
ammd = float(np.mean(finite)) if finite.size else float('inf')
global_min = float(np.min(finite)) if finite.size else float('inf')
return pair_avg_min, ammd, global_min, per_uav_min
def _do_from_path(path_xy, ds_min=None, eps=1e-9):
P = np.asarray(path_xy, dtype=float)
if P.shape[0] < 3:
return {"TV": 0.0, "NR": 0.0, "L": 0.0, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0}
seg = np.diff(P, axis=0)
ds = np.linalg.norm(seg, axis=1)
if ds_min is None:
ds_min = 0.05 * maxv * delt
mask = ds >= ds_min
if np.count_nonzero(mask) < 2:
L_mask = float(np.sum(ds[mask])) if mask.size else 0.0
return {"TV": 0.0, "NR": 0.0, "L": L_mask, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0}
seg = seg[mask]
ds = ds[mask]
theta = np.unwrap(np.arctan2(seg[:, 1], seg[:, 0]))
dtheta = np.diff(theta)
TV = float(np.sum(np.abs(dtheta)))
NR = float(abs(theta[-1] - theta[0]))
L = float(np.sum(ds))
DO = max(0.0, TV - NR) / max(L, eps)
DO_deg_per_m = DO * (180.0 / np.pi)
return {"TV": TV, "NR": NR, "L": L, "DO_rad_per_m": DO, "DO_deg_per_m": DO_deg_per_m}
def compute_oscillation_metrics(path_history):
per = [_do_from_path(traj) for traj in path_history]
do_vals = np.array([d["DO_rad_per_m"] for d in per], dtype=float)
do_deg = do_vals * (180.0 / np.pi)
def p(x, q): return float(np.nanpercentile(x, q)) if x.size else float("nan")
summary = {
"mean_rad_per_m": float(np.nanmean(do_vals)) if do_vals.size else float("nan"),
"median_rad_per_m": float(np.nanmedian(do_vals)) if do_vals.size else float("nan"),
"p90_rad_per_m": p(do_vals, 90),
"max_rad_per_m": float(np.nanmax(do_vals)) if do_vals.size else float("nan"),
"mean_deg_per_m": float(np.nanmean(do_deg)) if do_deg.size else float("nan"),
"median_deg_per_m": float(np.nanmedian(do_deg)) if do_deg.size else float("nan"),
"p90_deg_per_m": p(do_deg, 90),
"max_deg_per_m": float(np.nanmax(do_deg)) if do_deg.size else float("nan"),
}
return per, summary
def _path_length(P):
P = np.asarray(P, float)
if len(P) < 2: return 0.0
seg = np.diff(P, axis=0)
return float(np.sum(np.linalg.norm(seg, axis=1)))
def _unwrap_angles(theta):
return np.unwrap(theta)
def _segment_headings(P):
P = np.asarray(P, float)
seg = np.diff(P, axis=0)
ang = np.arctan2(seg[:,1], seg[:,0])
return _unwrap_angles(ang), np.linalg.norm(seg, axis=1)
def _per_path_smoothness(P):
P = np.asarray(P, float)
L = _path_length(P)
if len(P) < 3 or L < 1e-12:
return 0.0, 0.0, L, 0.0, 0.0
theta, ds = _segment_headings(P)
dtheta = theta[1:] - theta[:-1]
dtheta = (dtheta + np.pi) % (2*np.pi) - np.pi
ds_local = 0.5 * (ds[1:] + ds[:-1])
ds_local = np.where(ds_local < 1e-12, 1e-12, ds_local)
kappa = np.abs(dtheta) / ds_local
Sk2_total = float(np.sum((kappa**2) * ds_local))
HTV_total = float(np.sum(np.abs(dtheta)))
Sk2_over_L = Sk2_total / L
HTV_over_L = HTV_total / L
return Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total
def fleet_smoothness_index(path_history):
total_L = 0.0
total_Sk2 = 0.0
total_HTV = 0.0
for traj in path_history:
P = np.asarray(traj, float)
Sk2_L, HTV_L, L, Sk2, HTV = _per_path_smoothness(P)
total_L += L
total_Sk2 += Sk2
total_HTV += HTV
if total_L < 1e-12:
return {"K_bar": 0.0, "H_bar": 0.0, "K_rms": 0.0, "FinalSmoothness": 0.0, "total_L": total_L}
K_bar = total_Sk2 / total_L
H_bar = total_HTV / total_L
K_rms = math.sqrt(max(K_bar, 0.0))
FinalSmoothness = K_rms + H_bar
return {
"K_bar": K_bar,
"H_bar": H_bar,
"K_rms": K_rms,
"FinalSmoothness": FinalSmoothness,
"total_L": total_L
}
# -----------------------------------------------------------------------------
# Priority → speed scaling
# -----------------------------------------------------------------------------
MIN_SPEED_FACTOR = 0.7
MAX_SPEED_FACTOR = 1.4
def compute_speed_factors(priorities: np.ndarray) -> np.ndarray:
p = np.asarray(priorities, dtype=float)
p_min = float(np.min(p))
p_max = float(np.max(p))
if abs(p_max - p_min) < 1e-9:
return np.ones_like(p) * ((MIN_SPEED_FACTOR + MAX_SPEED_FACTOR) * 0.5)
p_norm = (p - p_min) / (p_max - p_min + EPS)
return MIN_SPEED_FACTOR + p_norm * (MAX_SPEED_FACTOR - MIN_SPEED_FACTOR)
# -----------------------------------------------------------------------------
# EXTRA SPEED BOOST (CBF branch only, not used in ORCA)
# -----------------------------------------------------------------------------
SPEED_BOOST = 1.5
# -----------------------------------------------------------------------------
# ORCA Integration Wrapper
# -----------------------------------------------------------------------------
AGENT_RADIUS = max(0.5 * collision_distance, 1e-3)
ORCA_neighborDist = sensor_range
ORCA_maxNeighbors = max(10, number_of_uavs - 1)
ORCA_timeHorizon = 2.0
ORCA_timeHorizonObs = 2.0
# NEW: global slow-down knobs for ORCA
ORCA_SPEED_CAP_FACTOR = 0.5 # scales each agent maxSpeed passed to RVO2
ORCA_PREF_SLOWDOWN_FACTOR = 0.6 # scales v_pref before giving to ORCA
def _import_rvo2_or_explain():
try:
import rvo2
return rvo2
except Exception as exc:
raise RuntimeError(
f"Could not import rvo2 ({exc}). "
"You need to build Python-RVO2 from https://github.com/sybrenstuvel/Python-RVO2\n"
)
class ORCAWrapper:
def __init__(self, positions, per_uav_maxv, dt):
rvo2 = _import_rvo2_or_explain()
# NOTE: we already applied ORCA_SPEED_CAP_FACTOR before calling ORCAWrapper
self.sim = rvo2.PyRVOSimulator(
dt,
ORCA_neighborDist,
ORCA_maxNeighbors,
ORCA_timeHorizon,
ORCA_timeHorizonObs,
AGENT_RADIUS,
max(1e-3, float(np.max(per_uav_maxv)))
)
self.n = positions.shape[0]
self.agent_ids = []
for i in range(self.n):
pid = self.sim.addAgent(
tuple(positions[i]),
ORCA_neighborDist,
ORCA_maxNeighbors,
ORCA_timeHorizon,
ORCA_timeHorizonObs,
AGENT_RADIUS,
float(per_uav_maxv[i]),
(0.0, 0.0)
)
self.agent_ids.append(pid)
# Obstacles: wall segments above/below doorway
self.sim.addObstacle([
(float(NFZ_WALL_X), float(-radius)),
(float(NFZ_WALL_X), float(DOOR_Y_MIN)),
])
self.sim.addObstacle([
(float(NFZ_WALL_X), float(DOOR_Y_MAX)),
(float(NFZ_WALL_X), float(radius)),
])
# Polygonal approximation of circular NFZ
cx, cy = float(NFZ_CIRC_CENTER[0]), float(NFZ_CIRC_CENTER[1])
cr = float(NFZ_CIRC_RADIUS)
N_VERT = 32
circ_poly = []
for k in range(N_VERT):
ang = 2.0 * math.pi * (k / N_VERT)
circ_poly.append((cx + cr * math.cos(ang), cy + cr * math.sin(ang)))
self.sim.addObstacle(circ_poly)
self.sim.processObstacles()
def set_pref_vels(self, v_pref):
for i, pid in enumerate(self.agent_ids):
v = np.asarray(v_pref[i], float)
spd = np.linalg.norm(v)
if spd > 1e-9:
vmax = self.sim.getAgentMaxSpeed(pid)
if spd > vmax:
v = (vmax / spd) * v
self.sim.setAgentPrefVelocity(pid, (float(v[0]), float(v[1])))
def step(self):
self.sim.doStep()
def get_all_positions(self):
P = np.zeros((self.n, 2), float)
for i, pid in enumerate(self.agent_ids):
x, y = self.sim.getAgentPosition(pid)
P[i, 0], P[i, 1] = x, y
return P
def get_all_velocities(self):
V = np.zeros((self.n, 2), float)
for i, pid in enumerate(self.agent_ids):
vx, vy = self.sim.getAgentVelocity(pid)
V[i, 0], V[i, 1] = vx, vy
return V
# -----------------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------------
if __name__ == "__main__":
if 'number_of_uavs' not in globals():
number_of_uavs = 12
check = np.ones(number_of_uavs)
# Initial scene
start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius)
initial_start_positions = start_positions.copy()
plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius)
# State arrays
(velocity_v, start_positions, goal_positions,
acceleration, completed_status) = initialize_uav_properties(number_of_uavs, radius)
# Priorities and per-UAV speed scaling
priority = set_uav_priorities(number_of_uavs, priority_type)
speed_factors = compute_speed_factors(priority)
# Base per-agent max speed
per_uav_maxv = maxv * speed_factors
# Apply our global ORCA slow-down cap
if USE_ORCA:
per_uav_maxv = per_uav_maxv * ORCA_SPEED_CAP_FACTOR
# ORCA simulator init
if USE_ORCA:
orca = ORCAWrapper(start_positions, per_uav_maxv, delt)
# Mission timing
t0 = time.time()
uav_start_times = {i: t0 for i in range(number_of_uavs)}
mission_times = np.zeros(number_of_uavs)
path_history = [[start_positions[i].copy()] for i in range(number_of_uavs)]
colors = plt.get_cmap('Dark2')(np.linspace(0, 1, number_of_uavs))
passed_through = np.zeros(number_of_uavs, dtype=bool)
goal_override = np.zeros(number_of_uavs, dtype=bool)
previous_u_cmd = np.zeros_like(velocity_v)
min_distances_matrix = np.full((number_of_uavs, number_of_uavs), np.inf)
r = 0
plt.ion()
while not np.array_equal(completed_status, check):
r += 1
# -------------------------------------------------
# 1) Nominal velocities + mission bookkeeping
# -------------------------------------------------
u_nom = np.zeros_like(velocity_v)
for i in range(number_of_uavs):
if reached_goal(i, start_positions, goal_positions):
if completed_status[i] != 1:
completed_status[i] = 1
velocity_v[i] = np.zeros(2)
if mission_times[i] == 0:
mission_times[i] = time.time() - uav_start_times[i]
print(f"✅ UAV {i} reached goal in {mission_times[i]:.2f} s.")
continue
if not passed_through[i] and update_passed_through(i, start_positions):
passed_through[i] = True
u_nom[i] = nominal_velocity(i, start_positions, velocity_v, goal_positions, passed_through, priority)
# -------------------------------------------------
# 2) ORCA branch vs CBF branch
# -------------------------------------------------
if USE_ORCA:
# hysteresis on goal override
for i in range(number_of_uavs):
if completed_status[i] == 1:
continue
d_i_goal = np.linalg.norm(start_positions[i] - goal_positions[i])
if not goal_override[i] and d_i_goal < GOAL_OVERRIDE_ENTER:
goal_override[i] = True
elif goal_override[i] and d_i_goal > GOAL_OVERRIDE_EXIT:
goal_override[i] = False
v_pref = np.zeros_like(u_nom)
for i in range(number_of_uavs):
if completed_status[i] == 1:
continue
if goal_override[i]:
dir_vec = goal_positions[i] - start_positions[i]
n = np.linalg.norm(dir_vec) + EPS
u_goal_des = K_GOAL_OVR * dir_vec / n
u_des_raw = speed_factors[i] * u_goal_des
else:
u_des_raw = speed_factors[i] * u_nom[i]
# smoothing
v_pref[i] = U_CMD_SMOOTHING * u_des_raw + (1.0 - U_CMD_SMOOTHING) * previous_u_cmd[i]
# slow down as we near the goal to help stop cleanly
d_goal = np.linalg.norm(start_positions[i] - goal_positions[i])
if d_goal < GOAL_SLOWDOWN_RADIUS:
alpha = smoothstep01(d_goal / max(GOAL_SLOWDOWN_RADIUS, EPS))
v_pref[i] = alpha * v_pref[i]
# global slowdown for ORCA (calmer motion)
v_pref *= ORCA_PREF_SLOWDOWN_FACTOR
# step ORCA
orca.set_pref_vels(v_pref)
orca.step()
# sync state
start_positions = orca.get_all_positions()
velocity_v = orca.get_all_velocities()
previous_u_cmd = v_pref.copy()
else:
# --------- Original CBF branch -----------
u_cmd = np.zeros_like(velocity_v)
for i in range(number_of_uavs):
if completed_status[i] == 1:
u_cmd[i] = np.zeros(2)
continue
d_i_goal = np.linalg.norm(start_positions[i] - goal_positions[i])
if not goal_override[i] and d_i_goal < GOAL_OVERRIDE_ENTER:
goal_override[i] = True
elif goal_override[i] and d_i_goal > GOAL_OVERRIDE_EXIT:
goal_override[i] = False
A_list, b_list = build_evolutionary_vfcbf_constraints(
i, start_positions, velocity_v, priority, goal_positions, goal_override
)
if goal_override[i]:
dir_vec = goal_positions[i] - start_positions[i]
n = np.linalg.norm(dir_vec) + EPS
u_goal_des = K_GOAL_OVR * dir_vec / n
u_des_raw = speed_factors[i] * u_goal_des
else:
u_des_raw = speed_factors[i] * u_nom[i]
u_des_smoothed = U_CMD_SMOOTHING * u_des_raw + (1.0 - U_CMD_SMOOTHING) * previous_u_cmd[i]
u_safe = cbf_filter_velocity(u_des_smoothed, A_list, b_list)
u_cmd[i] = SPEED_BOOST * u_safe
for i in range(number_of_uavs):
if completed_status[i] == 1:
continue
d_goal = np.linalg.norm(start_positions[i] - goal_positions[i])
if d_goal < GOAL_SLOWDOWN_RADIUS:
alpha = smoothstep01((d_goal) / max(GOAL_SLOWDOWN_RADIUS, EPS))
u_cmd[i] = alpha * u_cmd[i]
acceleration = -kv_track * (velocity_v - u_cmd)
velocity_v = (velocity_v + acceleration * delt) * (1 - damping_coefficient * delt)
for i in range(number_of_uavs):
speed = np.linalg.norm(velocity_v[i])
vmax_i = per_uav_maxv[i]
if speed > vmax_i:
velocity_v[i] = (vmax_i / (speed + EPS)) * velocity_v[i]
if pert:
perturb_velocity(velocity_v)
start_positions = start_positions + velocity_v * delt
previous_u_cmd = u_cmd.copy()
# -------------------------------------------------
# 3) Metrics bookkeeping
# -------------------------------------------------
for i in range(number_of_uavs):
path_history[i].append(start_positions[i].copy())
for j in range(i + 1, number_of_uavs):
dist = np.linalg.norm(start_positions[i] - start_positions[j])
if dist < min_distances_matrix[i, j]:
min_distances_matrix[i, j] = dist
# -------------------------------------------------
# 4) Render
# -------------------------------------------------
plt.clf()
ax = plt.gca()
for spine in ax.spines.values():
spine.set_linewidth(3)
spine.set_edgecolor('black')
ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black'))
ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black'))
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2)
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2)
ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2))
for i in range(number_of_uavs):
ph = np.array(path_history[i])
plt.plot(ph[:, 0], ph[:, 1], linestyle='-', linewidth=1.5, color=colors[i], alpha=0.6)
plt.scatter(start_positions[i, 0], start_positions[i, 1], c=[colors[i]], s=80, marker='o')
plt.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250)
plt.xlim(-1.2 * radius, 1.2 * radius)
plt.ylim(-1.2 * radius, 1.2 * radius)
plt.xlabel('x (meters)', fontsize=32)
plt.ylabel('y (meters)', fontsize=32)
plt.xticks(fontsize=28)
plt.yticks(fontsize=28)
title_backend = "ORCA (RVO2)" if USE_ORCA else "CBF-safe smoothing"
plt.title(f"Simultaneous start; speed scaled by priority ({title_backend})", fontsize=22)
plt.subplots_adjust(bottom=0.20)
plt.pause(0.01)
# =====================================================================
# POST-SIM METRICS / CSV
# =====================================================================
output_filename = "uav_trajectories.csv"
try:
with open(output_filename, 'w') as f:
f.write("Timestep,UAV_ID,X,Y\n")
for uav_id, trajectory in enumerate(path_history):
for timestep, coordinates in enumerate(trajectory):
x, y = coordinates
f.write(f"{timestep},{uav_id},{x},{y}\n")
print(f"✅ Trajectory data saved to {output_filename}")
except IOError as e:
print(f"❌ Error writing to {output_filename}: {e}")
finished = mission_times[mission_times > 0]
avg_time_to_goal = float(np.mean(finished)) if finished.size else float('nan')
pair_avg_min, ammd, global_min_sep, per_uav_min = compute_pair_and_ammd(min_distances_matrix)
per_do, do_summary = compute_oscillation_metrics(path_history)
total_L = sum(d["L"] for d in per_do)
total_excess_turn = sum((d["TV"] - d["NR"]) for d in per_do)
fleet_DO_rad_per_m = (total_excess_turn / max(total_L, 1e-9)) if total_L > 0 else 0.0
fleet_DO_deg_per_m = fleet_DO_rad_per_m * (180.0 / np.pi)
per_uav_do_deg = ", ".join(f"{d['DO_deg_per_m']:.2f}" for d in per_do)
fleet = fleet_smoothness_index(path_history)
print("\n==================== METRICS ====================")
print(f"Average Time to Goal (s): {avg_time_to_goal:.3f}")
print(f"Pair-averaged Minimum Separation (yours): {pair_avg_min:.3f}")
print(f"AMMD (per-UAV average of min sep): {ammd:.3f}")
print(f"Global Minimum Separation: {global_min_sep:.3f}")
print("Per-UAV Min Separation:", ", ".join(f"{d:.3f}" for d in per_uav_min))
print(f"Degree of Oscillation (fleet length-weighted): {fleet_DO_rad_per_m:.4f} rad/m ({fleet_DO_deg_per_m:.2f} °/m)")
print(f"Degree of Oscillation (median): {do_summary['median_rad_per_m']:.4f} rad/m ({do_summary['median_deg_per_m']:.2f} °/m)")
print(f"Degree of Oscillation (P90): {do_summary['p90_rad_per_m']:.4f} rad/m ({do_summary['p90_deg_per_m']:.2f} °/m)")
print(f"Degree of Oscillation (max): {do_summary['max_rad_per_m']:.4f} rad/m ({do_summary['max_deg_per_m']:.2f} °/m)")
print(f"Per-UAV DO (deg/m): {per_uav_do_deg}")
print("\n--- Fleet Smoothness (unit-aligned) ---")
print(f"K̄ = ∫κ²/L : {fleet['K_bar']:.6f} [1/m²]")
print(f"K_rms = sqrt(K̄) : {fleet['K_rms']:.6f} [1/m]")
print(f"H̄ = HTV/L : {fleet['H_bar']:.6f} [1/m]")
print(f"FinalSmoothness = K_rms+H̄: {fleet['FinalSmoothness']:.6f} [lower = smoother]")
print(f"Total fleet length (m) : {fleet['total_L']:.2f}")
print("=================================================\n")
try:
with open("oscillation.csv", "w") as f:
f.write("uav_id,TV_rad,NR_rad,L_m,DO_rad_per_m,DO_deg_per_m\n")
for idx, d in enumerate(per_do):
f.write(f"{idx},{d['TV']:.8f},{d['NR']:.8f},{d['L']:.6f},{d['DO_rad_per_m']:.8f},{d['DO_deg_per_m']:.8f}\n")
with open("oscillation_summary.csv", "w") as f:
f.write("fleet_rad_per_m,fleet_deg_per_m,median_rad_per_m,p90_rad_per_m,max_rad_per_m,median_deg_per_m,p90_deg_per_m,max_deg_per_m,total_length_m\n")
f.write(
f"{fleet_DO_rad_per_m:.8f},{fleet_DO_deg_per_m:.8f},"
f"{do_summary['median_rad_per_m']:.8f},{do_summary['p90_rad_per_m']:.8f},{do_summary['max_rad_per_m']:.8f},"
f"{do_summary['median_deg_per_m']:.8f},{do_summary['p90_deg_per_m']:.8f},{do_summary['max_deg_per_m']:.8f},"
f"{total_L:.6f}\n"
)
print("✅ Oscillation metrics saved to oscillation.csv and oscillation_summary.csv")
except IOError as e:
print(f"❌ Error writing oscillation CSVs: {e}")
try:
with open("smoothness.csv", "w") as f:
f.write("fleet_Kbar,fleet_Krms,fleet_Hbar,fleet_FinalSmoothness,total_length\n")
f.write(f"{fleet['K_bar']:.8f},{fleet['K_rms']:.8f},{fleet['H_bar']:.8f},{fleet['FinalSmoothness']:.8f},{fleet['total_L']:.6f}\n")
print("✅ Smoothness metrics saved to smoothness.csv")
except IOError as e:
print(f"❌ Error writing to smoothness.csv: {e}")
final_fairness = calculate_fairness_score(priority, mission_times)
print("-----------------------------------------")
print(f"🏆 Final Fairness Score: {final_fairness:.4f}")
print("-----------------------------------------")
plt.ioff()
plt.show()
print("All UAVs completed:", np.array_equal(np.ones(number_of_uavs), np.ones(number_of_uavs)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment