Created
November 17, 2025 20:40
-
-
Save jaskiratsingh2000/86339c4bbc7908bcf23ca8a3b5b41b19 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import time | |
| import threading | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from numpy import e | |
| from hyperparameters import ( | |
| spread, noise, radius, delt, pert, b, sensor_range, number_of_uavs, maxv, | |
| attractive_gain, repulsive_gain, collision_distance, clipping_power, seed, priority_type | |
| ) | |
| # ----------------------------------------------------------------------------- | |
| # Toggle: enable/disable LLM-based negotiation globally | |
| # ----------------------------------------------------------------------------- | |
| USE_LLM_NEGOTIATION = False # ← keep False to disable the LLM | |
| if USE_LLM_NEGOTIATION: | |
| from llm_agent import negotiate_priority_with_llm | |
| else: | |
| # No-op stub: returns existing priorities and empty transcript | |
| def negotiate_priority_with_llm(state_i, state_j): | |
| return state_i["current_priority"], state_j["current_priority"], [] | |
| # ----------------------------------------------------------------------------- | |
| # Damping coefficient for convergence | |
| # ----------------------------------------------------------------------------- | |
| damping_coefficient = 1.0 | |
| # ----------------------------------------------------------------------------- | |
| # Minimum clipped speed during negotiation | |
| # ----------------------------------------------------------------------------- | |
| MIN_CLIP_SPEED = 0.5 * maxv | |
| # ----------------------------------------------------------------------------- | |
| # <<< CHANGED: Motion throttles (physics + optional visual slow-mo) | |
| # ----------------------------------------------------------------------------- | |
| SPEED_SCALE = 0.45 # scales all accelerations → smaller = slower motion | |
| VEL_CAP_SCALE = 0.60 # scales max allowed speed → smaller = lower top speed | |
| SLOW_MO = 1.0 # multiplies rendering pause; set >1.0 for extra slow animation | |
| global_data_array = np.zeros((7, 13)) | |
| # Seed for reproducibility | |
| np.random.seed(seed) | |
| print("Random seed set to:", seed) | |
| # ----------------------------------------------------------------------------- | |
| # No-Fly Zone (NFZ) Parameters | |
| # ----------------------------------------------------------------------------- | |
| # --- Doorway / Vertical Wall NFZ --- | |
| NFZ_WALL_X = 0.0 | |
| DOOR_HEIGHT = collision_distance * 8 * 1.2 | |
| DOOR_Y_MIN = -DOOR_HEIGHT / 2 | |
| DOOR_Y_MAX = DOOR_Y_MAX = DOOR_HEIGHT / 2 | |
| DOOR_CENTER = np.array([NFZ_WALL_X, 0.0]) | |
| WALL_AVOIDANCE_MARGIN = radius * 0.07 | |
| WALL_AVOIDANCE_GAIN = repulsive_gain * 0.3 | |
| # --- Circular NFZ --- | |
| NFZ_CIRC_CENTER = np.array([radius * 0.5, radius * 0.4]) | |
| NFZ_CIRC_RADIUS = radius * 0.2 | |
| # You also define stronger NFZ gains in hyperparameters; we’ll use those if present | |
| try: | |
| from hyperparameters import NFZ_CIRC_AVOIDANCE_GAIN as NFZ_CIRC_AVOID_GAIN_HARD | |
| from hyperparameters import NFZ_CIRC_TANGENTIAL_GAIN as NFZ_CIRC_TAN_GAIN_HARD | |
| except Exception: | |
| NFZ_CIRC_AVOID_GAIN_HARD = attractive_gain * 5.0 | |
| NFZ_CIRC_TAN_GAIN_HARD = attractive_gain * 3.0 | |
| NFZ_CIRC_AVOIDANCE_MARGIN = radius * 0.07 | |
| # ============================================================================= | |
| # --- START: FAIRNESS METRIC IMPLEMENTATION --- | |
| # ============================================================================= | |
| def calculate_fairness_score(priorities, mission_times): | |
| """ | |
| Fairness = Σ_i (priority_i / time_to_goal_i) | |
| """ | |
| total_fairness_score = 0.0 | |
| print("\n--- Calculating Final Fairness Score ---") | |
| for i in range(len(priorities)): | |
| p_i = priorities[i] | |
| t_i = mission_times[i] | |
| if t_i > 0: | |
| score_i = p_i / t_i | |
| total_fairness_score += score_i | |
| print(f"UAV {i}: Priority={p_i:<4.1f}, Time={t_i:<5.2f}s => Score contribution: {score_i:.2f}") | |
| else: | |
| print(f"UAV {i}: Mission time was zero, score contribution is 0.") | |
| return total_fairness_score | |
| # ============================================================================= | |
| # --- END: FAIRNESS METRIC IMPLEMENTATION --- | |
| # ============================================================================= | |
| # ============================================================================= | |
| # --- START: CODE-1 METRICS HELPERS (pair-avg-min, AMMD, global min) --- | |
| # ============================================================================= | |
| def compute_pair_and_ammd(min_distances_matrix): | |
| """ | |
| Metrics: pair_avg_min, ammd, global_min | |
| Returns: pair_avg_min, ammd, global_min, per_uav_min | |
| """ | |
| N = min_distances_matrix.shape[0] | |
| upper = min_distances_matrix[np.triu_indices(N, k=1)] | |
| upper = upper[np.isfinite(upper)] | |
| pair_avg_min = float(np.mean(upper)) if upper.size else float('inf') | |
| sym = np.minimum(min_distances_matrix, min_distances_matrix.T).copy() | |
| np.fill_diagonal(sym, np.inf) | |
| per_uav_min = np.min(sym, axis=1) | |
| finite = per_uav_min[np.isfinite(per_uav_min)] | |
| ammd = float(np.mean(finite)) if finite.size else float('inf') | |
| global_min = float(np.min(finite)) if finite.size else float('inf') | |
| return pair_avg_min, ammd, global_min, per_uav_min | |
| # ============================================================================= | |
| # --- END: CODE-1 METRICS HELPERS --- | |
| # ============================================================================= | |
| # ============================================================================= | |
| # --- START: SMOOTHNESS METRICS (K̄, H̄, K_rms, FinalSmoothness) --- | |
| # ============================================================================= | |
| def _path_length(P): | |
| P = np.asarray(P, float) | |
| if len(P) < 2: return 0.0 | |
| seg = np.diff(P, axis=0) | |
| return float(np.sum(np.linalg.norm(seg, axis=1))) | |
| def _unwrap_angles(theta): | |
| return np.unwrap(theta) | |
| def _segment_headings(P): | |
| P = np.asarray(P, float) | |
| seg = np.diff(P, axis=0) | |
| ang = np.arctan2(seg[:,1], seg[:,0]) | |
| return _unwrap_angles(ang), np.linalg.norm(seg, axis=1) | |
| def _per_path_smoothness(P): | |
| """ | |
| Returns: (Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total) | |
| Sk2_over_L = (1/L) * sum( (|dθ|/ds_local)^2 * ds_local ) | |
| HTV_over_L = (sum |dθ|) / L | |
| """ | |
| P = np.asarray(P, float) | |
| L = _path_length(P) | |
| if len(P) < 3 or L < 1e-12: | |
| return 0.0, 0.0, L, 0.0, 0.0 | |
| theta, ds = _segment_headings(P) # len N-1 | |
| dtheta = theta[1:] - theta[:-1] # len N-2 | |
| dtheta = (dtheta + np.pi) % (2*np.pi) - np.pi | |
| ds_local = 0.5 * (ds[1:] + ds[:-1]) # average adjacent segment lengths | |
| ds_local = np.where(ds_local < 1e-12, 1e-12, ds_local) | |
| kappa = np.abs(dtheta) / ds_local | |
| Sk2_total = float(np.sum((kappa**2) * ds_local)) # ≈ ∫ κ^2 ds | |
| HTV_total = float(np.sum(np.abs(dtheta))) # ≈ ∫ |dθ| | |
| Sk2_over_L = Sk2_total / L | |
| HTV_over_L = HTV_total / L | |
| return Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total | |
| def fleet_smoothness_index(path_history): | |
| """ | |
| Returns: K_bar [1/m^2], H_bar [1/m], K_rms [1/m], FinalSmoothness [1/m], total_L, per_uav | |
| where FinalSmoothness = K_rms + H_bar. | |
| """ | |
| total_L = 0.0 | |
| total_Sk2 = 0.0 | |
| total_HTV = 0.0 | |
| per_uav = [] | |
| for traj in path_history: | |
| P = np.asarray(traj, float) | |
| Sk2_L, HTV_L, L, Sk2, HTV = _per_path_smoothness(P) | |
| per_uav.append((Sk2_L, HTV_L, L)) | |
| total_L += L | |
| total_Sk2 += Sk2 | |
| total_HTV += HTV | |
| if total_L < 1e-12: | |
| return { | |
| "K_bar": 0.0, "H_bar": 0.0, "K_rms": 0.0, "FinalSmoothness": 0.0, | |
| "total_L": total_L, "per_uav": per_uav | |
| } | |
| K_bar = total_Sk2 / total_L | |
| H_bar = total_HTV / total_L | |
| K_rms = np.sqrt(max(K_bar, 0.0)) | |
| FinalSmoothness = K_rms + H_bar | |
| return { | |
| "K_bar": K_bar, "H_bar": H_bar, "K_rms": K_rms, | |
| "FinalSmoothness": FinalSmoothness, | |
| "total_L": total_L, "per_uav": per_uav | |
| } | |
| # ============================================================================= | |
| # --- END: SMOOTHNESS METRICS --- | |
| # ============================================================================= | |
| # ============================================================================= | |
| # --- START: DEGREE OF OSCILLATION (DO) HELPERS (added) --- | |
| # ============================================================================= | |
| def _do_from_path(path_xy, ds_min=None, eps=1e-9): | |
| """ | |
| Per-UAV Degree of Oscillation: | |
| DO = (sum |Δθ| - |θ_N - θ_0|) / L [rad/m] | |
| where θ are segment headings along the path and L is total path length. | |
| Tiny steps are skipped to avoid noise. | |
| """ | |
| P = np.asarray(path_xy, dtype=float) | |
| if P.shape[0] < 3: | |
| return {"TV": 0.0, "NR": 0.0, "L": 0.0, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0} | |
| seg = np.diff(P, axis=0) | |
| ds = np.linalg.norm(seg, axis=1) | |
| if ds_min is None: | |
| ds_min = 0.05 * maxv * delt # ignore tiny jitters | |
| mask = ds >= ds_min | |
| if np.count_nonzero(mask) < 2: | |
| L_mask = float(np.sum(ds[mask])) if mask.size else 0.0 | |
| return {"TV": 0.0, "NR": 0.0, "L": L_mask, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0} | |
| seg = seg[mask] | |
| ds = ds[mask] | |
| theta = np.unwrap(np.arctan2(seg[:, 1], seg[:, 0])) | |
| dtheta = np.diff(theta) | |
| TV = float(np.sum(np.abs(dtheta))) # total turning | |
| NR = float(abs(theta[-1] - theta[0])) # net rotation | |
| L = float(np.sum(ds)) | |
| DO = max(0.0, TV - NR) / max(L, eps) # rad/m | |
| return {"TV": TV, "NR": NR, "L": L, "DO_rad_per_m": DO, "DO_deg_per_m": np.degrees(DO)} | |
| def compute_oscillation_metrics(path_history): | |
| """ | |
| Compute per-UAV DO and distribution stats (median, P90, max). | |
| """ | |
| per = [_do_from_path(traj) for traj in path_history] | |
| do_vals = np.array([d["DO_rad_per_m"] for d in per], dtype=float) | |
| do_deg = np.degrees(do_vals) | |
| def p(x, q): return float(np.nanpercentile(x, q)) if x.size else float("nan") | |
| summary = { | |
| "median_rad_per_m": float(np.nanmedian(do_vals)) if do_vals.size else float("nan"), | |
| "p90_rad_per_m": p(do_vals, 90), | |
| "max_rad_per_m": float(np.nanmax(do_vals)) if do_vals.size else float("nan"), | |
| "median_deg_per_m": float(np.nanmedian(do_deg)) if do_deg.size else float("nan"), | |
| "p90_deg_per_m": p(do_deg, 90), | |
| "max_deg_per_m": float(np.nanmax(do_deg)) if do_deg.size else float("nan"), | |
| } | |
| return per, summary | |
| # ============================================================================= | |
| # --- END: DEGREE OF OSCILLATION (DO) HELPERS --- | |
| # ============================================================================= | |
| # Clear old output files | |
| def clear_previous_output_files(): | |
| for file_name in ['time.csv', 'priority.csv', 'avg.csv', 'uav_trajectories.csv', 'smoothness.csv']: | |
| if os.path.isfile(file_name): | |
| os.remove(file_name) | |
| clear_previous_output_files() | |
| # Initialize start and goal positions | |
| def initialize_uav_positions_and_goals(number_of_uavs, radius): | |
| start_positions = [] | |
| goal_positions = [] | |
| for i in range(1, number_of_uavs // 4 + 1): | |
| position1 = (np.cos(spread * (i / (number_of_uavs // 4))), | |
| np.sin(spread * (i / (number_of_uavs // 4)))) | |
| position2 = (-position1[0], position1[1]) | |
| position3 = (-position1[0], -position1[1]) | |
| position4 = (position1[0], -position1[1]) | |
| start_positions.extend([position1, position2, position3, position4]) | |
| goal_positions.extend([position2, position1, position4, position3]) | |
| start_positions = np.array(start_positions) * radius | |
| goal_positions = np.array(goal_positions) * radius * 1.2 | |
| return start_positions, goal_positions | |
| # Plot initial positions, goals, and both NFZs | |
| def plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius): | |
| cmap = plt.get_cmap('Dark2') | |
| colors = [cmap(i) for i in np.linspace(0, 1, number_of_uavs)] | |
| fig, ax = plt.subplots(figsize=(20, 10)) | |
| ax.scatter(start_positions[:, 0], start_positions[:, 1], c=colors, s=200, marker='o') | |
| ax.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250) | |
| ax.set_xlim(-1.2 * radius, 1.2 * radius) | |
| ax.set_ylim(-1.2 * radius, 1.2 * radius) | |
| ax.set_xlabel('x', fontsize=24) | |
| ax.set_ylabel('y', fontsize=24) | |
| ax.tick_params(axis='both', labelsize=20) | |
| for spine in ax.spines.values(): | |
| spine.set_linewidth(3) | |
| spine.set_edgecolor('black') | |
| ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black')) | |
| ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black')) | |
| # Doorway NFZ | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2) | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2) | |
| # Circular NFZ | |
| ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2, label='No-Fly Zone')) | |
| plt.savefig("Integrated_NFZ_initial_positions.png", dpi=300, bbox_inches='tight') | |
| # Initialize UAV properties | |
| def initialize_uav_properties(number_of_uavs, radius): | |
| velocity_u = np.zeros((number_of_uavs, 2)) # not used directly but kept | |
| velocity_v = np.zeros((number_of_uavs, 2)) | |
| start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius) | |
| acceleration = np.zeros((number_of_uavs, 2)) | |
| completed_status = np.zeros(number_of_uavs) | |
| clipping_status = np.zeros(number_of_uavs) | |
| adjusted_max_velocity = maxv * np.ones(number_of_uavs) | |
| alpha = np.ones(number_of_uavs) * 0.3 | |
| return (velocity_u, velocity_v, start_positions, goal_positions, | |
| acceleration, completed_status, clipping_status, | |
| adjusted_max_velocity, alpha) | |
| # Assign initial priorities | |
| def set_uav_priorities(number_of_uavs, priority_type): | |
| priority = 3 * np.ones(number_of_uavs) | |
| with open("priority.csv", 'a') as f: | |
| f.write(", ".join(map(str, priority)) + '\n') | |
| return priority | |
| # Check goal reached | |
| def reached_goal(uav_index, start_positions, goal_positions): | |
| return np.linalg.norm(start_positions[uav_index] - goal_positions[uav_index]) <= 0.5 | |
| # Collision check used for negotiation triggers (kept) | |
| def collision(i, j, start_positions, velocity_v, velocity_u, collision_distance, sensor_range): | |
| rel_v = velocity_v[i] - velocity_v[j] | |
| t = -np.dot(start_positions[i] - start_positions[j], rel_v) / (np.dot(rel_v, rel_v) + 1e-6) | |
| if t < 0: | |
| return False | |
| proj = (start_positions[i] + velocity_v[i] * t | |
| - start_positions[j] - velocity_v[j] * t) | |
| return (np.sqrt(np.dot(proj, proj)) < collision_distance and | |
| np.linalg.norm(start_positions[i] - start_positions[j]) < sensor_range) | |
| # Clip velocity (speed cap); if LLM negotiation were on, MIN_CLIP_SPEED applies | |
| def clip_velocity(number_of_uavs): | |
| global velocity_v, clipping_status, adjusted_max_velocity | |
| for i in range(number_of_uavs): | |
| base_cap = (MIN_CLIP_SPEED if np.any(clipping_status[i]) else maxv) | |
| adjusted_max_velocity[i] = VEL_CAP_SCALE * base_cap # <<< CHANGED: lower top speed | |
| speed = np.linalg.norm(velocity_v[i]) | |
| if speed > adjusted_max_velocity[i]: | |
| velocity_v[i] = adjusted_max_velocity[i] * velocity_v[i] / (speed + 1e-9) | |
| # Random perturbation (kept) | |
| def perturb_velocity(velocity_v): | |
| for i, vel in enumerate(velocity_v): | |
| x, y = vel | |
| delta_theta = np.random.normal(0, np.pi / 2 ** 10.5) | |
| theta = np.arctan2(y, x) | |
| mag = np.hypot(x, y) | |
| theta += delta_theta | |
| velocity_v[i] = np.array([np.cos(theta) * mag, np.sin(theta) * mag]) | |
| # Rotate vector helper | |
| def rotate_vector(v, angle): | |
| c, s = np.cos(angle), np.sin(angle) | |
| return np.array([v[0]*c - v[1]*s, v[0]*s + v[1]*c]) | |
| # ----------------- DRL-VO-style heading selection (Algorithm 1 style) ----------------- | |
| def wrap_angle(theta): | |
| """Wrap to (-pi, pi].""" | |
| return (theta + np.pi) % (2*np.pi) - np.pi | |
| def angle_to(vec): | |
| return np.arctan2(vec[1], vec[0] + 1e-12) | |
| def union_collision_cones_for_agent(i, positions, velocities): | |
| """ | |
| Build a list of forbidden heading intervals [theta - beta, theta + beta] (in world frame). | |
| Approximate VO cones using geometry only: sin(beta) = (rA + rB)/dist, with (rA+rB)~collision_distance. | |
| Only neighbors within sensor_range contribute. | |
| """ | |
| cones = [] | |
| pos_i = positions[i] | |
| for j in range(len(positions)): | |
| if j == i: | |
| continue | |
| dvec = positions[j] - pos_i | |
| dist = np.linalg.norm(dvec) | |
| if dist < 1e-6 or dist > sensor_range: | |
| continue | |
| sum_radius = collision_distance | |
| s = np.clip(sum_radius / (dist + 1e-9), -1.0, 1.0) | |
| beta = np.arcsin(s) | |
| theta_ij = angle_to(dvec) | |
| th_min = wrap_angle(theta_ij - beta) | |
| th_max = wrap_angle(theta_ij + beta) | |
| cones.append((th_min, th_max)) | |
| return cones | |
| def angle_in_interval(theta, th_min, th_max): | |
| """Check if theta ∈ [th_min, th_max] on the circle.""" | |
| theta = wrap_angle(theta); th_min = wrap_angle(th_min); th_max = wrap_angle(th_max) | |
| if th_min <= th_max: | |
| return th_min - 1e-9 <= theta <= th_max + 1e-9 | |
| # interval wraps across -pi/pi | |
| return theta >= th_min - 1e-9 or theta <= th_max + 1e-9 | |
| def is_heading_safe(theta, cones): | |
| for (a, b) in cones: | |
| if angle_in_interval(theta, a, b): | |
| return False | |
| return True | |
| def closest_safe_heading_to(theta_goal, cones, step_deg=2.0): | |
| """ | |
| Sample headings around theta_goal; return the safe heading closest to goal. | |
| If none safe, return heading at the midpoint between the two closest cone edges (max clearance). | |
| """ | |
| if is_heading_safe(theta_goal, cones): | |
| return theta_goal, True | |
| step = np.deg2rad(step_deg) | |
| best_theta = None | |
| best_dev = 1e9 | |
| for k in range(1, int(np.pi/step) + 2): | |
| for sgn in (+1, -1): | |
| cand = wrap_angle(theta_goal + sgn * k * step) | |
| if is_heading_safe(cand, cones): | |
| dev = abs(wrap_angle(cand - theta_goal)) | |
| if dev < best_dev: | |
| best_dev = dev | |
| best_theta = cand | |
| if dev < 1e-3: | |
| return best_theta, True | |
| if best_theta is not None: | |
| return best_theta, True | |
| # No safe heading → choose direction with maximum clearance | |
| edges = [] | |
| for (a, b) in cones: | |
| edges.append(wrap_angle(a)) | |
| edges.append(wrap_angle(b)) | |
| if not edges: | |
| return theta_goal, True | |
| edges = np.sort(np.array(edges)) | |
| gaps = [] | |
| for idx in range(len(edges)): | |
| a = edges[idx] | |
| b = edges[(idx + 1) % len(edges)] | |
| gap = wrap_angle(b - a) | |
| if gap <= 0: | |
| gap += 2*np.pi | |
| gaps.append((gap, a, b)) | |
| gaps.sort(reverse=True, key=lambda x: x[0]) | |
| _, a, b = gaps[0] | |
| mid = wrap_angle(a + 0.5 * wrap_angle(b - a)) | |
| return mid, False | |
| def steer_toward_heading(pos, vel, heading, gain=1.0): | |
| """ | |
| Produce an acceleration toward a desired heading (heading-aligned attractive field). | |
| """ | |
| u = np.array([np.cos(heading), np.sin(heading)]) | |
| return gain * u | |
| # --------------------------------------------------------------------------------------- | |
| # Mission contexts (kept) | |
| uav_contexts = [ | |
| "Carrying a medical patient—emergency!", | |
| "Carrying standard logistics", | |
| "Surveillance mission", | |
| "Returning to base with low battery" | |
| ] * (number_of_uavs // 4) | |
| if len(uav_contexts) < number_of_uavs: | |
| uav_contexts += ["Standard mission"] * (number_of_uavs - len(uav_contexts)) | |
| # Negotiation bookkeeping (mostly unused with LLM off) | |
| in_negotiation = np.zeros((number_of_uavs, number_of_uavs), dtype=int) | |
| negotiation_cooldown = np.zeros((number_of_uavs, number_of_uavs), dtype=int) | |
| negotiation_duration = 5 | |
| # Background worker for LLM negotiation | |
| def negotiation_worker(i, j, state_i, state_j): | |
| if not USE_LLM_NEGOTIATION: | |
| return | |
| new_i, new_j, transcripts = negotiate_priority_with_llm(state_i, state_j) | |
| print(f"[THREAD] ({i},{j}) transcript: {transcripts}") | |
| print(f"[THREAD] PRIORITIES: UAV {i}\u2192{priority[i]}\u2192{new_i}, UAV {j}\u2192{priority[j]}\u2192{new_j}") | |
| priority[i], priority[j] = new_i, new_j | |
| if __name__ == "__main__": | |
| if 'number_of_uavs' not in globals(): | |
| number_of_uavs = 12 | |
| check = np.ones(number_of_uavs) | |
| start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius) | |
| plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius) | |
| (velocity_u, velocity_v, start_positions, goal_positions, | |
| acceleration, completed_status, clipping_status, | |
| adjusted_max_velocity, alpha) = initialize_uav_properties(number_of_uavs, radius) | |
| uav_start_times = {i: time.time() for i in range(number_of_uavs)} | |
| mission_times = np.zeros(number_of_uavs) # completion times | |
| priority = set_uav_priorities(number_of_uavs, priority_type) | |
| path_history = [[start_positions[i].copy()] for i in range(number_of_uavs)] | |
| colors = plt.get_cmap('Dark2')(np.linspace(0, 1, number_of_uavs)) | |
| passed_through = np.zeros(number_of_uavs, dtype=bool) | |
| # --- added for Code-1 metrics --- | |
| min_distances_matrix = np.full((number_of_uavs, number_of_uavs), np.inf) | |
| r = 0 | |
| plt.ion() | |
| while not np.array_equal(completed_status, check): | |
| r += 1 | |
| clipping_status = np.zeros(number_of_uavs) | |
| acceleration = np.zeros((number_of_uavs, 2)) | |
| # Precompute cones for all agents (VO-like) | |
| all_cones = [union_collision_cones_for_agent(i, start_positions, velocity_v) for i in range(number_of_uavs)] | |
| for i in range(number_of_uavs): | |
| if reached_goal(i, start_positions, goal_positions): | |
| if completed_status[i] != 1: | |
| completed_status[i] = 1 | |
| velocity_v[i] = 0 | |
| if mission_times[i] == 0: | |
| mission_times[i] = time.time() - uav_start_times[i] | |
| print(f"✅ UAV {i} has reached its goal in {mission_times[i]:.2f} seconds.") | |
| continue | |
| # Subgoal: go to doorway center until you’ve passed through, then go to own goal | |
| if not passed_through[i]: | |
| subgoal = DOOR_CENTER | |
| else: | |
| subgoal = goal_positions[i] | |
| to_subgoal = subgoal - start_positions[i] | |
| dist_to_subgoal = np.linalg.norm(to_subgoal) + 1e-9 | |
| theta_g = angle_to(to_subgoal) | |
| # DRL-VO style: choose desired heading outside all cones, closest to subgoal heading | |
| theta_td, safe = closest_safe_heading_to(theta_g, all_cones[i], step_deg=2.0) | |
| # Base steering toward that heading (like an attractive field along heading) | |
| acc_heading = steer_toward_heading( | |
| start_positions[i], | |
| velocity_v[i], | |
| theta_td, | |
| gain=attractive_gain * (2 * (1 - e**(-2 * dist_to_subgoal**2))) | |
| ) | |
| acceleration[i] += acc_heading | |
| # Mark as "passed through" once inside the doorway aperture | |
| if (abs(start_positions[i, 0] - NFZ_WALL_X) < delt * maxv and | |
| DOOR_Y_MIN <= start_positions[i, 1] <= DOOR_Y_MAX): | |
| passed_through[i] = True | |
| # --- NFZ AVOIDANCE LOGIC (kept, with stronger circular NFZ effect) --- | |
| # 1) Doorway / Wall avoidance (push back if near wall outside the door) | |
| dist_to_wall = start_positions[i, 0] - NFZ_WALL_X | |
| if abs(dist_to_wall) < WALL_AVOIDANCE_MARGIN and not (DOOR_Y_MIN <= start_positions[i, 1] <= DOOR_Y_MAX): | |
| radial_dir = np.array([np.sign(dist_to_wall), 0.0]) | |
| wall_force = WALL_AVOIDANCE_GAIN * (WALL_AVOIDANCE_MARGIN - abs(dist_to_wall)) * radial_dir | |
| acceleration[i] += wall_force | |
| # 2) Circular NFZ avoidance + tangential skirt | |
| d_to_circ = start_positions[i] - NFZ_CIRC_CENTER | |
| dist_circ = np.linalg.norm(d_to_circ) + 1e-9 | |
| if dist_circ < NFZ_CIRC_RADIUS + NFZ_CIRC_AVOIDANCE_MARGIN: | |
| radial_dir_circ = d_to_circ / dist_circ | |
| # Strong radial push outward (hard wall) | |
| radial_force = NFZ_CIRC_AVOID_GAIN_HARD * np.exp(-(dist_circ - NFZ_CIRC_RADIUS)**2) * radial_dir_circ | |
| # Tangential steering around the NFZ toward goal side | |
| angle_current = np.arctan2(d_to_circ[1], d_to_circ[0]) | |
| goal_offset = goal_positions[i] - NFZ_CIRC_CENTER | |
| angle_goal = np.arctan2(goal_offset[1], goal_offset[0]) | |
| diff = wrap_angle(angle_goal - angle_current) | |
| turn_dir = 1.0 if diff > 0 else -1.0 | |
| tangent_dir_circ = rotate_vector(radial_dir_circ, turn_dir * np.pi/2) | |
| tangential_force = NFZ_CIRC_TAN_GAIN_HARD * tangent_dir_circ | |
| acceleration[i] += radial_force + tangential_force | |
| # --- Optional: negotiation (kept but inactive when LLM off) --- | |
| for j in range(number_of_uavs): | |
| if i == j or reached_goal(j, start_positions, goal_positions): | |
| continue | |
| if collision(i, j, start_positions, velocity_v, velocity_u, collision_distance, sensor_range): | |
| if USE_LLM_NEGOTIATION: | |
| old_i, old_j = priority[i], priority[j] | |
| if (in_negotiation[i][j] == 0 and negotiation_cooldown[i][j] == 0): | |
| print(f"Negotiation triggered between UAV {i} and UAV {j}!") | |
| state_i = {"id": i, "dist_to_goal": dist_to_subgoal, "context": uav_contexts[i], "current_priority": old_i} | |
| state_j = {"id": j, "dist_to_goal": np.linalg.norm(goal_positions[j] - start_positions[j]), "context": uav_contexts[j], "current_priority": old_j} | |
| in_negotiation[i][j] = negotiation_duration | |
| in_negotiation[j][i] = negotiation_duration | |
| negotiation_cooldown[i][j] = negotiation_duration * 3 | |
| negotiation_cooldown[j][i] = negotiation_duration * 3 | |
| threading.Thread(target=negotiation_worker, args=(i, j, state_i, state_j), daemon=True).start() | |
| if in_negotiation[i][j] > 0: | |
| in_negotiation[i][j] -= 1 | |
| if in_negotiation[i][j] == 0: | |
| negotiation_cooldown[i][j] = negotiation_duration * 2 | |
| negotiation_cooldown[j][i] = negotiation_duration * 2 | |
| if negotiation_cooldown[i][j] > 0: | |
| negotiation_cooldown[i][j] -= 1 | |
| # <<< CHANGED: throttle total acceleration for this agent | |
| acceleration[i] *= SPEED_SCALE | |
| # If LLM enabled, slow agents currently in negotiation | |
| if USE_LLM_NEGOTIATION: | |
| for i in range(number_of_uavs): | |
| if np.any(in_negotiation[i] > 0): | |
| clipping_status[i] = 1 | |
| # Small exploration noise (kept) | |
| if pert: | |
| perturb_velocity(velocity_v) | |
| # Integrate simple 1st-order dynamics | |
| velocity_v = velocity_v + acceleration * delt | |
| velocity_v = velocity_v * (1 - damping_coefficient * delt) | |
| clip_velocity(number_of_uavs) | |
| start_positions = start_positions + velocity_v * delt | |
| # Log paths | |
| for i in range(number_of_uavs): | |
| path_history[i].append(start_positions[i].copy()) | |
| # --- live pairwise min-distance tracking (for Code-1 metrics) --- | |
| for i in range(number_of_uavs): | |
| for j in range(i + 1, number_of_uavs): | |
| d = np.linalg.norm(start_positions[i] - start_positions[j]) | |
| if d < min_distances_matrix[i, j]: | |
| min_distances_matrix[i, j] = d | |
| # ----------- Rendering ----------- | |
| plt.clf() | |
| ax = plt.gca() | |
| for spine in ax.spines.values(): | |
| spine.set_linewidth(3) | |
| spine.set_edgecolor('black') | |
| ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black')) | |
| ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black')) | |
| # NFZs | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2) | |
| ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2) | |
| ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2)) | |
| # Paths and agents | |
| for i in range(number_of_uavs): | |
| ph = np.array(path_history[i]) | |
| plt.plot(ph[:, 0], ph[:, 1], linestyle='-', linewidth=1.5, color=colors[i], alpha=0.6) | |
| for i in range(number_of_uavs): | |
| plt.scatter(start_positions[i, 0], start_positions[i, 1], c=[colors[i]], s=80, marker='o') | |
| plt.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250) | |
| plt.xlim(-1.2 * radius, 1.2 * radius) | |
| plt.ylim(-1.2 * radius, 1.2 * radius) | |
| plt.xlabel('x (meters)', fontsize=28) | |
| plt.ylabel('y (meters)', fontsize=28) | |
| plt.xticks(fontsize=22) | |
| plt.yticks(fontsize=22) | |
| plt.subplots_adjust(bottom=0.20) | |
| plt.pause(0.01 * SLOW_MO) # <<< CHANGED: optional visual slow-mo | |
| # Save the trajectory data | |
| output_filename = "uav_trajectories.csv" | |
| try: | |
| with open(output_filename, 'w') as f: | |
| f.write("Timestep,UAV_ID,X,Y\n") | |
| for uav_id, trajectory in enumerate(path_history): | |
| for timestep, coordinates in enumerate(trajectory): | |
| x, y = coordinates | |
| f.write(f"{timestep},{uav_id},{x},{y}\n") | |
| print(f"✅ Trajectory data successfully saved to {output_filename}") | |
| except IOError as e: | |
| print(f"❌ Error writing to file {output_filename}: {e}") | |
| # Final fairness score | |
| final_fairness = calculate_fairness_score(priority, mission_times) | |
| print("-----------------------------------------") | |
| print(f"🏆 Final Fairness Score: {final_fairness:.4f}") | |
| print("-----------------------------------------") | |
| # ----------------- CODE-1 METRICS BLOCK (added) ----------------- | |
| finished = mission_times[mission_times > 0] | |
| avg_time_to_goal = float(np.mean(finished)) if finished.size else float('nan') | |
| pair_avg_min, ammd, global_min_sep, per_uav_min = compute_pair_and_ammd(min_distances_matrix) | |
| print("\n==================== METRICS (Code 1 set) ====================") | |
| print(f"Average Time to Goal (s): {avg_time_to_goal:.3f}") | |
| print(f"Pair-averaged Minimum Separation (yours): {pair_avg_min:.3f}") | |
| print(f"AMMD (per-UAV average of min sep): {ammd:.3f}") | |
| print(f"Global Minimum Separation: {global_min_sep:.3f}") | |
| print("Per-UAV Min Separation:", ", ".join(f"{d:.3f}" for d in per_uav_min)) | |
| print("==============================================================\n") | |
| # Save Code-1 summary metrics to avg.csv | |
| try: | |
| with open("avg.csv", "w") as f: | |
| f.write("avg_time_to_goal,pair_avg_min,ammd,global_min_sep\n") | |
| f.write(f"{avg_time_to_goal:.6f},{pair_avg_min:.6f},{ammd:.6f},{global_min_sep:.6f}\n") | |
| print("✅ Summary metrics (Code 1 set) saved to avg.csv") | |
| except IOError as e: | |
| print(f"❌ Error writing to avg.csv: {e}") | |
| # ----------------- SMOOTHNESS METRICS BLOCK (added) ----------------- | |
| fleet = fleet_smoothness_index(path_history) | |
| print("\n============== FLEET SMOOTHNESS ==============") | |
| print(f"K̄ (∫κ²/L) : {fleet['K_bar']:.6f} [1/m²]") | |
| print(f"K_rms (sqrt(K̄)) : {fleet['K_rms']:.6f} [1/m]") | |
| print(f"H̄ (HTV/L) : {fleet['H_bar']:.6f} [1/m]") | |
| print(f"FinalSmoothness=K_rms+H̄: {fleet['FinalSmoothness']:.6f} [lower is smoother]") | |
| print(f"Total fleet length (m): {fleet['total_L']:.2f}") | |
| print("===============================================\n") | |
| # ----------------- DEGREE OF OSCILLATION (added) ----------------- | |
| per_do, do_summary = compute_oscillation_metrics(path_history) | |
| total_L = sum(d["L"] for d in per_do) | |
| total_excess_turn = sum((d["TV"] - d["NR"]) for d in per_do) | |
| fleet_DO_rad_per_m = (total_excess_turn / max(total_L, 1e-9)) if total_L > 0 else 0.0 | |
| fleet_DO_deg_per_m = np.degrees(fleet_DO_rad_per_m) | |
| per_uav_do_deg = ", ".join(f"{d['DO_deg_per_m']:.2f}" for d in per_do) | |
| print("--------------- DEGREE OF OSCILLATION ---------------") | |
| print(f"Fleet DO (length-weighted): {fleet_DO_rad_per_m:.4f} rad/m ({fleet_DO_deg_per_m:.2f} °/m)") | |
| print(f"DO (median across UAVs): {do_summary['median_rad_per_m']:.4f} rad/m ({do_summary['median_deg_per_m']:.2f} °/m)") | |
| print(f"DO (P90 across UAVs): {do_summary['p90_rad_per_m']:.4f} rad/m ({do_summary['p90_deg_per_m']:.2f} °/m)") | |
| print(f"DO (max across UAVs): {do_summary['max_rad_per_m']:.4f} rad/m ({do_summary['max_deg_per_m']:.2f} °/m)") | |
| print(f"Per-UAV DO (deg/m): {per_uav_do_deg}") | |
| print("-----------------------------------------------------\n") | |
| # Save DO CSVs | |
| try: | |
| with open("oscillation.csv", "w") as f: | |
| f.write("uav_id,TV_rad,NR_rad,L_m,DO_rad_per_m,DO_deg_per_m\n") | |
| for idx, d in enumerate(per_do): | |
| f.write(f"{idx},{d['TV']:.8f},{d['NR']:.8f},{d['L']:.6f},{d['DO_rad_per_m']:.8f},{d['DO_deg_per_m']:.8f}\n") | |
| with open("oscillation_summary.csv", "w") as f: | |
| f.write("fleet_rad_per_m,fleet_deg_per_m,median_rad_per_m,p90_rad_per_m,max_rad_per_m,median_deg_per_m,p90_deg_per_m,max_deg_per_m,total_length_m\n") | |
| f.write( | |
| f"{fleet_DO_rad_per_m:.8f},{fleet_DO_deg_per_m:.8f}," | |
| f"{do_summary['median_rad_per_m']:.8f},{do_summary['p90_rad_per_m']:.8f},{do_summary['max_rad_per_m']:.8f}," | |
| f"{do_summary['median_deg_per_m']:.8f},{do_summary['p90_deg_per_m']:.8f},{do_summary['max_deg_per_m']:.8f}," | |
| f"{total_L:.6f}\n" | |
| ) | |
| print("✅ Oscillation metrics saved to oscillation.csv and oscillation_summary.csv") | |
| except IOError as e: | |
| print(f"❌ Error writing oscillation CSVs: {e}") | |
| # Save smoothness summary | |
| try: | |
| with open("smoothness.csv", "w") as f: | |
| f.write("fleet_Kbar,fleet_Krms,fleet_Hbar,fleet_FinalSmoothness,total_length\n") | |
| f.write(f"{fleet['K_bar']:.8f},{fleet['K_rms']:.8f},{fleet['H_bar']:.8f},{fleet['FinalSmoothness']:.8f},{fleet['total_L']:.6f}\n") | |
| print("✅ Smoothness metrics saved to smoothness.csv") | |
| except IOError as e: | |
| print(f"❌ Error writing to smoothness.csv: {e}") | |
| plt.ioff() | |
| plt.show() | |
| print("All UAVs completed:", completed_status) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment