PPO Mjölnir Orbit Training Script (V33) — Tight Orbit Entry, Reward Visualization, and Reinforced Orbital Stability. Custom Gymnasium environment + PPO training loop simulating orbital mechanics around a black hole.

🪐 PPO Mjölnir Orbit Environment (V33)

Tight-entry orbital reinforcement learning environment built with Gymnasium + Stable Baselines3 (PPO).

This script defines MjollnirEnvV33, a custom 2D physics-based orbital environment in which an RL agent attempts to sling a "hammer" into a stable orbit around a black hole.
Version 33 introduces:

  • Tight orbit-entry requirements (must hit the exact white ring radius)
  • Explicit reward visualization in the render panel
  • Improved gravitational model
  • Quadratic orbit-error penalties
  • Orbit completion detection (360° sweep)
  • Hard truncation boundary
  • Higher gravity and calibrated circular-velocity target

The PPO training loop uses 10 parallel environments and records a video demonstrating the trained agent.


🚀 Features

✔ Physics-Based Orbital Simulation

  • Central black hole gravity
  • Radial & tangential thrust controls
  • Correct circular-velocity target (V_circ) calculation (worked out just below this list)
  • Velocity decomposition (V_r, V_t)
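
As a sanity check on the calibrated target, the circular-orbit speed follows directly from the environment's gravity constant and target radius. A minimal sketch using the constants defined in MjollnirEnvV33:

import numpy as np

# v_circ = sqrt(G_eff / r): gravitational acceleration G_eff / r^2 balances centripetal v^2 / r
BH_GRAVITY_CONSTANT = 6000.0  # effective G*M used by the environment
ORBIT_RADIUS = 75.0           # white-ring target radius
v_circ = np.sqrt(BH_GRAVITY_CONSTANT / ORBIT_RADIUS)
print(f"V_circ = {v_circ:.2f} m/s")  # ~8.94 m/s, matching TARGET_ORBITAL_SPEED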

✔ RL-Friendly Observation Space

The agent receives a 9-dimensional observation (layout sketched after this list):

  • Normalized positions
  • Delta-to-black-hole vector
  • Radial/tangential velocity
  • Current phase (approach or orbit)
  • Target orbital radius
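
For reference, a minimal sketch of how these nine components are assembled, mirroring _get_obs but with made-up example values:

import numpy as np

# Example values, purely illustrative
size, MAX_VELOCITY_NORM, ORBIT_RADIUS = 1000.0, 20.0, 75.0
hammer_pos = np.array([150.0, 180.0])
bh_delta_pos = np.array([350.0, 320.0])
distance_bh = float(np.linalg.norm(bh_delta_pos))
V_R, V_T, current_phase = -2.0, 6.5, 0

obs = np.concatenate([
    hammer_pos / size,          # 2: normalized x, y position
    bh_delta_pos / size,        # 2: normalized vector toward the black hole
    [distance_bh / size],       # 1: normalized distance to the black hole
    [V_R / MAX_VELOCITY_NORM],  # 1: radial velocity (positive = away from BH)
    [V_T / MAX_VELOCITY_NORM],  # 1: tangential velocity (positive = counter-clockwise)
    [current_phase],            # 1: 0 = approach, 1 = orbit
    [ORBIT_RADIUS / size],      # 1: normalized target orbital radius
], dtype=np.float32)
print(obs.shape)  # (9,)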

✔ Reward System (V33)

  • Distance-to-orbit minimization
  • Tangential-speed matching
  • Radial-velocity suppression
  • Orbit stability bonus
  • Progressive angle rewards
  • Full-orbit completion reward (+15,000); the coefficients are summarized below
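
For quick reference, the main coefficients behind these terms as they appear in MjollnirEnvV33.step() (a summary dictionary, not code used by the environment):

REWARD_COEFFS = {
    "action_penalty": -0.01,             # per squared action component, every step
    "approach_distance_gain": 500.0,     # * reduction in |r - R_orbit| / size
    "stationarity_penalty": -5.0,        # approach speed below 5 m/s
    "entry_speed_penalty": -500.0,       # * ((V_T - V_circ) / V_circ)**2 at orbit entry
    "entry_radial_penalty": -1000.0,     # * |V_R| / V_circ at orbit entry
    "orbit_entry_bonus": 1000.0,         # first time r <= R_orbit (white ring)
    "forward_sweep_gain": 50.0,          # * positive angle swept per step (radians)
    "backward_sweep_penalty": 100.0,     # * negative angle swept per step (radians)
    "in_tolerance_bonus": 15.0,          # |r - R_orbit| < 15 m while orbiting
    "orbit_error_penalty": -200.0,       # * (orbit_error / R_orbit)**2 outside tolerance
    "radial_velocity_penalty": -50.0,    # * |V_R| / V_circ while orbiting
    "full_orbit_bonus": 15000.0,         # 360-degree sweep completed
    "collision_penalty": -15000.0,       # hitting the black hole
    "hard_truncation_penalty": -7500.0,  # drifting past the red ring in orbit phase
}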

✔ Render Overlay (HUD)

Real-time display of:

  • Phase
  • Step counter
  • Orbit error
  • V_r / V_t
  • Total swept angle
  • Step reward & cumulative reward

🧠 Training

The script trains with:

INIT_LR = 0.0001
N_STEPS = 2048
BATCH_SIZE = 64
N_EPOCHS = 10
GAMMA = 0.99
ENT_COEF = 0.01
net_arch = dict(pi=[64, 64], vf=[64, 64])
# ====================================================================================
# PPO MJÖLNIR ORBIT TRAINING SCRIPT (V33 - TIGHT ENTRY & REWARD DISPLAY)
# ====================================================================================
# ------------------------------------------------------------------------------------
# --- IMPORTS & SETUP ⚙️ ---
# ------------------------------------------------------------------------------------
# !pip install stable_baselines3[extra] gymnasium[classic_control] -q
# !apt-get install -y swig
# !pip install 'gymnasium[box2d]' -q
# !pip install pyvirtualdisplay -q
# !apt-get install -y xvfb xserver-xephyr vnc4server
# !pip install moviepy -q
# (setup commands for running in Google Colab)
import os
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
from IPython.display import HTML
import base64
import time
import math
# Define Paths and Budget
TOTAL_TIMESTEPS = 500_000
LOG_DIR = "./ppo_mjollnir_logs/"
VIDEO_DIR = f"{LOG_DIR}/videos/"
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(VIDEO_DIR, exist_ok=True)
# ------------------------------------------------------------------------------------
## 1. Custom Environment: MjollnirEnvV33 (Tight Entry, Stabilized Control)
# ------------------------------------------------------------------------------------
class MjollnirEnvV33(gym.Env):
    """
    V33 enforces a tighter orbit entry condition (must reach R_orbit)
    and adds step reward display to the render frame.
    """
    metadata = {"render_modes": ["rgb_array", "human"], "render_fps": 15}
    PHASE_APPROACH = 0
    PHASE_ORBIT = 1

    def __init__(self, render_mode=None, size=1000, dt=1/30):
        super().__init__()
        self.size = size
        self.dt = dt
        # --- CORE DYNAMICS & ENVIRONMENT CONSTANTS ---
        self.max_force = 240.0
        self.hammer_mass = 5.0
        self.max_steps = 3000
        self.MAX_VELOCITY_NORM = 20.0
        # --- V33 Orbital Constants (50% REDUCTION) ---
        # ORBIT_RADIUS (White Ring) and ORBIT_ENTRY_DISTANCE (Blue Ring)
        self.ORBIT_RADIUS = 75.0
        self.ORBIT_TOLERANCE = 15.0
        self.ORBIT_ENTRY_DISTANCE = 90.0  # Blue Ring
        self.ORBIT_HARD_TRUNCATION = 175.0  # Red Ring
        # --- Black Hole Constants (INCREASED GRAVITY) ---
        self.BH_POS = np.array([self.size / 2, self.size / 2], dtype=np.float32)
        self.BH_RADIUS = 35.0
        self.BH_GRAVITY_CONSTANT = 6000.0
        self.BH_COLLISION_PENALTY = -15000.0
        # --- V33 PHYSICS FIX: Calculate the ideal circular velocity (V_circ) ---
        self.TARGET_ORBITAL_SPEED = np.sqrt(self.BH_GRAVITY_CONSTANT / self.ORBIT_RADIUS)
        # --- V33 State Variables ---
        self._current_phase = self.PHASE_APPROACH
        self._bh_last_angle = 0.0
        self._bh_total_angle_swept = 0.0
        self._bh_last_distance = 0.0
        self._last_step_reward = 0.0  # New variable to track reward for display
        self._total_reward_accumulated = 0.0  # Total reward tracker
        # Action Space is Radial/Tangential force
        self.action_space = spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)
        # Observation Space reflects the critical variables (normalized)
        OBS_SIZE = 9
        self.observation_space = spaces.Box(-np.inf, np.inf, shape=(OBS_SIZE,), dtype=np.float32)
        self.render_mode = render_mode
        self.window = None
        self.clock = None
    def _get_info(self):
        bh_delta_pos = self.BH_POS - self._hammer_pos
        distance_bh = np.linalg.norm(bh_delta_pos)
        bh_to_hammer_vec = self._hammer_pos - self.BH_POS
        bh_to_hammer_norm = bh_to_hammer_vec / distance_bh if distance_bh > 0 else np.zeros(2)
        # Radial vector (normalized, points away from BH)
        radial_vec = bh_to_hammer_norm
        # Tangential vector (normalized, points counter-clockwise, 90 deg from radial)
        tangential_vec = np.array([-radial_vec[1], radial_vec[0]])
        # Velocity Components: V_R is along radial, V_T is along tangential
        V_R = np.dot(self._hammer_vel, radial_vec)
        V_T = np.dot(self._hammer_vel, tangential_vec)
        # Angular movement calculation (for reward and angle_swept)
        bh_angle = np.arctan2(bh_delta_pos[1], bh_delta_pos[0])
        angle_moved = bh_angle - self._bh_last_angle
        angle_moved = (angle_moved + np.pi) % (2 * np.pi) - np.pi
        self._bh_last_angle = bh_angle
        return {
            "angle_moved": angle_moved,
            "distance_bh": distance_bh,
            "delta_pos_bh": bh_delta_pos,
            "bh_total_angle_swept": self._bh_total_angle_swept,
            "radial_velocity": V_R,
            "tangential_velocity": V_T,
            "hammer_speed": np.linalg.norm(self._hammer_vel),
            "radial_vector": radial_vec,
            "tangential_vector": tangential_vec,
        }
    def _get_obs(self):
        info = self._get_info()
        bh_delta_pos_norm = info['delta_pos_bh'] / self.size
        # Normalize Velocities for stable training
        V_R_norm = info['radial_velocity'] / self.MAX_VELOCITY_NORM
        V_T_norm = info['tangential_velocity'] / self.MAX_VELOCITY_NORM
        return np.concatenate([
            self._hammer_pos / self.size,       # 2 (Normalized Pos)
            bh_delta_pos_norm,                  # 2 (Normalized Delta Pos)
            [info['distance_bh'] / self.size],  # 1 (Normalized Distance)
            [V_R_norm],                         # 1 (Normalized V_R)
            [V_T_norm],                         # 1 (Normalized V_T)
            [self._current_phase],              # 1
            [self.ORBIT_RADIUS / self.size]     # 1 (Target R_orbit)
        ], dtype=np.float32)
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self._current_phase = self.PHASE_APPROACH
        self._bh_total_angle_swept = 0.0
        self._hammer_vel = np.zeros(2, dtype=np.float32)
        self._last_step_reward = 0.0
        self._total_reward_accumulated = 0.0
        # Initial position far away but within the new screen boundaries
        self._hammer_pos = np.array([
            self.np_random.uniform(self.size * 0.1, self.size * 0.2),
            self.np_random.uniform(self.size * 0.1, self.size * 0.2),
        ], dtype=np.float32)
        bh_delta_pos = self.BH_POS - self._hammer_pos
        self._bh_last_angle = np.arctan2(bh_delta_pos[1], bh_delta_pos[0])
        self._bh_last_distance = np.linalg.norm(bh_delta_pos)
        obs = self._get_obs()
        info = self._get_info()
        return obs, info
    def step(self, action):
        self.current_step += 1
        info_before = self._get_info()
        bh_dist_before = info_before['distance_bh']
        # --- 1. Forces and Dynamics (V33 Steering Control) ---
        action_tangential = action[0]
        action_radial = action[1]
        radial_vec = info_before['radial_vector']
        tangential_vec = info_before['tangential_vector']
        # Engine Force Calculation
        tangential_force = tangential_vec * action_tangential * self.max_force
        radial_force = radial_vec * action_radial * self.max_force * 0.5
        total_engine_force = tangential_force + radial_force
        # Gravity Force
        bh_dist = bh_dist_before
        bh_delta_pos = info_before['delta_pos_bh']
        bh_dist_clamped = max(bh_dist, 10.0)
        bh_gravity_magnitude = (self.BH_GRAVITY_CONSTANT * self.hammer_mass) / (bh_dist_clamped**2)
        bh_gravity_force = bh_delta_pos / bh_dist_clamped * bh_gravity_magnitude
        # Integration (semi-implicit Euler: velocity first, then position)
        total_force = total_engine_force + bh_gravity_force
        hammer_accel = total_force / self.hammer_mass
        self._hammer_vel += hammer_accel * self.dt
        self._hammer_pos += self._hammer_vel * self.dt
        # --- 2. Reward Calculation (V33) ---
        reward = 0.0
        info_after = self._get_info()
        bh_dist_after = info_after['distance_bh']
        V_R_after = info_after['radial_velocity']
        V_T_after = info_after['tangential_velocity']
        current_speed = info_after['hammer_speed']
        terminated = False
        truncated = False
        # R_ACTION_PENALTY
        reward -= 0.01 * (action_tangential**2 + action_radial**2)
        # --- PHASE 0: APPROACH ---
        if self._current_phase == self.PHASE_APPROACH:
            # R_GOAL_SEEKING_REWARD: Incentivize reaching the new target radius (75 m)
            dist_to_orbit_before = np.abs(bh_dist_before - self.ORBIT_RADIUS)
            dist_to_orbit_after = np.abs(bh_dist_after - self.ORBIT_RADIUS)
            distance_reduction = dist_to_orbit_before - dist_to_orbit_after
            reward += 500.0 * distance_reduction / self.size
            # R_STATIONARITY_PENALTY
            MIN_SPEED_THRESHOLD = 5.0  # m/s
            if current_speed < MIN_SPEED_THRESHOLD:
                reward -= 5.0
            # V33 FIX: Transition Condition: Must reach ORBIT_RADIUS (75.0 m, white ring)
            if bh_dist_after <= self.ORBIT_RADIUS:
                # R_SPEED_PENALTY_ON_ENTRY: Targets the actual V_circ (8.94 m/s)
                speed_error = np.abs(V_T_after - self.TARGET_ORBITAL_SPEED)
                reward_speed_penalty = -500.0 * (speed_error / self.TARGET_ORBITAL_SPEED)**2
                # R_RADIAL_VELOCITY_PENALTY_ON_ENTRY
                radial_penalty = -1000.0 * np.abs(V_R_after) / self.TARGET_ORBITAL_SPEED
                reward += reward_speed_penalty
                reward += radial_penalty
                reward += 1000.0  # Entry bonus
                self._current_phase = self.PHASE_ORBIT
                print(f"Transitioned to ORBIT Phase! Tangential Speed Penalty: {reward_speed_penalty:.1f}, Radial Penalty: {radial_penalty:.1f}")
        # --- PHASE 1: ORBIT (V33 Stability Enforcement) ---
        elif self._current_phase == self.PHASE_ORBIT:
            angle_moved = info_after['angle_moved']
            # R_PROGRESSIVE_COMPLETION (Meme Reward)
            if angle_moved > 0:
                reward += 50.0 * angle_moved
                self._bh_total_angle_swept += angle_moved
            else:
                # Penalty for sweeping backward (angle_moved is negative here, weighted 2x the forward bonus)
                reward += 100.0 * angle_moved
            # R_Orbit_Stability (QUADRATIC PENALTY)
            orbit_error = np.abs(bh_dist_after - self.ORBIT_RADIUS)
            if orbit_error < self.ORBIT_TOLERANCE:
                reward += 15.0  # Constant bonus for staying in tolerance
            else:
                scaled_error = orbit_error / self.ORBIT_RADIUS
                reward -= 200.0 * (scaled_error)**2
            # R_Radial_Velocity_Penalty (Punish movement away/toward BH)
            reward -= 50.0 * np.abs(V_R_after) / self.TARGET_ORBITAL_SPEED
            # R_360_Completion (Terminal Reward)
            if self._bh_total_angle_swept >= 2 * np.pi:
                reward += 15000.0
                terminated = True
                print(f"--- SUCCESS: Full 360-degree orbit completed in {self.current_step} steps! ---")
        # --- 3. Termination/Truncation (V33 Limits) ---
        # R_COLLISION_PENALTY
        collision_distance_bh = self.BH_RADIUS + 10.0
        if bh_dist_after <= collision_distance_bh:
            reward += self.BH_COLLISION_PENALTY
            terminated = True
        # HARD TRUNCATION APPLIED ONLY IN ORBIT PHASE
        if self._current_phase == self.PHASE_ORBIT:
            if bh_dist_after > self.ORBIT_HARD_TRUNCATION:
                reward -= 7500.0
                truncated = True
                print(f"--- TRUNCATED: Agent drifted outside hard limit ({bh_dist_after:.1f} m) ---")
        # R_TIMEOUT
        truncated = truncated or (self.current_step >= self.max_steps)
        self._bh_last_distance = bh_dist_after
        self._last_step_reward = reward
        self._total_reward_accumulated += reward
        return self._get_obs(), reward, terminated, truncated, info_after
    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        import pygame
        if self.window is None:
            pygame.init()
            self.window = pygame.display.set_mode((self.size, self.size))
            pygame.display.set_caption("Mjölnir Orbit-v33 (TIGHT, REWARD FEEDBACK)")
        if self.clock is None:
            self.clock = pygame.time.Clock()
        canvas = pygame.Surface((self.size, self.size))
        canvas.fill((20, 20, 40))
        info = self._get_info()
        bh_dist = info['distance_bh']
        angle_swept = self._bh_total_angle_swept
        font = pygame.font.Font(None, 30)  # Smaller font for more text
        # --- Draw Static Elements ---
        bh_pos_int = self.BH_POS.astype(int)
        pygame.draw.circle(canvas, (0, 0, 0), bh_pos_int, int(self.BH_RADIUS))
        pygame.draw.circle(canvas, (255, 60, 0), bh_pos_int, int(self.BH_RADIUS * 1.5), 5)  # Orange Ring
        pygame.draw.circle(canvas, (255, 255, 255), bh_pos_int, int(self.ORBIT_RADIUS), 1)  # White Ring (Target Orbit)
        # Blue Ring (Entry/Approach Guidance)
        if self._current_phase == self.PHASE_APPROACH:
            pygame.draw.circle(canvas, (0, 150, 255), bh_pos_int, int(self.ORBIT_ENTRY_DISTANCE), 1)
        pygame.draw.circle(canvas, (255, 0, 0), bh_pos_int, int(self.ORBIT_HARD_TRUNCATION), 1)  # Red Ring (Hard Truncation)
        # 3. Draw Agent (Mjölnir Hammer)
        hammer_pos = self._hammer_pos
        head_width = 30
        head_height = 20
        handle_width = 5
        handle_length = 25
        head_points = np.array([
            [-head_width / 2, -head_height / 2],
            [head_width / 2, -head_height / 2],
            [head_width / 2, head_height / 2],
            [-head_width / 2, head_height / 2]
        ])
        handle_points = np.array([
            [-handle_width / 2, head_height / 2],
            [handle_width / 2, head_height / 2],
            [handle_width / 2, head_height / 2 + handle_length],
            [-handle_width / 2, head_height / 2 + handle_length]
        ])
        # Rotate the hammer sprite to face its direction of travel
        hammer_angle = 0.0
        if np.linalg.norm(self._hammer_vel) > 1.0:
            vel_x, vel_y = self._hammer_vel
            hammer_angle = np.arctan2(vel_y, vel_x)
        rot_matrix = np.array([
            [np.cos(hammer_angle), -np.sin(hammer_angle)],
            [np.sin(hammer_angle), np.cos(hammer_angle)]
        ])
        transformed_head_points = np.dot(head_points, rot_matrix.T) + hammer_pos
        transformed_handle_points = np.dot(handle_points, rot_matrix.T) + hammer_pos
        head_draw_points = [tuple(p.astype(int)) for p in transformed_head_points]
        handle_draw_points = [tuple(p.astype(int)) for p in transformed_handle_points]
        pygame.draw.polygon(canvas, (200, 200, 200), head_draw_points)
        pygame.draw.polygon(canvas, (139, 69, 19), handle_draw_points)
        # 4. Draw Info text (V33 - Includes Reward)
        text_y_start = 10
        y_offset = 30
        PHASE_NAMES = ["APPROACH", "ORBIT"]
        PHASE_COLORS = [(255, 255, 0), (0, 255, 0)]
        current_phase_name = PHASE_NAMES[self._current_phase]
        # Line 1: Phase and Step Count
        phase_text = f"PHASE: {current_phase_name} | STEP: {self.current_step}/{self.max_steps}"
        text_surface_phase = font.render(phase_text, True, PHASE_COLORS[self._current_phase])
        canvas.blit(text_surface_phase, (10, text_y_start))
        # Line 2: Distance Error
        orbit_error = bh_dist - self.ORBIT_RADIUS
        bh_text = f"BH DIST: {bh_dist:.1f} m (Error: {orbit_error:.1f})"
        text_color_bh = (255, 100, 100) if np.abs(orbit_error) > self.ORBIT_TOLERANCE else (100, 200, 255)
        text_surface_bh = font.render(bh_text, True, text_color_bh)
        canvas.blit(text_surface_bh, (10, text_y_start + y_offset))
        # Line 3: Velocities
        v_t_text = f"V_T: {info['tangential_velocity']:.1f} m/s | V_R: {info['radial_velocity']:.1f} m/s (Target: {self.TARGET_ORBITAL_SPEED:.2f})"
        text_surface_v = font.render(v_t_text, True, (255, 255, 255))
        canvas.blit(text_surface_v, (10, text_y_start + 2*y_offset))
        # Line 4: Progressive Completion
        circum_text = f"TOTAL ORBIT: {np.rad2deg(angle_swept):.1f}° (Goal: 360.0°)"
        text_surface_circum = font.render(circum_text, True, (0, 255, 0) if angle_swept > np.pi else (150, 150, 150))
        canvas.blit(text_surface_circum, (10, text_y_start + 3*y_offset))
        # Line 5 (NEW): Step Reward and Total Reward
        # Green for a non-negative step reward, red otherwise; cumulative total shown alongside
        reward_color = (0, 255, 0) if self._last_step_reward >= 0 else (255, 100, 100)
        reward_text = f"STEP REWARD: {self._last_step_reward:.2f} | TOTAL REWARD: {self._total_reward_accumulated:.1f}"
        text_surface_reward = font.render(reward_text, True, reward_color)
        canvas.blit(text_surface_reward, (10, text_y_start + 4*y_offset))
        if self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self):
        if self.window is not None:
            import pygame
            pygame.quit()
            self.window = None
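
# (Optional) Quick smoke test of the raw environment with random actions.
# A minimal sketch for sanity-checking the class above; it is not part of the
# training run, so it is left commented out.
# env = MjollnirEnvV33(render_mode="rgb_array")
# obs, info = env.reset(seed=0)
# for _ in range(300):
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
#     if terminated or truncated:
#         obs, info = env.reset()
# env.close()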
# ------------------------------------------------------------------------------------
## 2. PPO Training and Evaluation (V33 ENVIRONMENT) 🚀
# ------------------------------------------------------------------------------------
# --- Helper Functions (Same) ---
def linear_schedule(initial_value: float):
    """Linearly decay the learning rate from initial_value to 0 over training."""
    def func(progress_remaining: float) -> float:
        return initial_value * progress_remaining
    return func

def show_video(video_dir):
    mp4list = [f for f in os.listdir(video_dir) if f.endswith('.mp4')]
    if len(mp4list) > 0:
        mp4list.sort(key=lambda x: os.path.getmtime(os.path.join(video_dir, x)), reverse=True)
        file = mp4list[0]
        video = open(os.path.join(video_dir, file), 'rb').read()
        b64 = base64.b64encode(video).decode()
        return HTML(f'<video width="100%" controls><source src="data:video/mp4;base64,{b64}" type="video/mp4"></video>')
    else:
        print("No video file found.")
        return HTML('<p>No video file found.</p>')

# --- Hyperparameters (Same) ---
INIT_LR = 0.0001
N_STEPS = 2048
BATCH_SIZE = 64
N_EPOCHS = 10
GAMMA = 0.99
ENT_COEF = 0.01
# Separate 64x64 policy and value networks (current SB3 net_arch format: a dict, not a list of dicts)
POLICY_KWARGS = dict(net_arch=dict(pi=[64, 64], vf=[64, 64]))
# --- Model Initialization and Training ---
env_id = "MjollnirOrbit-v33"
# Register the new V33 environment
gym.envs.registration.register(id=env_id, entry_point=MjollnirEnvV33, max_episode_steps=3000)
print(f"Starting Mjölnir training for {TOTAL_TIMESTEPS} timesteps using {env_id} (TIGHT ENTRY, REWARD FEEDBACK)...")
train_env = make_vec_env(env_id, n_envs=10, env_kwargs=dict(dt=1/30))
model = PPO(
    "MlpPolicy",
    train_env,
    learning_rate=linear_schedule(INIT_LR),
    n_steps=N_STEPS,
    batch_size=BATCH_SIZE,
    n_epochs=N_EPOCHS,
    gamma=GAMMA,
    ent_coef=ENT_COEF,
    policy_kwargs=POLICY_KWARGS,
    verbose=1,
    device="cuda"  # use "auto" to fall back to CPU when no GPU is available
)
# Start training the V33 model
model.learn(total_timesteps=TOTAL_TIMESTEPS)
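
# (Optional, not in the original script) Persist the trained policy so it can be
# reloaded later with PPO.load() instead of retraining; the filename is arbitrary.
model.save(os.path.join(LOG_DIR, "ppo_mjollnir_v33"))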
# --- Video Recording and Evaluation ---
print("\n--- Recording Trained Agent Performance ---")
# Ensure the correct V33 environment is used for evaluation
eval_env = DummyVecEnv([lambda: gym.make(env_id, render_mode="rgb_array", dt=1/30)])
video_length = 6000
eval_env = VecVideoRecorder(
    eval_env,
    VIDEO_DIR,
    record_video_trigger=lambda x: x == 0,
    video_length=video_length,
    name_prefix="ppo-mjollnir-agent-v33"
)
# VecEnv API: reset() returns only the batched observations (shape (n_envs, obs_dim))
obs = eval_env.reset()
for _ in range(video_length):
    action, _ = model.predict(obs, deterministic=True)
    # VecEnv API: step() returns (obs, rewards, dones, infos); finished sub-envs auto-reset
    obs, rewards, dones, infos = eval_env.step(action)
    if dones[0]:
        break
eval_env.close()
# --- Display Results ---
print("\n--- Video of Trained Agent ---")
show_video(VIDEO_DIR)