@andybak
Last active December 16, 2024 11:14
Generic server listener for Blender
# __init__.py
bl_info = {
    "name": "Icosa Listener",
    "blender": (4, 0, 0),
    "description": "Allows Icosa Gallery to import models into Blender",
    "version": (1, 0),
    "author": "@icosa.gallery",
}

import bpy
from bpy.app.handlers import persistent
import threading
import os

# Import the server module
from .server import ModelImportServer

# Global reference to the server instance
server_instance = None


@persistent
def load_handler(dummy):
    """Restart the server when a new blend file is loaded"""
    global server_instance
    if server_instance:
        server_instance.restart()


def register():
    global server_instance
    server_instance = ModelImportServer()
    server_instance.start()
    bpy.app.handlers.load_post.append(load_handler)


def unregister():
    global server_instance
    if server_instance:
        server_instance.stop()
    bpy.app.handlers.load_post.remove(load_handler)


if __name__ == "__main__":
    register()

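A quick way to confirm the add-on registered correctly is to check the load_post handlers from Blender's Python console. This is a small sketch, not part of the add-on; the exact list printed depends on what other add-ons are installed:

# Run in Blender's Python console after enabling the add-on
import bpy
print([h.__name__ for h in bpy.app.handlers.load_post])  # "load_handler" should appear in the list
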
# server.py
import bpy
import http.server
import socketserver
import requests
import os
import errno
import json
import threading
import tempfile
import shutil
import queue
import time
from collections import deque
from datetime import datetime, timedelta
import pathlib


class ModelImportServer:
    def __init__(self):
        self.port_range = range(44321, 44352)
        self.import_queue = queue.Queue()
        self.recent_imports = deque(maxlen=100)  # Track recent imports
        self.import_thread = None
        self.server = None
        self.server_thread = None
        self.running = False
        self.lock = threading.Lock()

    def start(self):
        """Start the server and the import-processing thread"""
        self.running = True
        self.start_import_thread()
        self.start_server_thread()
    def stop(self):
        """Stop all server operations"""
        self.running = False
        if self.server:
            self.server.shutdown()
        if self.import_thread:
            self.import_queue.put(None)  # Sentinel to stop the import thread
            self.import_thread.join()
        if self.server_thread:
            self.server_thread.join()
        if self.server:
            self.server.server_close()  # Release the socket so a restart can rebind the same port
            self.server = None

    def restart(self):
        """Restart the server"""
        self.stop()
        self.start()
    def start_import_thread(self):
        """Start the thread that processes queued imports"""
        self.import_thread = threading.Thread(target=self.process_import_queue)
        self.import_thread.daemon = True
        self.import_thread.start()

    def start_server_thread(self):
        """Start the HTTP server thread"""
        self.server_thread = threading.Thread(target=self.run_server)
        self.server_thread.daemon = True
        self.server_thread.start()
    def run_server(self):
        """Run the HTTP server, falling back to the next port if one is already taken"""
        handler = self.create_request_handler()
        for port in self.port_range:
            try:
                self.server = socketserver.TCPServer(("", port), handler)
                print(f"Server started on port {port}")
                self.server.serve_forever()
                break
            except OSError as e:
                if e.errno == errno.EADDRINUSE:  # Address already in use (cross-platform check)
                    print(f"Port {port} is in use, trying next port...")
                    continue
                raise
        else:
            print(f"No free port found in range {self.port_range.start}-{self.port_range.stop - 1}")
    def create_request_handler(self):
        """Create a request handler class with access to the server instance"""
        server_instance = self

        class ModelImportHandler(http.server.SimpleHTTPRequestHandler):
            def do_GET(self):
                if self.path == "/ping":
                    self.send_response(200)
                    self.send_cors_headers()
                    self.end_headers()
                    self.wfile.write(b"Pong")
                elif self.path == "/status":
                    self.send_status_response()
                else:
                    self.send_response(404)
                    self.send_cors_headers()
                    self.end_headers()
                    self.wfile.write(b"Endpoint not found")

            def do_OPTIONS(self):
                self.send_response(200)
                self.send_cors_headers()
                self.end_headers()
            def do_POST(self):
                content_length = int(self.headers.get('Content-Length', 0))
                post_data = self.rfile.read(content_length)
                try:
                    data = json.loads(post_data.decode('utf-8'))
                    model_url = data.get('obj_url') or data.get('glb_url')
                    if not model_url:
                        self.send_error_response('No model URL provided')
                        return

                    # Validate the URL's file extension
                    file_ext = os.path.splitext(model_url)[-1].lower()
                    if file_ext not in ['.obj', '.glb']:
                        self.send_error_response(f'Unsupported file format: {file_ext}')
                        return

                    # Check if this URL was recently processed
                    if server_instance.is_duplicate_request(model_url):
                        self.send_error_response('This file was recently imported')
                        return

                    # Queue the import task
                    server_instance.import_queue.put(model_url)
                    self.send_response(202)  # Accepted
                    self.send_cors_headers()
                    self.send_header('Content-Type', 'application/json')
                    self.end_headers()
                    response_data = {
                        'status': 'queued',
                        'message': 'Import task queued successfully',
                        'queue_position': server_instance.import_queue.qsize()
                    }
                    self.wfile.write(json.dumps(response_data).encode('utf-8'))
                except json.JSONDecodeError:
                    self.send_error_response('Invalid JSON data')
                except Exception as e:
                    self.send_error_response(f'Server error: {str(e)}')
            def send_cors_headers(self):
                self.send_header("Access-Control-Allow-Origin", "*")
                self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
                self.send_header("Access-Control-Allow-Headers", "Content-Type")

            def send_error_response(self, message):
                self.send_response(400)
                self.send_cors_headers()
                self.end_headers()
                self.wfile.write(message.encode('utf-8'))

            def send_status_response(self):
                status = {
                    'queue_size': server_instance.import_queue.qsize(),
                    'recent_imports': len(server_instance.recent_imports),
                    'running': server_instance.running
                }
                self.send_response(200)
                self.send_cors_headers()
                self.send_header('Content-Type', 'application/json')
                self.end_headers()
                self.wfile.write(json.dumps(status).encode('utf-8'))

        return ModelImportHandler
    def is_duplicate_request(self, url):
        """Check if URL was recently processed (within the last 5 minutes)"""
        now = datetime.now()
        with self.lock:
            self.recent_imports = deque(
                [(u, t) for u, t in self.recent_imports
                 if now - t < timedelta(minutes=5)],
                maxlen=100
            )
            return any(url == u for u, _ in self.recent_imports)
    def process_import_queue(self):
        """Process queued import tasks"""
        while self.running:
            try:
                url = self.import_queue.get(timeout=1.0)
                if url is None:  # Sentinel value to stop the thread
                    break
                self.import_3d_model(url)
                with self.lock:
                    self.recent_imports.append((url, datetime.now()))
                self.import_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error processing import: {str(e)}")
                import traceback
                traceback.print_exc()
    def import_3d_model(self, url):
        """Download and import a 3D model with improved file handling"""
        temp_dir = tempfile.mkdtemp()
        try:
            file_ext = os.path.splitext(url)[-1].lower()
            if file_ext not in ['.obj', '.glb']:
                raise ValueError(f"Unsupported file format: {file_ext}")
            file_path = os.path.join(temp_dir, f"model{file_ext}")

            # Download the file with progress tracking
            print(f"Downloading {url} to {file_path}")
            response = requests.get(url, stream=True)
            response.raise_for_status()
            file_size = int(response.headers.get('content-length', 0))
            block_size = 8192
            downloaded_size = 0
            with open(file_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=block_size):
                    if chunk:
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        if file_size:
                            progress = (downloaded_size / file_size) * 100
                            print(f"Download progress: {progress:.1f}%")

            # Verify the file exists and is non-empty
            if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
                raise Exception("Downloaded file is empty or does not exist")
            print(f"Download complete: {file_path}")

            # Capture the file path for the main-thread closure
            final_file_path = file_path
            def import_in_main_thread():
                try:
                    if not os.path.exists(final_file_path):
                        print(f"Error: File not found at {final_file_path}")
                        return
                    # Ensure we're in the right context
                    if not bpy.context.window_manager:
                        print("Error: No window manager context")
                        return
                    # Clear the selection (optional, but can help prevent issues)
                    if bpy.ops.object.select_all.poll():
                        bpy.ops.object.select_all(action='DESELECT')
                    print(f"Importing file: {final_file_path}")
                    if file_ext == '.obj':
                        # Blender 4.0 removed the legacy import_scene.obj operator;
                        # wm.obj_import is the built-in replacement
                        result = bpy.ops.wm.obj_import(
                            filepath=final_file_path,
                            forward_axis='NEGATIVE_Z',
                            up_axis='Y'
                        )
                    elif file_ext == '.glb':
                        result = bpy.ops.import_scene.gltf(
                            filepath=final_file_path,
                            import_pack_images=True,
                            merge_vertices=True
                        )
                    if 'FINISHED' in result:
                        print(f"Successfully imported {file_ext} file")
                    else:
                        print(f"Import returned status: {result}")
                except Exception as e:
                    print(f"Error during import: {str(e)}")
                    import traceback
                    traceback.print_exc()
            def cleanup():
                try:
                    if os.path.exists(temp_dir):
                        shutil.rmtree(temp_dir)
                        print(f"Cleaned up temporary directory: {temp_dir}")
                except Exception as e:
                    print(f"Error during cleanup: {str(e)}")

            # Timers run on Blender's main thread: the import fires on the next tick,
            # and cleanup fires at least a second later, after the import has finished
            bpy.app.timers.register(import_in_main_thread)
            bpy.app.timers.register(cleanup, first_interval=1.0)
        except Exception as e:
            print(f"Error processing model from {url}: {str(e)}")
            import traceback
            traceback.print_exc()
            # Ensure cleanup happens even if there's an error
            try:
                if os.path.exists(temp_dir):
                    shutil.rmtree(temp_dir)
            except Exception as cleanup_error:
                print(f"Error during cleanup: {str(cleanup_error)}")