Skip to content

Instantly share code, notes, and snippets.

@jaskiratsingh2000
Created November 17, 2025 20:40
Show Gist options
  • Select an option

  • Save jaskiratsingh2000/86339c4bbc7908bcf23ca8a3b5b41b19 to your computer and use it in GitHub Desktop.

Select an option

Save jaskiratsingh2000/86339c4bbc7908bcf23ca8a3b5b41b19 to your computer and use it in GitHub Desktop.
import os
import time
import threading
import numpy as np
import matplotlib.pyplot as plt
from numpy import e
from hyperparameters import (
spread, noise, radius, delt, pert, b, sensor_range, number_of_uavs, maxv,
attractive_gain, repulsive_gain, collision_distance, clipping_power, seed, priority_type
)
# -----------------------------------------------------------------------------
# Toggle: enable/disable LLM-based negotiation globally
# -----------------------------------------------------------------------------
USE_LLM_NEGOTIATION = False # ← keep False to disable the LLM
if USE_LLM_NEGOTIATION:
from llm_agent import negotiate_priority_with_llm
else:
# No-op stub: returns existing priorities and empty transcript
def negotiate_priority_with_llm(state_i, state_j):
return state_i["current_priority"], state_j["current_priority"], []
# -----------------------------------------------------------------------------
# Damping coefficient for convergence
# -----------------------------------------------------------------------------
damping_coefficient = 1.0
# -----------------------------------------------------------------------------
# Minimum clipped speed during negotiation
# -----------------------------------------------------------------------------
MIN_CLIP_SPEED = 0.5 * maxv
# -----------------------------------------------------------------------------
# <<< CHANGED: Motion throttles (physics + optional visual slow-mo)
# -----------------------------------------------------------------------------
SPEED_SCALE = 0.45 # scales all accelerations → smaller = slower motion
VEL_CAP_SCALE = 0.60 # scales max allowed speed → smaller = lower top speed
SLOW_MO = 1.0 # multiplies rendering pause; set >1.0 for extra slow animation
global_data_array = np.zeros((7, 13))
# Seed for reproducibility
np.random.seed(seed)
print("Random seed set to:", seed)
# -----------------------------------------------------------------------------
# No-Fly Zone (NFZ) Parameters
# -----------------------------------------------------------------------------
# --- Doorway / Vertical Wall NFZ ---
NFZ_WALL_X = 0.0
DOOR_HEIGHT = collision_distance * 8 * 1.2
DOOR_Y_MIN = -DOOR_HEIGHT / 2
DOOR_Y_MAX = DOOR_Y_MAX = DOOR_HEIGHT / 2
DOOR_CENTER = np.array([NFZ_WALL_X, 0.0])
WALL_AVOIDANCE_MARGIN = radius * 0.07
WALL_AVOIDANCE_GAIN = repulsive_gain * 0.3
# --- Circular NFZ ---
NFZ_CIRC_CENTER = np.array([radius * 0.5, radius * 0.4])
NFZ_CIRC_RADIUS = radius * 0.2
# You also define stronger NFZ gains in hyperparameters; we’ll use those if present
try:
from hyperparameters import NFZ_CIRC_AVOIDANCE_GAIN as NFZ_CIRC_AVOID_GAIN_HARD
from hyperparameters import NFZ_CIRC_TANGENTIAL_GAIN as NFZ_CIRC_TAN_GAIN_HARD
except Exception:
NFZ_CIRC_AVOID_GAIN_HARD = attractive_gain * 5.0
NFZ_CIRC_TAN_GAIN_HARD = attractive_gain * 3.0
NFZ_CIRC_AVOIDANCE_MARGIN = radius * 0.07
# =============================================================================
# --- START: FAIRNESS METRIC IMPLEMENTATION ---
# =============================================================================
def calculate_fairness_score(priorities, mission_times):
"""
Fairness = Σ_i (priority_i / time_to_goal_i)
"""
total_fairness_score = 0.0
print("\n--- Calculating Final Fairness Score ---")
for i in range(len(priorities)):
p_i = priorities[i]
t_i = mission_times[i]
if t_i > 0:
score_i = p_i / t_i
total_fairness_score += score_i
print(f"UAV {i}: Priority={p_i:<4.1f}, Time={t_i:<5.2f}s => Score contribution: {score_i:.2f}")
else:
print(f"UAV {i}: Mission time was zero, score contribution is 0.")
return total_fairness_score
# =============================================================================
# --- END: FAIRNESS METRIC IMPLEMENTATION ---
# =============================================================================
# =============================================================================
# --- START: CODE-1 METRICS HELPERS (pair-avg-min, AMMD, global min) ---
# =============================================================================
def compute_pair_and_ammd(min_distances_matrix):
"""
Metrics: pair_avg_min, ammd, global_min
Returns: pair_avg_min, ammd, global_min, per_uav_min
"""
N = min_distances_matrix.shape[0]
upper = min_distances_matrix[np.triu_indices(N, k=1)]
upper = upper[np.isfinite(upper)]
pair_avg_min = float(np.mean(upper)) if upper.size else float('inf')
sym = np.minimum(min_distances_matrix, min_distances_matrix.T).copy()
np.fill_diagonal(sym, np.inf)
per_uav_min = np.min(sym, axis=1)
finite = per_uav_min[np.isfinite(per_uav_min)]
ammd = float(np.mean(finite)) if finite.size else float('inf')
global_min = float(np.min(finite)) if finite.size else float('inf')
return pair_avg_min, ammd, global_min, per_uav_min
# =============================================================================
# --- END: CODE-1 METRICS HELPERS ---
# =============================================================================
# =============================================================================
# --- START: SMOOTHNESS METRICS (K̄, H̄, K_rms, FinalSmoothness) ---
# =============================================================================
def _path_length(P):
P = np.asarray(P, float)
if len(P) < 2: return 0.0
seg = np.diff(P, axis=0)
return float(np.sum(np.linalg.norm(seg, axis=1)))
def _unwrap_angles(theta):
return np.unwrap(theta)
def _segment_headings(P):
P = np.asarray(P, float)
seg = np.diff(P, axis=0)
ang = np.arctan2(seg[:,1], seg[:,0])
return _unwrap_angles(ang), np.linalg.norm(seg, axis=1)
def _per_path_smoothness(P):
"""
Returns: (Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total)
Sk2_over_L = (1/L) * sum( (|dθ|/ds_local)^2 * ds_local )
HTV_over_L = (sum |dθ|) / L
"""
P = np.asarray(P, float)
L = _path_length(P)
if len(P) < 3 or L < 1e-12:
return 0.0, 0.0, L, 0.0, 0.0
theta, ds = _segment_headings(P) # len N-1
dtheta = theta[1:] - theta[:-1] # len N-2
dtheta = (dtheta + np.pi) % (2*np.pi) - np.pi
ds_local = 0.5 * (ds[1:] + ds[:-1]) # average adjacent segment lengths
ds_local = np.where(ds_local < 1e-12, 1e-12, ds_local)
kappa = np.abs(dtheta) / ds_local
Sk2_total = float(np.sum((kappa**2) * ds_local)) # ≈ ∫ κ^2 ds
HTV_total = float(np.sum(np.abs(dtheta))) # ≈ ∫ |dθ|
Sk2_over_L = Sk2_total / L
HTV_over_L = HTV_total / L
return Sk2_over_L, HTV_over_L, L, Sk2_total, HTV_total
def fleet_smoothness_index(path_history):
"""
Returns: K_bar [1/m^2], H_bar [1/m], K_rms [1/m], FinalSmoothness [1/m], total_L, per_uav
where FinalSmoothness = K_rms + H_bar.
"""
total_L = 0.0
total_Sk2 = 0.0
total_HTV = 0.0
per_uav = []
for traj in path_history:
P = np.asarray(traj, float)
Sk2_L, HTV_L, L, Sk2, HTV = _per_path_smoothness(P)
per_uav.append((Sk2_L, HTV_L, L))
total_L += L
total_Sk2 += Sk2
total_HTV += HTV
if total_L < 1e-12:
return {
"K_bar": 0.0, "H_bar": 0.0, "K_rms": 0.0, "FinalSmoothness": 0.0,
"total_L": total_L, "per_uav": per_uav
}
K_bar = total_Sk2 / total_L
H_bar = total_HTV / total_L
K_rms = np.sqrt(max(K_bar, 0.0))
FinalSmoothness = K_rms + H_bar
return {
"K_bar": K_bar, "H_bar": H_bar, "K_rms": K_rms,
"FinalSmoothness": FinalSmoothness,
"total_L": total_L, "per_uav": per_uav
}
# =============================================================================
# --- END: SMOOTHNESS METRICS ---
# =============================================================================
# =============================================================================
# --- START: DEGREE OF OSCILLATION (DO) HELPERS (added) ---
# =============================================================================
def _do_from_path(path_xy, ds_min=None, eps=1e-9):
"""
Per-UAV Degree of Oscillation:
DO = (sum |Δθ| - |θ_N - θ_0|) / L [rad/m]
where θ are segment headings along the path and L is total path length.
Tiny steps are skipped to avoid noise.
"""
P = np.asarray(path_xy, dtype=float)
if P.shape[0] < 3:
return {"TV": 0.0, "NR": 0.0, "L": 0.0, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0}
seg = np.diff(P, axis=0)
ds = np.linalg.norm(seg, axis=1)
if ds_min is None:
ds_min = 0.05 * maxv * delt # ignore tiny jitters
mask = ds >= ds_min
if np.count_nonzero(mask) < 2:
L_mask = float(np.sum(ds[mask])) if mask.size else 0.0
return {"TV": 0.0, "NR": 0.0, "L": L_mask, "DO_rad_per_m": 0.0, "DO_deg_per_m": 0.0}
seg = seg[mask]
ds = ds[mask]
theta = np.unwrap(np.arctan2(seg[:, 1], seg[:, 0]))
dtheta = np.diff(theta)
TV = float(np.sum(np.abs(dtheta))) # total turning
NR = float(abs(theta[-1] - theta[0])) # net rotation
L = float(np.sum(ds))
DO = max(0.0, TV - NR) / max(L, eps) # rad/m
return {"TV": TV, "NR": NR, "L": L, "DO_rad_per_m": DO, "DO_deg_per_m": np.degrees(DO)}
def compute_oscillation_metrics(path_history):
"""
Compute per-UAV DO and distribution stats (median, P90, max).
"""
per = [_do_from_path(traj) for traj in path_history]
do_vals = np.array([d["DO_rad_per_m"] for d in per], dtype=float)
do_deg = np.degrees(do_vals)
def p(x, q): return float(np.nanpercentile(x, q)) if x.size else float("nan")
summary = {
"median_rad_per_m": float(np.nanmedian(do_vals)) if do_vals.size else float("nan"),
"p90_rad_per_m": p(do_vals, 90),
"max_rad_per_m": float(np.nanmax(do_vals)) if do_vals.size else float("nan"),
"median_deg_per_m": float(np.nanmedian(do_deg)) if do_deg.size else float("nan"),
"p90_deg_per_m": p(do_deg, 90),
"max_deg_per_m": float(np.nanmax(do_deg)) if do_deg.size else float("nan"),
}
return per, summary
# =============================================================================
# --- END: DEGREE OF OSCILLATION (DO) HELPERS ---
# =============================================================================
# Clear old output files
def clear_previous_output_files():
for file_name in ['time.csv', 'priority.csv', 'avg.csv', 'uav_trajectories.csv', 'smoothness.csv']:
if os.path.isfile(file_name):
os.remove(file_name)
clear_previous_output_files()
# Initialize start and goal positions
def initialize_uav_positions_and_goals(number_of_uavs, radius):
start_positions = []
goal_positions = []
for i in range(1, number_of_uavs // 4 + 1):
position1 = (np.cos(spread * (i / (number_of_uavs // 4))),
np.sin(spread * (i / (number_of_uavs // 4))))
position2 = (-position1[0], position1[1])
position3 = (-position1[0], -position1[1])
position4 = (position1[0], -position1[1])
start_positions.extend([position1, position2, position3, position4])
goal_positions.extend([position2, position1, position4, position3])
start_positions = np.array(start_positions) * radius
goal_positions = np.array(goal_positions) * radius * 1.2
return start_positions, goal_positions
# Plot initial positions, goals, and both NFZs
def plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius):
cmap = plt.get_cmap('Dark2')
colors = [cmap(i) for i in np.linspace(0, 1, number_of_uavs)]
fig, ax = plt.subplots(figsize=(20, 10))
ax.scatter(start_positions[:, 0], start_positions[:, 1], c=colors, s=200, marker='o')
ax.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250)
ax.set_xlim(-1.2 * radius, 1.2 * radius)
ax.set_ylim(-1.2 * radius, 1.2 * radius)
ax.set_xlabel('x', fontsize=24)
ax.set_ylabel('y', fontsize=24)
ax.tick_params(axis='both', labelsize=20)
for spine in ax.spines.values():
spine.set_linewidth(3)
spine.set_edgecolor('black')
ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black'))
ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black'))
# Doorway NFZ
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2)
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2)
# Circular NFZ
ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2, label='No-Fly Zone'))
plt.savefig("Integrated_NFZ_initial_positions.png", dpi=300, bbox_inches='tight')
# Initialize UAV properties
def initialize_uav_properties(number_of_uavs, radius):
velocity_u = np.zeros((number_of_uavs, 2)) # not used directly but kept
velocity_v = np.zeros((number_of_uavs, 2))
start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius)
acceleration = np.zeros((number_of_uavs, 2))
completed_status = np.zeros(number_of_uavs)
clipping_status = np.zeros(number_of_uavs)
adjusted_max_velocity = maxv * np.ones(number_of_uavs)
alpha = np.ones(number_of_uavs) * 0.3
return (velocity_u, velocity_v, start_positions, goal_positions,
acceleration, completed_status, clipping_status,
adjusted_max_velocity, alpha)
# Assign initial priorities
def set_uav_priorities(number_of_uavs, priority_type):
priority = 3 * np.ones(number_of_uavs)
with open("priority.csv", 'a') as f:
f.write(", ".join(map(str, priority)) + '\n')
return priority
# Check goal reached
def reached_goal(uav_index, start_positions, goal_positions):
return np.linalg.norm(start_positions[uav_index] - goal_positions[uav_index]) <= 0.5
# Collision check used for negotiation triggers (kept)
def collision(i, j, start_positions, velocity_v, velocity_u, collision_distance, sensor_range):
rel_v = velocity_v[i] - velocity_v[j]
t = -np.dot(start_positions[i] - start_positions[j], rel_v) / (np.dot(rel_v, rel_v) + 1e-6)
if t < 0:
return False
proj = (start_positions[i] + velocity_v[i] * t
- start_positions[j] - velocity_v[j] * t)
return (np.sqrt(np.dot(proj, proj)) < collision_distance and
np.linalg.norm(start_positions[i] - start_positions[j]) < sensor_range)
# Clip velocity (speed cap); if LLM negotiation were on, MIN_CLIP_SPEED applies
def clip_velocity(number_of_uavs):
global velocity_v, clipping_status, adjusted_max_velocity
for i in range(number_of_uavs):
base_cap = (MIN_CLIP_SPEED if np.any(clipping_status[i]) else maxv)
adjusted_max_velocity[i] = VEL_CAP_SCALE * base_cap # <<< CHANGED: lower top speed
speed = np.linalg.norm(velocity_v[i])
if speed > adjusted_max_velocity[i]:
velocity_v[i] = adjusted_max_velocity[i] * velocity_v[i] / (speed + 1e-9)
# Random perturbation (kept)
def perturb_velocity(velocity_v):
for i, vel in enumerate(velocity_v):
x, y = vel
delta_theta = np.random.normal(0, np.pi / 2 ** 10.5)
theta = np.arctan2(y, x)
mag = np.hypot(x, y)
theta += delta_theta
velocity_v[i] = np.array([np.cos(theta) * mag, np.sin(theta) * mag])
# Rotate vector helper
def rotate_vector(v, angle):
c, s = np.cos(angle), np.sin(angle)
return np.array([v[0]*c - v[1]*s, v[0]*s + v[1]*c])
# ----------------- DRL-VO-style heading selection (Algorithm 1 style) -----------------
def wrap_angle(theta):
"""Wrap to (-pi, pi]."""
return (theta + np.pi) % (2*np.pi) - np.pi
def angle_to(vec):
return np.arctan2(vec[1], vec[0] + 1e-12)
def union_collision_cones_for_agent(i, positions, velocities):
"""
Build a list of forbidden heading intervals [theta - beta, theta + beta] (in world frame).
Approximate VO cones using geometry only: sin(beta) = (rA + rB)/dist, with (rA+rB)~collision_distance.
Only neighbors within sensor_range contribute.
"""
cones = []
pos_i = positions[i]
for j in range(len(positions)):
if j == i:
continue
dvec = positions[j] - pos_i
dist = np.linalg.norm(dvec)
if dist < 1e-6 or dist > sensor_range:
continue
sum_radius = collision_distance
s = np.clip(sum_radius / (dist + 1e-9), -1.0, 1.0)
beta = np.arcsin(s)
theta_ij = angle_to(dvec)
th_min = wrap_angle(theta_ij - beta)
th_max = wrap_angle(theta_ij + beta)
cones.append((th_min, th_max))
return cones
def angle_in_interval(theta, th_min, th_max):
"""Check if theta ∈ [th_min, th_max] on the circle."""
theta = wrap_angle(theta); th_min = wrap_angle(th_min); th_max = wrap_angle(th_max)
if th_min <= th_max:
return th_min - 1e-9 <= theta <= th_max + 1e-9
# interval wraps across -pi/pi
return theta >= th_min - 1e-9 or theta <= th_max + 1e-9
def is_heading_safe(theta, cones):
for (a, b) in cones:
if angle_in_interval(theta, a, b):
return False
return True
def closest_safe_heading_to(theta_goal, cones, step_deg=2.0):
"""
Sample headings around theta_goal; return the safe heading closest to goal.
If none safe, return heading at the midpoint between the two closest cone edges (max clearance).
"""
if is_heading_safe(theta_goal, cones):
return theta_goal, True
step = np.deg2rad(step_deg)
best_theta = None
best_dev = 1e9
for k in range(1, int(np.pi/step) + 2):
for sgn in (+1, -1):
cand = wrap_angle(theta_goal + sgn * k * step)
if is_heading_safe(cand, cones):
dev = abs(wrap_angle(cand - theta_goal))
if dev < best_dev:
best_dev = dev
best_theta = cand
if dev < 1e-3:
return best_theta, True
if best_theta is not None:
return best_theta, True
# No safe heading → choose direction with maximum clearance
edges = []
for (a, b) in cones:
edges.append(wrap_angle(a))
edges.append(wrap_angle(b))
if not edges:
return theta_goal, True
edges = np.sort(np.array(edges))
gaps = []
for idx in range(len(edges)):
a = edges[idx]
b = edges[(idx + 1) % len(edges)]
gap = wrap_angle(b - a)
if gap <= 0:
gap += 2*np.pi
gaps.append((gap, a, b))
gaps.sort(reverse=True, key=lambda x: x[0])
_, a, b = gaps[0]
mid = wrap_angle(a + 0.5 * wrap_angle(b - a))
return mid, False
def steer_toward_heading(pos, vel, heading, gain=1.0):
"""
Produce an acceleration toward a desired heading (heading-aligned attractive field).
"""
u = np.array([np.cos(heading), np.sin(heading)])
return gain * u
# ---------------------------------------------------------------------------------------
# Mission contexts (kept)
uav_contexts = [
"Carrying a medical patient—emergency!",
"Carrying standard logistics",
"Surveillance mission",
"Returning to base with low battery"
] * (number_of_uavs // 4)
if len(uav_contexts) < number_of_uavs:
uav_contexts += ["Standard mission"] * (number_of_uavs - len(uav_contexts))
# Negotiation bookkeeping (mostly unused with LLM off)
in_negotiation = np.zeros((number_of_uavs, number_of_uavs), dtype=int)
negotiation_cooldown = np.zeros((number_of_uavs, number_of_uavs), dtype=int)
negotiation_duration = 5
# Background worker for LLM negotiation
def negotiation_worker(i, j, state_i, state_j):
if not USE_LLM_NEGOTIATION:
return
new_i, new_j, transcripts = negotiate_priority_with_llm(state_i, state_j)
print(f"[THREAD] ({i},{j}) transcript: {transcripts}")
print(f"[THREAD] PRIORITIES: UAV {i}\u2192{priority[i]}\u2192{new_i}, UAV {j}\u2192{priority[j]}\u2192{new_j}")
priority[i], priority[j] = new_i, new_j
if __name__ == "__main__":
if 'number_of_uavs' not in globals():
number_of_uavs = 12
check = np.ones(number_of_uavs)
start_positions, goal_positions = initialize_uav_positions_and_goals(number_of_uavs, radius)
plot_initial_positions_and_goals(start_positions, goal_positions, number_of_uavs, radius)
(velocity_u, velocity_v, start_positions, goal_positions,
acceleration, completed_status, clipping_status,
adjusted_max_velocity, alpha) = initialize_uav_properties(number_of_uavs, radius)
uav_start_times = {i: time.time() for i in range(number_of_uavs)}
mission_times = np.zeros(number_of_uavs) # completion times
priority = set_uav_priorities(number_of_uavs, priority_type)
path_history = [[start_positions[i].copy()] for i in range(number_of_uavs)]
colors = plt.get_cmap('Dark2')(np.linspace(0, 1, number_of_uavs))
passed_through = np.zeros(number_of_uavs, dtype=bool)
# --- added for Code-1 metrics ---
min_distances_matrix = np.full((number_of_uavs, number_of_uavs), np.inf)
r = 0
plt.ion()
while not np.array_equal(completed_status, check):
r += 1
clipping_status = np.zeros(number_of_uavs)
acceleration = np.zeros((number_of_uavs, 2))
# Precompute cones for all agents (VO-like)
all_cones = [union_collision_cones_for_agent(i, start_positions, velocity_v) for i in range(number_of_uavs)]
for i in range(number_of_uavs):
if reached_goal(i, start_positions, goal_positions):
if completed_status[i] != 1:
completed_status[i] = 1
velocity_v[i] = 0
if mission_times[i] == 0:
mission_times[i] = time.time() - uav_start_times[i]
print(f"✅ UAV {i} has reached its goal in {mission_times[i]:.2f} seconds.")
continue
# Subgoal: go to doorway center until you’ve passed through, then go to own goal
if not passed_through[i]:
subgoal = DOOR_CENTER
else:
subgoal = goal_positions[i]
to_subgoal = subgoal - start_positions[i]
dist_to_subgoal = np.linalg.norm(to_subgoal) + 1e-9
theta_g = angle_to(to_subgoal)
# DRL-VO style: choose desired heading outside all cones, closest to subgoal heading
theta_td, safe = closest_safe_heading_to(theta_g, all_cones[i], step_deg=2.0)
# Base steering toward that heading (like an attractive field along heading)
acc_heading = steer_toward_heading(
start_positions[i],
velocity_v[i],
theta_td,
gain=attractive_gain * (2 * (1 - e**(-2 * dist_to_subgoal**2)))
)
acceleration[i] += acc_heading
# Mark as "passed through" once inside the doorway aperture
if (abs(start_positions[i, 0] - NFZ_WALL_X) < delt * maxv and
DOOR_Y_MIN <= start_positions[i, 1] <= DOOR_Y_MAX):
passed_through[i] = True
# --- NFZ AVOIDANCE LOGIC (kept, with stronger circular NFZ effect) ---
# 1) Doorway / Wall avoidance (push back if near wall outside the door)
dist_to_wall = start_positions[i, 0] - NFZ_WALL_X
if abs(dist_to_wall) < WALL_AVOIDANCE_MARGIN and not (DOOR_Y_MIN <= start_positions[i, 1] <= DOOR_Y_MAX):
radial_dir = np.array([np.sign(dist_to_wall), 0.0])
wall_force = WALL_AVOIDANCE_GAIN * (WALL_AVOIDANCE_MARGIN - abs(dist_to_wall)) * radial_dir
acceleration[i] += wall_force
# 2) Circular NFZ avoidance + tangential skirt
d_to_circ = start_positions[i] - NFZ_CIRC_CENTER
dist_circ = np.linalg.norm(d_to_circ) + 1e-9
if dist_circ < NFZ_CIRC_RADIUS + NFZ_CIRC_AVOIDANCE_MARGIN:
radial_dir_circ = d_to_circ / dist_circ
# Strong radial push outward (hard wall)
radial_force = NFZ_CIRC_AVOID_GAIN_HARD * np.exp(-(dist_circ - NFZ_CIRC_RADIUS)**2) * radial_dir_circ
# Tangential steering around the NFZ toward goal side
angle_current = np.arctan2(d_to_circ[1], d_to_circ[0])
goal_offset = goal_positions[i] - NFZ_CIRC_CENTER
angle_goal = np.arctan2(goal_offset[1], goal_offset[0])
diff = wrap_angle(angle_goal - angle_current)
turn_dir = 1.0 if diff > 0 else -1.0
tangent_dir_circ = rotate_vector(radial_dir_circ, turn_dir * np.pi/2)
tangential_force = NFZ_CIRC_TAN_GAIN_HARD * tangent_dir_circ
acceleration[i] += radial_force + tangential_force
# --- Optional: negotiation (kept but inactive when LLM off) ---
for j in range(number_of_uavs):
if i == j or reached_goal(j, start_positions, goal_positions):
continue
if collision(i, j, start_positions, velocity_v, velocity_u, collision_distance, sensor_range):
if USE_LLM_NEGOTIATION:
old_i, old_j = priority[i], priority[j]
if (in_negotiation[i][j] == 0 and negotiation_cooldown[i][j] == 0):
print(f"Negotiation triggered between UAV {i} and UAV {j}!")
state_i = {"id": i, "dist_to_goal": dist_to_subgoal, "context": uav_contexts[i], "current_priority": old_i}
state_j = {"id": j, "dist_to_goal": np.linalg.norm(goal_positions[j] - start_positions[j]), "context": uav_contexts[j], "current_priority": old_j}
in_negotiation[i][j] = negotiation_duration
in_negotiation[j][i] = negotiation_duration
negotiation_cooldown[i][j] = negotiation_duration * 3
negotiation_cooldown[j][i] = negotiation_duration * 3
threading.Thread(target=negotiation_worker, args=(i, j, state_i, state_j), daemon=True).start()
if in_negotiation[i][j] > 0:
in_negotiation[i][j] -= 1
if in_negotiation[i][j] == 0:
negotiation_cooldown[i][j] = negotiation_duration * 2
negotiation_cooldown[j][i] = negotiation_duration * 2
if negotiation_cooldown[i][j] > 0:
negotiation_cooldown[i][j] -= 1
# <<< CHANGED: throttle total acceleration for this agent
acceleration[i] *= SPEED_SCALE
# If LLM enabled, slow agents currently in negotiation
if USE_LLM_NEGOTIATION:
for i in range(number_of_uavs):
if np.any(in_negotiation[i] > 0):
clipping_status[i] = 1
# Small exploration noise (kept)
if pert:
perturb_velocity(velocity_v)
# Integrate simple 1st-order dynamics
velocity_v = velocity_v + acceleration * delt
velocity_v = velocity_v * (1 - damping_coefficient * delt)
clip_velocity(number_of_uavs)
start_positions = start_positions + velocity_v * delt
# Log paths
for i in range(number_of_uavs):
path_history[i].append(start_positions[i].copy())
# --- live pairwise min-distance tracking (for Code-1 metrics) ---
for i in range(number_of_uavs):
for j in range(i + 1, number_of_uavs):
d = np.linalg.norm(start_positions[i] - start_positions[j])
if d < min_distances_matrix[i, j]:
min_distances_matrix[i, j] = d
# ----------- Rendering -----------
plt.clf()
ax = plt.gca()
for spine in ax.spines.values():
spine.set_linewidth(3)
spine.set_edgecolor('black')
ax.add_patch(plt.Circle((0, 0), radius, fill=False, linewidth=3, edgecolor='black'))
ax.add_patch(plt.Circle((0, 0), 1.2 * radius, fill=False, linewidth=3, edgecolor='black'))
# NFZs
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [-radius, DOOR_Y_MIN], color='red', linewidth=10, alpha=0.2)
ax.plot([NFZ_WALL_X, NFZ_WALL_X], [DOOR_Y_MAX, radius], color='red', linewidth=10, alpha=0.2)
ax.add_patch(plt.Circle(NFZ_CIRC_CENTER, NFZ_CIRC_RADIUS, color='red', alpha=0.2))
# Paths and agents
for i in range(number_of_uavs):
ph = np.array(path_history[i])
plt.plot(ph[:, 0], ph[:, 1], linestyle='-', linewidth=1.5, color=colors[i], alpha=0.6)
for i in range(number_of_uavs):
plt.scatter(start_positions[i, 0], start_positions[i, 1], c=[colors[i]], s=80, marker='o')
plt.scatter(goal_positions[:, 0], goal_positions[:, 1], c=colors, marker='*', s=250)
plt.xlim(-1.2 * radius, 1.2 * radius)
plt.ylim(-1.2 * radius, 1.2 * radius)
plt.xlabel('x (meters)', fontsize=28)
plt.ylabel('y (meters)', fontsize=28)
plt.xticks(fontsize=22)
plt.yticks(fontsize=22)
plt.subplots_adjust(bottom=0.20)
plt.pause(0.01 * SLOW_MO) # <<< CHANGED: optional visual slow-mo
# Save the trajectory data
output_filename = "uav_trajectories.csv"
try:
with open(output_filename, 'w') as f:
f.write("Timestep,UAV_ID,X,Y\n")
for uav_id, trajectory in enumerate(path_history):
for timestep, coordinates in enumerate(trajectory):
x, y = coordinates
f.write(f"{timestep},{uav_id},{x},{y}\n")
print(f"✅ Trajectory data successfully saved to {output_filename}")
except IOError as e:
print(f"❌ Error writing to file {output_filename}: {e}")
# Final fairness score
final_fairness = calculate_fairness_score(priority, mission_times)
print("-----------------------------------------")
print(f"🏆 Final Fairness Score: {final_fairness:.4f}")
print("-----------------------------------------")
# ----------------- CODE-1 METRICS BLOCK (added) -----------------
finished = mission_times[mission_times > 0]
avg_time_to_goal = float(np.mean(finished)) if finished.size else float('nan')
pair_avg_min, ammd, global_min_sep, per_uav_min = compute_pair_and_ammd(min_distances_matrix)
print("\n==================== METRICS (Code 1 set) ====================")
print(f"Average Time to Goal (s): {avg_time_to_goal:.3f}")
print(f"Pair-averaged Minimum Separation (yours): {pair_avg_min:.3f}")
print(f"AMMD (per-UAV average of min sep): {ammd:.3f}")
print(f"Global Minimum Separation: {global_min_sep:.3f}")
print("Per-UAV Min Separation:", ", ".join(f"{d:.3f}" for d in per_uav_min))
print("==============================================================\n")
# Save Code-1 summary metrics to avg.csv
try:
with open("avg.csv", "w") as f:
f.write("avg_time_to_goal,pair_avg_min,ammd,global_min_sep\n")
f.write(f"{avg_time_to_goal:.6f},{pair_avg_min:.6f},{ammd:.6f},{global_min_sep:.6f}\n")
print("✅ Summary metrics (Code 1 set) saved to avg.csv")
except IOError as e:
print(f"❌ Error writing to avg.csv: {e}")
# ----------------- SMOOTHNESS METRICS BLOCK (added) -----------------
fleet = fleet_smoothness_index(path_history)
print("\n============== FLEET SMOOTHNESS ==============")
print(f"K̄ (∫κ²/L) : {fleet['K_bar']:.6f} [1/m²]")
print(f"K_rms (sqrt(K̄)) : {fleet['K_rms']:.6f} [1/m]")
print(f"H̄ (HTV/L) : {fleet['H_bar']:.6f} [1/m]")
print(f"FinalSmoothness=K_rms+H̄: {fleet['FinalSmoothness']:.6f} [lower is smoother]")
print(f"Total fleet length (m): {fleet['total_L']:.2f}")
print("===============================================\n")
# ----------------- DEGREE OF OSCILLATION (added) -----------------
per_do, do_summary = compute_oscillation_metrics(path_history)
total_L = sum(d["L"] for d in per_do)
total_excess_turn = sum((d["TV"] - d["NR"]) for d in per_do)
fleet_DO_rad_per_m = (total_excess_turn / max(total_L, 1e-9)) if total_L > 0 else 0.0
fleet_DO_deg_per_m = np.degrees(fleet_DO_rad_per_m)
per_uav_do_deg = ", ".join(f"{d['DO_deg_per_m']:.2f}" for d in per_do)
print("--------------- DEGREE OF OSCILLATION ---------------")
print(f"Fleet DO (length-weighted): {fleet_DO_rad_per_m:.4f} rad/m ({fleet_DO_deg_per_m:.2f} °/m)")
print(f"DO (median across UAVs): {do_summary['median_rad_per_m']:.4f} rad/m ({do_summary['median_deg_per_m']:.2f} °/m)")
print(f"DO (P90 across UAVs): {do_summary['p90_rad_per_m']:.4f} rad/m ({do_summary['p90_deg_per_m']:.2f} °/m)")
print(f"DO (max across UAVs): {do_summary['max_rad_per_m']:.4f} rad/m ({do_summary['max_deg_per_m']:.2f} °/m)")
print(f"Per-UAV DO (deg/m): {per_uav_do_deg}")
print("-----------------------------------------------------\n")
# Save DO CSVs
try:
with open("oscillation.csv", "w") as f:
f.write("uav_id,TV_rad,NR_rad,L_m,DO_rad_per_m,DO_deg_per_m\n")
for idx, d in enumerate(per_do):
f.write(f"{idx},{d['TV']:.8f},{d['NR']:.8f},{d['L']:.6f},{d['DO_rad_per_m']:.8f},{d['DO_deg_per_m']:.8f}\n")
with open("oscillation_summary.csv", "w") as f:
f.write("fleet_rad_per_m,fleet_deg_per_m,median_rad_per_m,p90_rad_per_m,max_rad_per_m,median_deg_per_m,p90_deg_per_m,max_deg_per_m,total_length_m\n")
f.write(
f"{fleet_DO_rad_per_m:.8f},{fleet_DO_deg_per_m:.8f},"
f"{do_summary['median_rad_per_m']:.8f},{do_summary['p90_rad_per_m']:.8f},{do_summary['max_rad_per_m']:.8f},"
f"{do_summary['median_deg_per_m']:.8f},{do_summary['p90_deg_per_m']:.8f},{do_summary['max_deg_per_m']:.8f},"
f"{total_L:.6f}\n"
)
print("✅ Oscillation metrics saved to oscillation.csv and oscillation_summary.csv")
except IOError as e:
print(f"❌ Error writing oscillation CSVs: {e}")
# Save smoothness summary
try:
with open("smoothness.csv", "w") as f:
f.write("fleet_Kbar,fleet_Krms,fleet_Hbar,fleet_FinalSmoothness,total_length\n")
f.write(f"{fleet['K_bar']:.8f},{fleet['K_rms']:.8f},{fleet['H_bar']:.8f},{fleet['FinalSmoothness']:.8f},{fleet['total_L']:.6f}\n")
print("✅ Smoothness metrics saved to smoothness.csv")
except IOError as e:
print(f"❌ Error writing to smoothness.csv: {e}")
plt.ioff()
plt.show()
print("All UAVs completed:", completed_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment