Created
September 29, 2025 13:43
-
-
Save damp11113/40e2895917965627832225bbe6e5e6b2 to your computer and use it in GitHub Desktop.
The lidar viewer for Peripheral+ lidar (https://github.com/damp11113/peripheralplus)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pygame | |
| import math | |
| import threading | |
| import asyncio | |
| import websockets | |
| import json | |
| import time | |
| from collections import deque | |
| # Initialize Pygame | |
| pygame.init() | |
| # Constants | |
| WINDOW_WIDTH = 800 | |
| WINDOW_HEIGHT = 600 | |
| CENTER_X = WINDOW_WIDTH // 2 | |
| CENTER_Y = WINDOW_HEIGHT // 2 | |
| MAX_RANGE = 250 # Maximum display range in pixels | |
| BACKGROUND_COLOR = (10, 10, 20) | |
| GRID_COLOR = (30, 50, 30) | |
| TEXT_COLOR = (100, 255, 100) | |
| SCAN_LINE_COLORS = [(255, 100, 100), (100, 255, 100), (100, 100, 255), (255, 255, 100), (255, 100, 255)] | |
| class LidarVisualizer: | |
| def __init__(self): | |
| self.screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT)) | |
| pygame.display.set_caption("Lidar Visualization") | |
| self.clock = pygame.time.Clock() | |
| self.font = pygame.font.Font(None, 24) | |
| self.small_font = pygame.font.Font(None, 16) | |
| # Lidar data storage | |
| self.lidar_data = {} | |
| self.offsets = [] | |
| self.max_distance = 10 # Will be updated based on actual data | |
| self.data_lock = threading.Lock() | |
| # Animation | |
| self.sweep_angle = 0 | |
| self.last_update = time.time() | |
| def calculate_y_offsets(self, multiline: int): | |
| """Python version of your Java calculateYOffsets""" | |
| offsets = [] | |
| if multiline <= 1: | |
| offsets.append(0) | |
| elif multiline == 2: | |
| offsets.extend([-1, 1]) | |
| else: | |
| half_range = multiline // 2 | |
| for i in range(-half_range, half_range + 1): | |
| offsets.append(i) | |
| return offsets | |
| def build_multiline_lidar(self, detections, multiline: int): | |
| """Build lidar map [line_index][angle] using Y-offset rules.""" | |
| offsets = self.calculate_y_offsets(multiline) | |
| offset_to_idx = {offset: idx for idx, offset in enumerate(offsets)} | |
| # init lidar map: rows = offsets, cols = 360 | |
| lidar_map = [[-1] * 360 for _ in offsets] | |
| for item in detections: | |
| line = item["line"] | |
| angle = item["angle"] | |
| dist = item["distance"] | |
| if line in offset_to_idx and 0 <= angle < 360: | |
| row = offset_to_idx[line] | |
| # if multiple detections at same spot → keep closest | |
| if lidar_map[row][angle] == -1: | |
| lidar_map[row][angle] = dist | |
| else: | |
| lidar_map[row][angle] = min(lidar_map[row][angle], dist) | |
| return lidar_map, offsets | |
| def update_lidar_data(self, detections, multiline=7): | |
| """Update lidar data from websocket""" | |
| with self.data_lock: | |
| lidar_map, offsets = self.build_multiline_lidar(detections, multiline) | |
| self.lidar_data = {} | |
| self.offsets = offsets | |
| # Convert to easier format for visualization | |
| for line_idx, line_data in enumerate(lidar_map): | |
| line_offset = offsets[line_idx] | |
| self.lidar_data[line_offset] = {} | |
| for angle, distance in enumerate(line_data): | |
| if distance != -1: | |
| self.lidar_data[line_offset][angle] = distance | |
| # Update max distance for scaling | |
| if distance > self.max_distance: | |
| self.max_distance = min(distance * 1.1, 5000) # Cap at reasonable range | |
| def draw_grid(self): | |
| """Draw radar grid""" | |
| # Draw range circles | |
| for i in range(1, 5): | |
| radius = (MAX_RANGE * i) // 4 | |
| pygame.draw.circle(self.screen, GRID_COLOR, (CENTER_X, CENTER_Y), radius, 1) | |
| # Range labels | |
| label = self.small_font.render(f"{int(self.max_distance * i / 4)}", True, TEXT_COLOR) | |
| self.screen.blit(label, (CENTER_X + radius - 20, CENTER_Y - 10)) | |
| # Draw angle lines | |
| for angle in range(0, 360, 30): | |
| end_x = CENTER_X + MAX_RANGE * math.cos(math.radians(angle - 90)) | |
| end_y = CENTER_Y + MAX_RANGE * math.sin(math.radians(angle - 90)) | |
| pygame.draw.line(self.screen, GRID_COLOR, (CENTER_X, CENTER_Y), (end_x, end_y), 1) | |
| # Angle labels | |
| label_x = CENTER_X + (MAX_RANGE + 20) * math.cos(math.radians(angle - 90)) | |
| label_y = CENTER_Y + (MAX_RANGE + 20) * math.sin(math.radians(angle - 90)) | |
| label = self.small_font.render(f"{angle}°", True, TEXT_COLOR) | |
| label_rect = label.get_rect(center=(label_x, label_y)) | |
| self.screen.blit(label, label_rect) | |
| def draw_lidar_points(self): | |
| """Draw lidar detection points""" | |
| with self.data_lock: | |
| if not self.lidar_data: | |
| return | |
| for line_idx, (line_offset, line_data) in enumerate(self.lidar_data.items()): | |
| color = SCAN_LINE_COLORS[line_idx % len(SCAN_LINE_COLORS)] | |
| for angle, distance in line_data.items(): | |
| # Convert to screen coordinates | |
| # Scale distance to fit in display | |
| scaled_distance = (distance / self.max_distance) * MAX_RANGE | |
| # Convert angle (0° = right, increasing clockwise) to pygame coordinates | |
| screen_angle = math.radians(angle - 90) # -90 to make 0° point up | |
| x = CENTER_X + scaled_distance * math.cos(screen_angle) | |
| y = CENTER_Y + scaled_distance * math.sin(screen_angle) | |
| # Draw point with line-specific color | |
| pygame.draw.circle(self.screen, color, (int(x), int(y)), 3) | |
| # Draw line from center to point for better visibility | |
| pygame.draw.line(self.screen, | |
| (color[0] // 3, color[1] // 3, color[2] // 3), | |
| (CENTER_X, CENTER_Y), (int(x), int(y)), 1) | |
| def draw_info_panel(self): | |
| """Draw information panel""" | |
| info_y = 10 | |
| # Title | |
| title = self.font.render("Multi-Line Lidar Visualization", True, TEXT_COLOR) | |
| self.screen.blit(title, (10, info_y)) | |
| info_y += 30 | |
| # Data info | |
| with self.data_lock: | |
| points_count = sum(len(line_data) for line_data in self.lidar_data.values()) | |
| info = self.small_font.render(f"Points: {points_count}", True, TEXT_COLOR) | |
| self.screen.blit(info, (10, info_y)) | |
| info_y += 20 | |
| range_info = self.small_font.render(f"Max Range: {int(self.max_distance)}", True, TEXT_COLOR) | |
| self.screen.blit(range_info, (10, info_y)) | |
| info_y += 20 | |
| lines_info = self.small_font.render(f"Scan Lines: {len(self.offsets)}", True, TEXT_COLOR) | |
| self.screen.blit(lines_info, (10, info_y)) | |
| info_y += 30 | |
| # Legend for scan lines | |
| legend_title = self.small_font.render("Scan Lines:", True, TEXT_COLOR) | |
| self.screen.blit(legend_title, (10, info_y)) | |
| info_y += 20 | |
| for i, offset in enumerate(self.offsets): | |
| color = SCAN_LINE_COLORS[i % len(SCAN_LINE_COLORS)] | |
| pygame.draw.circle(self.screen, color, (20, info_y + 8), 5) | |
| line_label = self.small_font.render(f"Line {offset}", True, TEXT_COLOR) | |
| self.screen.blit(line_label, (35, info_y)) | |
| info_y += 18 | |
| def run_visualization(self): | |
| """Main visualization loop""" | |
| running = True | |
| while running: | |
| for event in pygame.event.get(): | |
| if event.type == pygame.QUIT: | |
| running = False | |
| elif event.type == pygame.KEYDOWN: | |
| if event.key == pygame.K_ESCAPE: | |
| running = False | |
| elif event.key == pygame.K_r: # Reset max range | |
| with self.data_lock: | |
| self.max_distance = 10 | |
| # Clear screen | |
| self.screen.fill(BACKGROUND_COLOR) | |
| # Draw components | |
| self.draw_grid() | |
| self.draw_lidar_points() | |
| self.draw_info_panel() | |
| # Update display | |
| pygame.display.flip() | |
| self.clock.tick(60) # 60 FPS | |
| pygame.quit() | |
| class WebSocketLidarServer: | |
| def __init__(self, visualizer): | |
| self.visualizer = visualizer | |
| async def handle_client(self, websocket, path): | |
| """Handle incoming websocket messages""" | |
| print(f"New client connected: {websocket.remote_address}") | |
| async for message in websocket: | |
| try: | |
| content = json.loads(message) | |
| detections = content["lidar"]["detections"] | |
| multiline = content.get("multiline", 3) | |
| # Update visualizer data | |
| self.visualizer.update_lidar_data(detections, multiline) | |
| print(f"Updated lidar data: {len(detections)} detections") | |
| except json.JSONDecodeError: | |
| print("Invalid JSON received") | |
| except KeyError as e: | |
| print(f"Missing key in data: {e}") | |
| except Exception as e: | |
| print(f"Error processing message: {e}") | |
| def start_server(self, host="127.0.0.1", port=8080): | |
| """Start the websocket server""" | |
| print(f"Starting WebSocket server on {host}:{port}") | |
| # Create new event loop for this thread | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| # Create and start the server | |
| server = websockets.serve(self.handle_client, host, port) | |
| # Run the server | |
| loop.run_until_complete(server) | |
| loop.run_forever() | |
| def main(): | |
| # Create visualizer | |
| visualizer = LidarVisualizer() | |
| # Create and start websocket server in separate thread | |
| ws_server = WebSocketLidarServer(visualizer) | |
| server_thread = threading.Thread( | |
| target=ws_server.start_server, | |
| args=("127.0.0.1", 8080), | |
| daemon=True | |
| ) | |
| server_thread.start() | |
| print("Lidar Visualizer started!") | |
| print("Connect your lidar data source to ws://127.0.0.1:8080") | |
| print("Press ESC or close window to exit") | |
| print("Press 'R' to reset range scaling") | |
| # Generate some test data to demonstrate | |
| test_detections = [] | |
| for angle in range(0, 360, 5): | |
| for line in [-1, 0, 1]: | |
| distance = 500 + 200 * math.sin(math.radians(angle * 3)) + line * 50 | |
| test_detections.append({ | |
| "line": line, | |
| "angle": angle, | |
| "distance": max(100, distance) | |
| }) | |
| # Update with test data initially | |
| visualizer.update_lidar_data(test_detections, 3) | |
| # Start visualization loop | |
| visualizer.run_visualization() | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Code for lidar sender