Created
November 17, 2025 20:38
-
-
Save jaskiratsingh2000/b6d10f2e0570b12c0b66c9a28e3b0481 to your computer and use it in GitHub Desktop.
ORCA Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import time | |
| import threading | |
| import math | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from numpy import e | |
| from hyperparameters import ( | |
| spread, noise, radius, delt, pert, b, sensor_range, number_of_uavs, maxv, | |
| attractive_gain, repulsive_gain, collision_distance, clipping_power, seed, priority_type | |
| ) | |
| # ----------------------------------------------------------------------------- | |
| # Toggle: enable/disable LLM-based negotiation globally (kept; default OFF) | |
| # ----------------------------------------------------------------------------- | |
| USE_LLM_NEGOTIATION = False | |
| if USE_LLM_NEGOTIATION: | |
| from llm_agent import negotiate_priority_with_llm | |
| else: | |
| # No-op stub for priorities if LLM is off. | |
| def negotiate_priority_with_llm(state_i, state_j): | |
| return state_i["current_priority"], state_j["current_priority"], [] | |
| # ----------------------------------------------------------------------------- | |
| # ORCA / RVO2 toggle | |
| # ----------------------------------------------------------------------------- | |
| USE_ORCA = True # ← set to False to revert to your original CBF-QP projection path | |
| # ----------------------------------------------------------------------------- | |
| # VFCBF Parameters | |
| # ----------------------------------------------------------------------------- | |
| VFCBF_TAU = (collision_distance * 1.5)**2 / 2.0 | |
| VFCBF_BETA = 1.0 | |
| VFCBF_GAMMA_GAIN = 0.5 | |
| VFCBF_ACTIVATION_RANGE = sensor_range * 1.1 | |
| # ----------------------------------------------------------------------------- | |
| # Tracking & Simulation parameters | |
| # ----------------------------------------------------------------------------- | |
| kv_track = 12.0 | |
| damping_coefficient = 0.1 | |
| EPS = 1e-6 | |
| # ----------------------------------------------------------------------------- | |
| # Goal proximity shaping and override to kill tangential “herding” | |
| # ----------------------------------------------------------------------------- | |
| GOAL_SLOWDOWN_RADIUS = 2.5 | |
| REPULSION_FADE_START = 4.0 | |
| REPULSION_FADE_END = 1.0 | |
| VFCBF_RELAX_RADIUS = 2.0 | |
| GOAL_OVERRIDE_RADIUS = 2.0 | |
| HARD_SAFETY_ONLY_RADIUS = 1.25 * collision_distance | |
| FOV_COS = 0.0 | |
| K_GOAL_OVR = 1.5 | |
| GOAL_OVERRIDE_ENTER = 2.0 | |
| GOAL_OVERRIDE_EXIT = 2.4 | |
| U_CMD_SMOOTHING = 0.35 | |
| def smoothstep01(x: float) -> float: | |
| if x <= 0.0: return 0.0 | |
| if x >= 1.0: return 1.0 | |
| return x * x * (3.0 - 2.0 * x) | |
| global_data_array = np.zeros((7, 13)) | |
| # Seed | |
| np.random.seed(seed) | |
| print("Random seed set to:", seed) | |
| # ----------------------------------------------------------------------------- | |
| # No-Fly Zone (NFZ) Parameters | |
| # ----------------------------------------------------------------------------- | |
| NFZ_WALL_X = 0.0 | |
| DOOR_HEIGHT = collision_distance * 8 * 1.2 | |
| DOOR_Y_MIN = -DOOR_HEIGHT / 2 | |
| DOOR_Y_MAX = DOOR_HEIGHT / 2 | |
| LANE_OFFSET_Y = DOOR_HEIGHT / 4.0 | |
| WAYPOINT_FOR_RIGHT_TRAFFIC = np.array([NFZ_WALL_X, -LANE_OFFSET_Y]) | |
| WAYPOINT_FOR_LEFT_TRAFFIC = np.array([NFZ_WALL_X, LANE_OFFSET_Y]) | |
| NFZ_CIRC_CENTER = np.array([radius * 0.5, radius * 0.5]) | |
| NFZ_CIRC_RADIUS = radius * 0.2 | |
| # ============================================================================= | |
| # Fairness Metric | |
| # ============================================================================= | |
| def calculate_fairness_score(priorities, mission_times): | |
| total_fairness_score = 0.0 | |
| print("\n--- Calculating Final Fairness Score ---") | |
| for i in range(len(priorities)): | |
| p_i = priorities[i] | |
| t_i = mission_times[i] | |
| if t_i > 0: | |
| score_i = p_i / t_i | |
| total_fairness_score += score_i | |
| print(f"UAV {i}: Priority={p_i:<4.1f}, Time={t_i:<5.2f}s => Score contribution: {score_i:.2f}") | |
| else: | |
| print(f"UAV {i}: Mission time was zero, score contribution is 0.") | |
| return total_fairness_score | |
| def clear_previous_output_files(): | |
| for file_name in ['time.csv', 'priority.csv', 'avg.csv']: | |
| if os.path.isfile(file_name): | |
| os.remove(file_name) | |
| clear_previous_output_files() | |
| # ----------------------------------------------------------------------------- | |
| # Scenario init | |
| # ----------------------------------------------------------------------------- | |
| def initialize_uav_positions_and_goals(number_of_uavs, radius): | |
| start_positions = [] | |
| goal_positions = [] | |
| for i in range(1, number_of_uavs // 4 + 1): | |
| position1 = (np.cos(spread * (i / (number_of_uavs // 4))), | |
| np.sin(spread * (i / (number_of_uavs // 4)))) | |
| position2 = (-position1[0], position1[1]) | |
| position3 = (-position1[0], -position1[1]) | |
| position4 = (position1[0], -position1[1]) | |
| start_positions.extend([position1, position2, position3, position4]) | |
| goal_positions.extend([position2, position1, position4, position3]) | |
| start_positions = np.array(start_positions) * radius | |
| goal_positions = np.array(goal_positions) * radius * 1.2 | |
| return start_positions, goal_positions | |
| def plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius): | |
| cmap = plt.get_cmap('Dark2') | |
| colors = [cmap(i) for i in np.linspace(0, 1, number_of_uavs)] | |
| fig, ax = plt.subplots(figsize=(20, 10)) | |
| ax.scatter(start_positions[:, 0], start_positions[:, 1], c=colors, s=200, marker='o') | |
| ax.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250) | |
| ax.set_xlim(-1.2 * radius, 1.2 * radius) | |
| ax.set_ylim(-1.2 * radius, 1.2 * radius) | |
| ax.set_xlabel('x', fontsize=24) | |
| ax.set_ylabel('y', fontsize=24) | |
| ax.tick_params(axis='both', labelsize=20) | |
| for spine in ax.spines.values(): | |
| spine.set_linewidth(3) | |
| spine.set_edgecolor('black') | |
| ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black')) | |
| ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black')) | |
| # Wall with doorway gap: | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2) | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2) | |
| # Circular NFZ: | |
| ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2)) | |
| plt.savefig("Integrated_NFZ_initial_positions.png", dpi=300, bbox_inches='tight') | |
| def initialize_uav_properties(number_of_uavs, radius): | |
| velocity_v = np.zeros((number_of_uavs, 2)) | |
| start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius) | |
| acceleration = np.zeros((number_of_uavs, 2)) | |
| completed_status = np.zeros(number_of_uavs) | |
| return (velocity_v, start_positions, goal_positions, | |
| acceleration, completed_status) | |
| def set_uav_priorities(number_of_uavs, priority_type): | |
| priority = 3 * np.ones(number_of_uavs) | |
| priority += np.random.uniform(-0.1, 0.1, number_of_uavs) | |
| with open("priority.csv", 'a') as f: | |
| f.write(", ".join(map(str, priority)) + '\n') | |
| return priority | |
| def reached_goal(uav_index, start_positions, goal_positions): | |
| return np.linalg.norm(start_positions[uav_index] - goal_positions[uav_index]) <= 1.2 | |
| def rotate_vector(v, angle): | |
| c, s = np.cos(angle), np.sin(angle) | |
| return np.array([c*v[0] - s*v[1], s*v[0] + c*v[1]]) | |
| def perturb_velocity(velocity_v): | |
| for i, vel in enumerate(velocity_v): | |
| x, y = vel | |
| delta_theta = np.random.normal(0, np.pi / 2 ** 10.5) | |
| theta = np.arctan2(y, x) | |
| mag = np.hypot(x, y) | |
| theta += delta_theta | |
| velocity_v[i] = np.array([np.cos(theta) * mag, np.sin(theta) * mag]) | |
| # ----------------------------------------------------------------------------- | |
| # We still define should_roundabout() for completeness, | |
| # but WE NO LONGER USE IT in nominal_velocity(). | |
| # ----------------------------------------------------------------------------- | |
| def should_roundabout(pi, vi, pj, vj, | |
| d_trigger=None, t_h=2.0, inflate=1.2): | |
| if d_trigger is None: | |
| d_trigger = 2.0 * collision_distance | |
| pij = pj - pi | |
| vij = vi - vj | |
| d = np.linalg.norm(pij) | |
| if d > d_trigger: | |
| return False | |
| closing = np.dot(pij, vij) < 0.0 | |
| if not closing: | |
| return False | |
| denom = np.dot(vij, vij) + EPS | |
| t_c = - np.dot(pij, vij) / denom | |
| if (t_c <= 0.0) or (t_c > t_h): | |
| return False | |
| d_min = np.linalg.norm(pij + t_c * vij) | |
| return d_min < inflate * collision_distance | |
| # ----------------------------------------------------------------------------- | |
| # CBF-QP velocity filter | |
| # ----------------------------------------------------------------------------- | |
| def cbf_filter_velocity(u_des, A_list, b_list): | |
| if not A_list: | |
| return u_des.copy() | |
| A = np.vstack(A_list) | |
| b = np.array(b_list) | |
| if np.all(A @ u_des >= b - 1e-10): | |
| return u_des.copy() | |
| candidates = [] | |
| for k in range(len(b)): | |
| a = A[k] | |
| norm2 = a @ a | |
| if norm2 < 1e-12: continue | |
| lam = (b[k] - a @ u_des) / norm2 | |
| if lam > 0: | |
| u_k = u_des + lam * a | |
| if np.all(A @ u_k >= b - 1e-10): candidates.append(u_k) | |
| m = len(b) | |
| for i in range(m): | |
| for j in range(i+1, m): | |
| A_act = np.vstack([A[i], A[j]]) | |
| rhs = np.array([b[i], b[j]]) | |
| M = A_act @ A_act.T | |
| try: | |
| lam = np.linalg.solve(M, rhs - A_act @ u_des) | |
| u_ij = u_des + A_act.T @ lam | |
| if np.all(A @ u_ij >= b - 1e-10): candidates.append(u_ij) | |
| except np.linalg.LinAlgError: | |
| pass | |
| if not candidates: | |
| viol = A @ u_des - b | |
| k = int(np.argmin(viol)) | |
| a = A[k]; norm2 = a @ a | |
| lam = (b[k] - a @ u_des) / (norm2 + 1e-12) | |
| u_k = u_des + max(0.0, lam) * a | |
| candidates.append(u_k) | |
| dists = [np.linalg.norm(u - u_des) for u in candidates] | |
| return candidates[int(np.argmin(dists))] | |
| # ----------------------------------------------------------------------------- | |
| # Evolutionary VFCBF constraints | |
| # ----------------------------------------------------------------------------- | |
| def calculate_vfcbf_constraint_params(p_i, p_obs, tau, beta): | |
| x_vec = p_i - p_obs | |
| r_sq = x_vec @ x_vec | |
| if r_sq < 1e-9: return None, None | |
| r = np.sqrt(r_sq) | |
| k = beta * np.exp(-r_sq / (4.0 * tau)) | |
| h_norm = k * r | |
| mod_factor = k**2 * (1.0 - r_sq / (2.0 * tau)) | |
| a_vec = mod_factor * x_vec | |
| return a_vec, h_norm | |
| def build_evolutionary_vfcbf_constraints(i, positions, velocities, priorities, goals, override_flags): | |
| A_list, b_list = [], [] | |
| pi = positions[i] | |
| di_goal = np.linalg.norm(pi - goals[i]) | |
| goal_dir = goals[i] - pi | |
| gnorm = np.linalg.norm(goal_dir) + EPS | |
| ghat = goal_dir / gnorm | |
| override_active = bool(override_flags[i]) | |
| relax_factor = 1.0 | |
| if di_goal < VFCBF_RELAX_RADIUS: | |
| relax_factor = smoothstep01(di_goal / max(VFCBF_RELAX_RADIUS, EPS)) | |
| # agent-agent constraints | |
| for j in range(len(positions)): | |
| if i == j: continue | |
| pj, vj = positions[j], velocities[j] | |
| rij = pj - pi | |
| dist = np.linalg.norm(rij) | |
| if override_active: | |
| if dist > HARD_SAFETY_ONLY_RADIUS: | |
| continue | |
| else: | |
| if np.dot(rij, ghat) < FOV_COS: | |
| continue | |
| if dist > VFCBF_ACTIVATION_RANGE: | |
| continue | |
| current_tau = VFCBF_TAU | |
| if priorities[i] < priorities[j]: | |
| current_tau *= 0.75 | |
| a_vec, h_norm = calculate_vfcbf_constraint_params(pi, pj, current_tau, VFCBF_BETA) | |
| if a_vec is not None: | |
| gamma_term = (VFCBF_GAMMA_GAIN * relax_factor) * h_norm | |
| b_val = -gamma_term + (a_vec @ vj) | |
| A_list.append(a_vec) | |
| b_list.append(b_val) | |
| # circular NFZ | |
| dist_to_circ_center = np.linalg.norm(pi - NFZ_CIRC_CENTER) | |
| if dist_to_circ_center < NFZ_CIRC_RADIUS + VFCBF_ACTIVATION_RANGE / 2: | |
| direction = (pi - NFZ_CIRC_CENTER) / (dist_to_circ_center + EPS) | |
| p_obs_circ = NFZ_CIRC_CENTER + NFZ_CIRC_RADIUS * direction | |
| a_vec, h_norm = calculate_vfcbf_constraint_params(pi, p_obs_circ, VFCBF_TAU / 2.0, VFCBF_BETA) | |
| if a_vec is not None: | |
| gamma_term = VFCBF_GAMMA_GAIN * h_norm | |
| b_val = -gamma_term | |
| A_list.append(a_vec) | |
| b_list.append(b_val) | |
| # wall NFZ | |
| if not (DOOR_Y_MIN < pi[1] < DOOR_Y_MAX): | |
| p_obs_wall = np.array([NFZ_WALL_X, np.clip(pi[1], -radius, radius)]) | |
| dist_to_wall = np.linalg.norm(pi - p_obs_wall) | |
| if dist_to_wall < VFCBF_ACTIVATION_RANGE / 2: | |
| a_vec, h_norm = calculate_vfcbf_constraint_params(pi, p_obs_wall, VFCBF_TAU / 2.0, VFCBF_BETA) | |
| if a_vec is not None: | |
| gamma_term = VFCBF_GAMMA_GAIN * h_norm | |
| b_val = -gamma_term | |
| A_list.append(a_vec) | |
| b_list.append(b_val) | |
| return A_list, b_list | |
| # ----------------------------------------------------------------------------- | |
| # Nominal velocity (APF-based controller) | |
| # MODIFIED: roundabout removed. We NEVER add the +90° rotated term anymore. | |
| # ----------------------------------------------------------------------------- | |
| def nominal_velocity(i, positions, velocities, goals, passed_through, priority_arr): | |
| pi = positions[i] | |
| vi = velocities[i] | |
| # lane waypoint logic (doorway guidance) | |
| if not passed_through[i]: | |
| subgoal = WAYPOINT_FOR_RIGHT_TRAFFIC if goals[i][0] > NFZ_WALL_X else WAYPOINT_FOR_LEFT_TRAFFIC | |
| else: | |
| subgoal = goals[i] | |
| # attractive term | |
| d = np.linalg.norm(subgoal - pi) | |
| u_attr = attractive_gain * (2.0 * (1.0 - e**(-2.0 * (d**2))) * (subgoal - pi) / (d + EPS)) | |
| # distance to final goal | |
| d_goal = np.linalg.norm(positions[i] - goals[i]) | |
| # If super close to goal, skip repulsion shaping here. | |
| if d_goal < GOAL_OVERRIDE_RADIUS: | |
| return u_attr | |
| # fade repulsion near goal | |
| if REPULSION_FADE_START <= REPULSION_FADE_END: | |
| repulsion_scale = 1.0 | |
| else: | |
| t = (d_goal - REPULSION_FADE_END) / max(REPULSION_FADE_START - REPULSION_FADE_END, EPS) | |
| repulsion_scale = smoothstep01(t) | |
| u_interaction = np.zeros(2) | |
| FOLLOWING_DISTANCE = collision_distance * 3.0 | |
| FOLLOWING_GAIN_FACTOR = 0.75 | |
| dir_to_subgoal = (subgoal - pi) / (np.linalg.norm(subgoal - pi) + EPS) | |
| if repulsion_scale > 0.0: | |
| for j in range(len(positions)): | |
| if j == i: | |
| continue | |
| pj, vj = positions[j], velocities[j] | |
| p_ij_vec = pj - pi | |
| dij = np.linalg.norm(p_ij_vec) | |
| if dij >= sensor_range: | |
| continue | |
| is_in_front = np.dot(dir_to_subgoal, p_ij_vec / (dij + EPS)) > 0.707 | |
| if is_in_front: | |
| # Damped following spring | |
| spring_k = repulsive_gain * FOLLOWING_GAIN_FACTOR | |
| unit_ij = p_ij_vec / (dij + EPS) | |
| pos_term = spring_k * (dij - FOLLOWING_DISTANCE) * unit_ij | |
| rel_speed_along = np.dot(vi - vj, unit_ij) | |
| damping_gain = 0.8 * repulsive_gain * FOLLOWING_GAIN_FACTOR | |
| damp_term = -damping_gain * rel_speed_along * unit_ij | |
| u_interaction += repulsion_scale * (pos_term + damp_term) | |
| else: | |
| # JUST STRAIGHT REPULSION. NO ROUNDABOUT ROTATION. | |
| rep = ((priority_arr[j] / priority_arr[i]) * repulsive_gain * | |
| (e**(-b * (dij - collision_distance)**2)) * | |
| ((pi - pj) / (dij + EPS))) | |
| u_interaction += repulsion_scale * 0.25 * rep | |
| # tiny goal-facing boost near final goal | |
| if d_goal < GOAL_SLOWDOWN_RADIUS: | |
| boost_dir = (goals[i] - pi) | |
| boost_norm = np.linalg.norm(boost_dir) + EPS | |
| u_attr += 0.75 * boost_dir / boost_norm | |
| return u_attr + u_interaction | |
| # ----------------------------------------------------------------------------- | |
| # Doorway pass check | |
| # ----------------------------------------------------------------------------- | |
| def update_passed_through(i, positions): | |
| start_x = initial_start_positions[i, 0] | |
| current_x = positions[i, 0] | |
| if (start_x < NFZ_WALL_X and current_x > NFZ_WALL_X) or \ | |
| (start_x > NFZ_WALL_X and current_x < NFZ_WALL_X): | |
| return True | |
| return False | |
| # ----------------------------------------------------------------------------- | |
| # Optional LLM negotiation bookkeeping | |
| # ----------------------------------------------------------------------------- | |
| uav_contexts = ["Mission"] * number_of_uavs | |
| def negotiation_worker(i, j, state_i, state_j): | |
| if not USE_LLM_NEGOTIATION: return | |
| new_i, new_j, transcripts = negotiate_priority_with_llm(state_i, state_j) | |
| priority[i], priority[j] = new_i, new_j | |
| # ----------------------------------------------------------------------------- | |
| # METRICS | |
| # ----------------------------------------------------------------------------- | |
| def compute_pair_and_ammd(min_distances_matrix): | |
| N = min_distances_matrix.shape[0] | |
| upper = min_distances_matrix[np.triu_indices(N, k=1)] | |
| upper = upper[np.isfinite(upper)] | |
| pair_avg_min = float(np.mean(upper)) if upper.size else float('inf') | |
| sym = np.minimum(min_distances_matrix, min_distances_matrix.T).copy() | |
| np.fill_diagonal(sym, np.inf) | |
| per_uav_min = np.min(sym, axis=1) | |
| finite = per_uav_min[np.isfinite(per_uav_min)] | |
| ammd = float(np.mean(finite)) if finite.size else float('inf') | |
| global_min = float(np.min(finite)) if finite.size else float('inf') | |
| return pair_avg_min, ammd, global_min, per_uav_min | |
| def _do_from_path(path_xy, ds_min=None, eps=1e-9): | |
| P = np.asarray(path_xy, dtype=float) | |
| if P.shape[0] < 3: | |
| return {"TV": 0.0, "NR": 0.0, "L": 0.0, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0} | |
| seg = np.diff(P, axis=0) | |
| ds = np.linalg.norm(seg, axis=1) | |
| if ds_min is None: | |
| ds_min = 0.05 * maxv * delt | |
| mask = ds >= ds_min | |
| if np.count_nonzero(mask) < 2: | |
| L_mask = float(np.sum(ds[mask])) if mask.size else 0.0 | |
| return {"TV": 0.0, "NR": 0.0, "L": L_mask, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0} | |
| seg = seg[mask] | |
| ds = ds[mask] | |
| theta = np.unwrap(np.arctan2(seg[:, 1], seg[:, 0])) | |
| dtheta = np.diff(theta) | |
| TV = float(np.sum(np.abs(dtheta))) | |
| NR = float(abs(theta[-1] - theta[0])) | |
| L = float(np.sum(ds)) | |
| DO = max(0.0, TV - NR) / max(L, eps) | |
| DO_deg_per_m = DO * (180.0 / np.pi) | |
| return {"TV": TV, "NR": NR, "L": L, "DO_rad_per_m": DO, "DO_deg_per_m": DO_deg_per_m} | |
| def compute_oscillation_metrics(path_history): | |
| per = [_do_from_path(traj) for traj in path_history] | |
| do_vals = np.array([d["DO_rad_per_m"] for d in per], dtype=float) | |
| do_deg = do_vals * (180.0 / np.pi) | |
| def p(x, q): return float(np.nanpercentile(x, q)) if x.size else float("nan") | |
| summary = { | |
| "mean_rad_per_m": float(np.nanmean(do_vals)) if do_vals.size else float("nan"), | |
| "median_rad_per_m": float(np.nanmedian(do_vals)) if do_vals.size else float("nan"), | |
| "p90_rad_per_m": p(do_vals, 90), | |
| "max_rad_per_m": float(np.nanmax(do_vals)) if do_vals.size else float("nan"), | |
| "mean_deg_per_m": float(np.nanmean(do_deg)) if do_deg.size else float("nan"), | |
| "median_deg_per_m": float(np.nanmedian(do_deg)) if do_deg.size else float("nan"), | |
| "p90_deg_per_m": p(do_deg, 90), | |
| "max_deg_per_m": float(np.nanmax(do_deg)) if do_deg.size else float("nan"), | |
| } | |
| return per, summary | |
| def _path_length(P): | |
| P = np.asarray(P, float) | |
| if len(P) < 2: return 0.0 | |
| seg = np.diff(P, axis=0) | |
| return float(np.sum(np.linalg.norm(seg, axis=1))) | |
| def _unwrap_angles(theta): | |
| return np.unwrap(theta) | |
| def _segment_headings(P): | |
| P = np.asarray(P, float) | |
| seg = np.diff(P, axis=0) | |
| ang = np.arctan2(seg[:,1], seg[:,0]) | |
| return _unwrap_angles(ang), np.linalg.norm(seg, axis=1) | |
| def _per_path_smoothness(P): | |
| P = np.asarray(P, float) | |
| L = _path_length(P) | |
| if len(P) < 3 or L < 1e-12: | |
| return 0.0, 0.0, L, 0.0, 0.0 | |
| theta, ds = _segment_headings(P) | |
| dtheta = theta[1:] - theta[:-1] | |
| dtheta = (dtheta + np.pi) % (2*np.pi) - np.pi | |
| ds_local = 0.5 * (ds[1:] + ds[:-1]) | |
| ds_local = np.where(ds_local < 1e-12, 1e-12, ds_local) | |
| kappa = np.abs(dtheta) / ds_local | |
| Sk2_total = float(np.sum((kappa**2) * ds_local)) | |
| HTV_total = float(np.sum(np.abs(dtheta))) | |
| Sk2_over_L = Sk2_total / L | |
| HTV_over_L = HTV_total / L | |
| return Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total | |
| def fleet_smoothness_index(path_history): | |
| total_L = 0.0 | |
| total_Sk2 = 0.0 | |
| total_HTV = 0.0 | |
| for traj in path_history: | |
| P = np.asarray(traj, float) | |
| Sk2_L, HTV_L, L, Sk2, HTV = _per_path_smoothness(P) | |
| total_L += L | |
| total_Sk2 += Sk2 | |
| total_HTV += HTV | |
| if total_L < 1e-12: | |
| return {"K_bar": 0.0, "H_bar": 0.0, "K_rms": 0.0, "FinalSmoothness": 0.0, "total_L": total_L} | |
| K_bar = total_Sk2 / total_L | |
| H_bar = total_HTV / total_L | |
| K_rms = math.sqrt(max(K_bar, 0.0)) | |
| FinalSmoothness = K_rms + H_bar | |
| return { | |
| "K_bar": K_bar, | |
| "H_bar": H_bar, | |
| "K_rms": K_rms, | |
| "FinalSmoothness": FinalSmoothness, | |
| "total_L": total_L | |
| } | |
| # ----------------------------------------------------------------------------- | |
| # Priority → speed scaling | |
| # ----------------------------------------------------------------------------- | |
| MIN_SPEED_FACTOR = 0.7 | |
| MAX_SPEED_FACTOR = 1.4 | |
| def compute_speed_factors(priorities: np.ndarray) -> np.ndarray: | |
| p = np.asarray(priorities, dtype=float) | |
| p_min = float(np.min(p)) | |
| p_max = float(np.max(p)) | |
| if abs(p_max - p_min) < 1e-9: | |
| return np.ones_like(p) * ((MIN_SPEED_FACTOR + MAX_SPEED_FACTOR) * 0.5) | |
| p_norm = (p - p_min) / (p_max - p_min + EPS) | |
| return MIN_SPEED_FACTOR + p_norm * (MAX_SPEED_FACTOR - MIN_SPEED_FACTOR) | |
| # ----------------------------------------------------------------------------- | |
| # EXTRA SPEED BOOST (CBF branch only, not used in ORCA) | |
| # ----------------------------------------------------------------------------- | |
| SPEED_BOOST = 1.5 | |
| # ----------------------------------------------------------------------------- | |
| # ORCA Integration Wrapper | |
| # ----------------------------------------------------------------------------- | |
| AGENT_RADIUS = max(0.5 * collision_distance, 1e-3) | |
| ORCA_neighborDist = sensor_range | |
| ORCA_maxNeighbors = max(10, number_of_uavs - 1) | |
| ORCA_timeHorizon = 2.0 | |
| ORCA_timeHorizonObs = 2.0 | |
| # NEW: global slow-down knobs for ORCA | |
| ORCA_SPEED_CAP_FACTOR = 0.5 # scales each agent maxSpeed passed to RVO2 | |
| ORCA_PREF_SLOWDOWN_FACTOR = 0.6 # scales v_pref before giving to ORCA | |
| def _import_rvo2_or_explain(): | |
| try: | |
| import rvo2 | |
| return rvo2 | |
| except Exception as exc: | |
| raise RuntimeError( | |
| f"Could not import rvo2 ({exc}). " | |
| "You need to build Python-RVO2 from https://github.com/sybrenstuvel/Python-RVO2\n" | |
| ) | |
| class ORCAWrapper: | |
| def __init__(self, positions, per_uav_maxv, dt): | |
| rvo2 = _import_rvo2_or_explain() | |
| # NOTE: we already applied ORCA_SPEED_CAP_FACTOR before calling ORCAWrapper | |
| self.sim = rvo2.PyRVOSimulator( | |
| dt, | |
| ORCA_neighborDist, | |
| ORCA_maxNeighbors, | |
| ORCA_timeHorizon, | |
| ORCA_timeHorizonObs, | |
| AGENT_RADIUS, | |
| max(1e-3, float(np.max(per_uav_maxv))) | |
| ) | |
| self.n = positions.shape[0] | |
| self.agent_ids = [] | |
| for i in range(self.n): | |
| pid = self.sim.addAgent( | |
| tuple(positions[i]), | |
| ORCA_neighborDist, | |
| ORCA_maxNeighbors, | |
| ORCA_timeHorizon, | |
| ORCA_timeHorizonObs, | |
| AGENT_RADIUS, | |
| float(per_uav_maxv[i]), | |
| (0.0, 0.0) | |
| ) | |
| self.agent_ids.append(pid) | |
| # Obstacles: wall segments above/below doorway | |
| self.sim.addObstacle([ | |
| (float(NFZ_WALL_X), float(-radius)), | |
| (float(NFZ_WALL_X), float(DOOR_Y_MIN)), | |
| ]) | |
| self.sim.addObstacle([ | |
| (float(NFZ_WALL_X), float(DOOR_Y_MAX)), | |
| (float(NFZ_WALL_X), float(radius)), | |
| ]) | |
| # Polygonal approximation of circular NFZ | |
| cx, cy = float(NFZ_CIRC_CENTER[0]), float(NFZ_CIRC_CENTER[1]) | |
| cr = float(NFZ_CIRC_RADIUS) | |
| N_VERT = 32 | |
| circ_poly = [] | |
| for k in range(N_VERT): | |
| ang = 2.0 * math.pi * (k / N_VERT) | |
| circ_poly.append((cx + cr * math.cos(ang), cy + cr * math.sin(ang))) | |
| self.sim.addObstacle(circ_poly) | |
| self.sim.processObstacles() | |
| def set_pref_vels(self, v_pref): | |
| for i, pid in enumerate(self.agent_ids): | |
| v = np.asarray(v_pref[i], float) | |
| spd = np.linalg.norm(v) | |
| if spd > 1e-9: | |
| vmax = self.sim.getAgentMaxSpeed(pid) | |
| if spd > vmax: | |
| v = (vmax / spd) * v | |
| self.sim.setAgentPrefVelocity(pid, (float(v[0]), float(v[1]))) | |
| def step(self): | |
| self.sim.doStep() | |
| def get_all_positions(self): | |
| P = np.zeros((self.n, 2), float) | |
| for i, pid in enumerate(self.agent_ids): | |
| x, y = self.sim.getAgentPosition(pid) | |
| P[i, 0], P[i, 1] = x, y | |
| return P | |
| def get_all_velocities(self): | |
| V = np.zeros((self.n, 2), float) | |
| for i, pid in enumerate(self.agent_ids): | |
| vx, vy = self.sim.getAgentVelocity(pid) | |
| V[i, 0], V[i, 1] = vx, vy | |
| return V | |
| # ----------------------------------------------------------------------------- | |
| # MAIN | |
| # ----------------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| if 'number_of_uavs' not in globals(): | |
| number_of_uavs = 12 | |
| check = np.ones(number_of_uavs) | |
| # Initial scene | |
| start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius) | |
| initial_start_positions = start_positions.copy() | |
| plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius) | |
| # State arrays | |
| (velocity_v, start_positions, goal_positions, | |
| acceleration, completed_status) = initialize_uav_properties(number_of_uavs, radius) | |
| # Priorities and per-UAV speed scaling | |
| priority = set_uav_priorities(number_of_uavs, priority_type) | |
| speed_factors = compute_speed_factors(priority) | |
| # Base per-agent max speed | |
| per_uav_maxv = maxv * speed_factors | |
| # Apply our global ORCA slow-down cap | |
| if USE_ORCA: | |
| per_uav_maxv = per_uav_maxv * ORCA_SPEED_CAP_FACTOR | |
| # ORCA simulator init | |
| if USE_ORCA: | |
| orca = ORCAWrapper(start_positions, per_uav_maxv, delt) | |
| # Mission timing | |
| t0 = time.time() | |
| uav_start_times = {i: t0 for i in range(number_of_uavs)} | |
| mission_times = np.zeros(number_of_uavs) | |
| path_history = [[start_positions[i].copy()] for i in range(number_of_uavs)] | |
| colors = plt.get_cmap('Dark2')(np.linspace(0, 1, number_of_uavs)) | |
| passed_through = np.zeros(number_of_uavs, dtype=bool) | |
| goal_override = np.zeros(number_of_uavs, dtype=bool) | |
| previous_u_cmd = np.zeros_like(velocity_v) | |
| min_distances_matrix = np.full((number_of_uavs, number_of_uavs), np.inf) | |
| r = 0 | |
| plt.ion() | |
| while not np.array_equal(completed_status, check): | |
| r += 1 | |
| # ------------------------------------------------- | |
| # 1) Nominal velocities + mission bookkeeping | |
| # ------------------------------------------------- | |
| u_nom = np.zeros_like(velocity_v) | |
| for i in range(number_of_uavs): | |
| if reached_goal(i, start_positions, goal_positions): | |
| if completed_status[i] != 1: | |
| completed_status[i] = 1 | |
| velocity_v[i] = np.zeros(2) | |
| if mission_times[i] == 0: | |
| mission_times[i] = time.time() - uav_start_times[i] | |
| print(f"✅ UAV {i} reached goal in {mission_times[i]:.2f} s.") | |
| continue | |
| if not passed_through[i] and update_passed_through(i, start_positions): | |
| passed_through[i] = True | |
| u_nom[i] = nominal_velocity(i, start_positions, velocity_v, goal_positions, passed_through, priority) | |
| # ------------------------------------------------- | |
| # 2) ORCA branch vs CBF branch | |
| # ------------------------------------------------- | |
| if USE_ORCA: | |
| # hysteresis on goal override | |
| for i in range(number_of_uavs): | |
| if completed_status[i] == 1: | |
| continue | |
| d_i_goal = np.linalg.norm(start_positions[i] - goal_positions[i]) | |
| if not goal_override[i] and d_i_goal < GOAL_OVERRIDE_ENTER: | |
| goal_override[i] = True | |
| elif goal_override[i] and d_i_goal > GOAL_OVERRIDE_EXIT: | |
| goal_override[i] = False | |
| v_pref = np.zeros_like(u_nom) | |
| for i in range(number_of_uavs): | |
| if completed_status[i] == 1: | |
| continue | |
| if goal_override[i]: | |
| dir_vec = goal_positions[i] - start_positions[i] | |
| n = np.linalg.norm(dir_vec) + EPS | |
| u_goal_des = K_GOAL_OVR * dir_vec / n | |
| u_des_raw = speed_factors[i] * u_goal_des | |
| else: | |
| u_des_raw = speed_factors[i] * u_nom[i] | |
| # smoothing | |
| v_pref[i] = U_CMD_SMOOTHING * u_des_raw + (1.0 - U_CMD_SMOOTHING) * previous_u_cmd[i] | |
| # slow down as we near the goal to help stop cleanly | |
| d_goal = np.linalg.norm(start_positions[i] - goal_positions[i]) | |
| if d_goal < GOAL_SLOWDOWN_RADIUS: | |
| alpha = smoothstep01(d_goal / max(GOAL_SLOWDOWN_RADIUS, EPS)) | |
| v_pref[i] = alpha * v_pref[i] | |
| # global slowdown for ORCA (calmer motion) | |
| v_pref *= ORCA_PREF_SLOWDOWN_FACTOR | |
| # step ORCA | |
| orca.set_pref_vels(v_pref) | |
| orca.step() | |
| # sync state | |
| start_positions = orca.get_all_positions() | |
| velocity_v = orca.get_all_velocities() | |
| previous_u_cmd = v_pref.copy() | |
| else: | |
| # --------- Original CBF branch ----------- | |
| u_cmd = np.zeros_like(velocity_v) | |
| for i in range(number_of_uavs): | |
| if completed_status[i] == 1: | |
| u_cmd[i] = np.zeros(2) | |
| continue | |
| d_i_goal = np.linalg.norm(start_positions[i] - goal_positions[i]) | |
| if not goal_override[i] and d_i_goal < GOAL_OVERRIDE_ENTER: | |
| goal_override[i] = True | |
| elif goal_override[i] and d_i_goal > GOAL_OVERRIDE_EXIT: | |
| goal_override[i] = False | |
| A_list, b_list = build_evolutionary_vfcbf_constraints( | |
| i, start_positions, velocity_v, priority, goal_positions, goal_override | |
| ) | |
| if goal_override[i]: | |
| dir_vec = goal_positions[i] - start_positions[i] | |
| n = np.linalg.norm(dir_vec) + EPS | |
| u_goal_des = K_GOAL_OVR * dir_vec / n | |
| u_des_raw = speed_factors[i] * u_goal_des | |
| else: | |
| u_des_raw = speed_factors[i] * u_nom[i] | |
| u_des_smoothed = U_CMD_SMOOTHING * u_des_raw + (1.0 - U_CMD_SMOOTHING) * previous_u_cmd[i] | |
| u_safe = cbf_filter_velocity(u_des_smoothed, A_list, b_list) | |
| u_cmd[i] = SPEED_BOOST * u_safe | |
| for i in range(number_of_uavs): | |
| if completed_status[i] == 1: | |
| continue | |
| d_goal = np.linalg.norm(start_positions[i] - goal_positions[i]) | |
| if d_goal < GOAL_SLOWDOWN_RADIUS: | |
| alpha = smoothstep01((d_goal) / max(GOAL_SLOWDOWN_RADIUS, EPS)) | |
| u_cmd[i] = alpha * u_cmd[i] | |
| acceleration = -kv_track * (velocity_v - u_cmd) | |
| velocity_v = (velocity_v + acceleration * delt) * (1 - damping_coefficient * delt) | |
| for i in range(number_of_uavs): | |
| speed = np.linalg.norm(velocity_v[i]) | |
| vmax_i = per_uav_maxv[i] | |
| if speed > vmax_i: | |
| velocity_v[i] = (vmax_i / (speed + EPS)) * velocity_v[i] | |
| if pert: | |
| perturb_velocity(velocity_v) | |
| start_positions = start_positions + velocity_v * delt | |
| previous_u_cmd = u_cmd.copy() | |
| # ------------------------------------------------- | |
| # 3) Metrics bookkeeping | |
| # ------------------------------------------------- | |
| for i in range(number_of_uavs): | |
| path_history[i].append(start_positions[i].copy()) | |
| for j in range(i + 1, number_of_uavs): | |
| dist = np.linalg.norm(start_positions[i] - start_positions[j]) | |
| if dist < min_distances_matrix[i, j]: | |
| min_distances_matrix[i, j] = dist | |
| # ------------------------------------------------- | |
| # 4) Render | |
| # ------------------------------------------------- | |
| plt.clf() | |
| ax = plt.gca() | |
| for spine in ax.spines.values(): | |
| spine.set_linewidth(3) | |
| spine.set_edgecolor('black') | |
| ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black')) | |
| ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black')) | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2) | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2) | |
| ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2)) | |
| for i in range(number_of_uavs): | |
| ph = np.array(path_history[i]) | |
| plt.plot(ph[:, 0], ph[:, 1], linestyle='-', linewidth=1.5, color=colors[i], alpha=0.6) | |
| plt.scatter(start_positions[i, 0], start_positions[i, 1], c=[colors[i]], s=80, marker='o') | |
| plt.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250) | |
| plt.xlim(-1.2 * radius, 1.2 * radius) | |
| plt.ylim(-1.2 * radius, 1.2 * radius) | |
| plt.xlabel('x (meters)', fontsize=32) | |
| plt.ylabel('y (meters)', fontsize=32) | |
| plt.xticks(fontsize=28) | |
| plt.yticks(fontsize=28) | |
| title_backend = "ORCA (RVO2)" if USE_ORCA else "CBF-safe smoothing" | |
| plt.title(f"Simultaneous start; speed scaled by priority ({title_backend})", fontsize=22) | |
| plt.subplots_adjust(bottom=0.20) | |
| plt.pause(0.01) | |
| # ===================================================================== | |
| # POST-SIM METRICS / CSV | |
| # ===================================================================== | |
| output_filename = "uav_trajectories.csv" | |
| try: | |
| with open(output_filename, 'w') as f: | |
| f.write("Timestep,UAV_ID,X,Y\n") | |
| for uav_id, trajectory in enumerate(path_history): | |
| for timestep, coordinates in enumerate(trajectory): | |
| x, y = coordinates | |
| f.write(f"{timestep},{uav_id},{x},{y}\n") | |
| print(f"✅ Trajectory data saved to {output_filename}") | |
| except IOError as e: | |
| print(f"❌ Error writing to {output_filename}: {e}") | |
| finished = mission_times[mission_times > 0] | |
| avg_time_to_goal = float(np.mean(finished)) if finished.size else float('nan') | |
| pair_avg_min, ammd, global_min_sep, per_uav_min = compute_pair_and_ammd(min_distances_matrix) | |
| per_do, do_summary = compute_oscillation_metrics(path_history) | |
| total_L = sum(d["L"] for d in per_do) | |
| total_excess_turn = sum((d["TV"] - d["NR"]) for d in per_do) | |
| fleet_DO_rad_per_m = (total_excess_turn / max(total_L, 1e-9)) if total_L > 0 else 0.0 | |
| fleet_DO_deg_per_m = fleet_DO_rad_per_m * (180.0 / np.pi) | |
| per_uav_do_deg = ", ".join(f"{d['DO_deg_per_m']:.2f}" for d in per_do) | |
| fleet = fleet_smoothness_index(path_history) | |
| print("\n==================== METRICS ====================") | |
| print(f"Average Time to Goal (s): {avg_time_to_goal:.3f}") | |
| print(f"Pair-averaged Minimum Separation (yours): {pair_avg_min:.3f}") | |
| print(f"AMMD (per-UAV average of min sep): {ammd:.3f}") | |
| print(f"Global Minimum Separation: {global_min_sep:.3f}") | |
| print("Per-UAV Min Separation:", ", ".join(f"{d:.3f}" for d in per_uav_min)) | |
| print(f"Degree of Oscillation (fleet length-weighted): {fleet_DO_rad_per_m:.4f} rad/m ({fleet_DO_deg_per_m:.2f} °/m)") | |
| print(f"Degree of Oscillation (median): {do_summary['median_rad_per_m']:.4f} rad/m ({do_summary['median_deg_per_m']:.2f} °/m)") | |
| print(f"Degree of Oscillation (P90): {do_summary['p90_rad_per_m']:.4f} rad/m ({do_summary['p90_deg_per_m']:.2f} °/m)") | |
| print(f"Degree of Oscillation (max): {do_summary['max_rad_per_m']:.4f} rad/m ({do_summary['max_deg_per_m']:.2f} °/m)") | |
| print(f"Per-UAV DO (deg/m): {per_uav_do_deg}") | |
| print("\n--- Fleet Smoothness (unit-aligned) ---") | |
| print(f"K̄ = ∫κ²/L : {fleet['K_bar']:.6f} [1/m²]") | |
| print(f"K_rms = sqrt(K̄) : {fleet['K_rms']:.6f} [1/m]") | |
| print(f"H̄ = HTV/L : {fleet['H_bar']:.6f} [1/m]") | |
| print(f"FinalSmoothness = K_rms+H̄: {fleet['FinalSmoothness']:.6f} [lower = smoother]") | |
| print(f"Total fleet length (m) : {fleet['total_L']:.2f}") | |
| print("=================================================\n") | |
| try: | |
| with open("oscillation.csv", "w") as f: | |
| f.write("uav_id,TV_rad,NR_rad,L_m,DO_rad_per_m,DO_deg_per_m\n") | |
| for idx, d in enumerate(per_do): | |
| f.write(f"{idx},{d['TV']:.8f},{d['NR']:.8f},{d['L']:.6f},{d['DO_rad_per_m']:.8f},{d['DO_deg_per_m']:.8f}\n") | |
| with open("oscillation_summary.csv", "w") as f: | |
| f.write("fleet_rad_per_m,fleet_deg_per_m,median_rad_per_m,p90_rad_per_m,max_rad_per_m,median_deg_per_m,p90_deg_per_m,max_deg_per_m,total_length_m\n") | |
| f.write( | |
| f"{fleet_DO_rad_per_m:.8f},{fleet_DO_deg_per_m:.8f}," | |
| f"{do_summary['median_rad_per_m']:.8f},{do_summary['p90_rad_per_m']:.8f},{do_summary['max_rad_per_m']:.8f}," | |
| f"{do_summary['median_deg_per_m']:.8f},{do_summary['p90_deg_per_m']:.8f},{do_summary['max_deg_per_m']:.8f}," | |
| f"{total_L:.6f}\n" | |
| ) | |
| print("✅ Oscillation metrics saved to oscillation.csv and oscillation_summary.csv") | |
| except IOError as e: | |
| print(f"❌ Error writing oscillation CSVs: {e}") | |
| try: | |
| with open("smoothness.csv", "w") as f: | |
| f.write("fleet_Kbar,fleet_Krms,fleet_Hbar,fleet_FinalSmoothness,total_length\n") | |
| f.write(f"{fleet['K_bar']:.8f},{fleet['K_rms']:.8f},{fleet['H_bar']:.8f},{fleet['FinalSmoothness']:.8f},{fleet['total_L']:.6f}\n") | |
| print("✅ Smoothness metrics saved to smoothness.csv") | |
| except IOError as e: | |
| print(f"❌ Error writing to smoothness.csv: {e}") | |
| final_fairness = calculate_fairness_score(priority, mission_times) | |
| print("-----------------------------------------") | |
| print(f"🏆 Final Fairness Score: {final_fairness:.4f}") | |
| print("-----------------------------------------") | |
| plt.ioff() | |
| plt.show() | |
| print("All UAVs completed:", np.array_equal(np.ones(number_of_uavs), np.ones(number_of_uavs))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment